Whamcloud - gitweb
land b_hd_sec onto HEAD:
author: ericm <ericm>
Fri, 22 Apr 2005 16:50:01 +0000 (16:50 +0000)
committer: ericm <ericm>
Fri, 22 Apr 2005 16:50:01 +0000 (16:50 +0000)
 * remote uid/gid handling
 * various gss fixes
 * fix broken local ACL

60 files changed:
lustre/cmobd/cm_obd.c
lustre/cobd/cache_obd.c
lustre/include/linux/lustre_cfg.h
lustre/include/linux/lustre_export.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_import.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/include/linux/lustre_ucache.h
lustre/include/linux/lvfs.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/liblustre/llite_lib.c
lustre/liblustre/super.c
lustre/llite/dcache.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/lmv/lmv_obd.c
lustre/lov/lov_obd.c
lustre/lvfs/lvfs_linux.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/lproc_mds.c
lustre/mds/mds_fs.c
lustre/mds/mds_internal.h
lustre/mds/mds_lib.c
lustre/mds/mds_lmv.c
lustre/mds/mds_lov.c
lustre/mds/mds_lsd.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/obdclass/llog_test.c
lustre/obdecho/echo.c
lustre/obdecho/echo_client.c
lustre/obdfilter/filter.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlbd/client.c
lustre/ptlrpc/import.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/service.c
lustre/sec/gss/gss_api.h
lustre/sec/gss/gss_internal.h
lustre/sec/gss/sec_gss.c
lustre/sec/gss/svcsec_gss.c
lustre/sec/svcsec.c
lustre/sec/upcall_cache.c
lustre/tests/krb5_env.sh
lustre/tests/replay-single.sh
lustre/tests/test-framework.sh
lustre/utils/Makefile.am
lustre/utils/lconf
lustre/utils/llmount.c
lustre/utils/lsd_upcall.c

index f672def..5503d0d 100644 (file)
@@ -153,7 +153,7 @@ static int cmobd_setup(struct obd_device *obd, obd_count len, void *buf)
 
         /* connecting master */
         memset(&conn, 0, sizeof(conn));
-        rc = obd_connect(&conn, cmobd->master_obd, &obd->obd_uuid, 0);
+        rc = obd_connect(&conn, cmobd->master_obd, &obd->obd_uuid, NULL, 0);
         if (rc)
                 RETURN(rc);
         cmobd->master_exp = class_conn2export(&conn);
index 8a28304..bb3bbcf 100644 (file)
@@ -126,6 +126,7 @@ cobd_get_exp(struct obd_device *obd)
 
 static int client_obd_connect(struct obd_device *obd, char *name,
                               struct lustre_handle *conn,
+                              struct obd_connect_data *data,
                               unsigned long flags)
 { 
         struct obd_device *cli_obd;
@@ -142,7 +143,7 @@ static int client_obd_connect(struct obd_device *obd, char *name,
                        obd->obd_name, name);
                 RETURN(-EINVAL);
         }
-        rc = obd_connect(conn, cli_obd, &obd->obd_uuid, flags);
+        rc = obd_connect(conn, cli_obd, &obd->obd_uuid, data, flags);
         if (rc) {
                 CERROR("error connecting to %s, err %d\n",
                        name, rc);
@@ -172,7 +173,8 @@ static int client_obd_disconnect(struct obd_device *obd,
 
 static int
 cobd_connect(struct lustre_handle *conn, struct obd_device *obd,
-             struct obd_uuid *cluuid, unsigned long flags)
+             struct obd_uuid *cluuid, struct obd_connect_data *data,
+             unsigned long flags)
 {
         struct lustre_handle cache_conn = { 0 };
         struct cache_obd *cobd = &obd->u.cobd;
@@ -188,7 +190,7 @@ cobd_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         /* connecting cache */
         rc = client_obd_connect(obd, cobd->cache_name,
-                                &cache_conn, flags);
+                                &cache_conn, data, flags);
         if (rc)
                 GOTO(err_discon, rc);
         cobd->cache_exp = class_conn2export(&cache_conn);
@@ -731,7 +733,8 @@ static int cobd_iocontrol(unsigned int cmd, struct obd_export *exp,
                         struct lustre_handle conn = {0};
 
                         rc = client_obd_disconnect(obd, cobd->master_exp, 0);
-                        rc = client_obd_connect(obd, cobd->cache_name, &conn, 0);
+                        rc = client_obd_connect(obd, cobd->cache_name, &conn,
+                                                NULL, 0);
                         if (rc)
                                 GOTO(out, rc);
                         cobd->cache_exp = class_conn2export(&conn);
@@ -750,7 +753,8 @@ static int cobd_iocontrol(unsigned int cmd, struct obd_export *exp,
                         cooksize = cache->u.cli.cl_max_mds_cookiesize;
                         
                         rc = client_obd_disconnect(obd, cobd->cache_exp, 0);
-                        rc = client_obd_connect(obd, cobd->master_name, &conn, 0);
+                        rc = client_obd_connect(obd, cobd->master_name, &conn,
+                                                NULL, 0);
                         if (rc)
                                 GOTO(out, rc);
                         cobd->master_exp = class_conn2export(&conn);
index e2b3cd3..75757b8 100644 (file)
@@ -271,6 +271,9 @@ static inline void lustre_cfg_freedata(char *buf, int len)
         return;
 }
 
+#define NOBODY_UID      99
+#define NOBODY_GID      99
+
 /* Passed by mount */
 struct lustre_mount_data {
         uint32_t lmd_magic;
@@ -281,6 +284,7 @@ struct lustre_mount_data {
         uint32_t lmd_server_ipaddr;
         uint32_t lmd_port;
         uint32_t lmd_async;
+        uint64_t lmd_remote_flag;
         uint32_t lmd_nllu;
         uint32_t lmd_nllg;
         char     lmd_security[16];
index 7404bd7..e4a0bdb 100644 (file)
@@ -22,11 +22,10 @@ struct mds_export_data {
         struct mds_client_data *med_mcd;
         loff_t                  med_off;
         int                     med_idx;
-        unsigned int            med_local:1;
+        unsigned int            med_initialized:1,
+                                med_remote:1;
         __u32                   med_nllu;
         __u32                   med_nllg;
-        /* simple idmapping */
-        spinlock_t              med_idmap_lock;
         struct mds_idmap_table *med_idmap;
 };
 
index 0ea4291..8a0926c 100644 (file)
@@ -224,6 +224,28 @@ static inline void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags)
 #define MSG_CONNECT_INITIAL     0x20
 #define MSG_CONNECT_ASYNC       0x40
 
+/* Connect flags */
+
+#define OBD_CONNECT_RDONLY      (0x00000001LL)
+#define OBD_CONNECT_REMOTE      (0x40000000LL)
+#define OBD_CONNECT_LOCAL       (0x80000000LL)
+
+#define OBD_CONNECT_SUPPORTED (OBD_CONNECT_RDONLY |     \
+                               OBD_CONNECT_REMOTE |     \
+                               OBD_CONNECT_LOCAL)
+
+/* This structure is used for both request and reply.
+ *
+ * If we eventually have separate connect data for different types, which we
+ * almost certainly will, then perhaps we stick a union in here. */
+struct obd_connect_data {
+        __u64 ocd_connect_flags; /* OBD_CONNECT_* bits */
+        __u32 ocd_nllu[2];       /* presumably { nllu, nllg } - TODO confirm */
+        __u64 padding[6];        /* NOTE(review): looks reserved - confirm */
+};
+
+extern void lustre_swab_connect(struct obd_connect_data *ocd);
+
 /*
  *   OST requests: OBDO & OBD request records
  */
index ad4faaa..0f7b8b3 100644 (file)
@@ -104,6 +104,7 @@ struct obd_import {
                                   imp_deactive:1;
         __u32                     imp_connect_op;
         __u32                     imp_connect_flags;
+        struct obd_connect_data   imp_connect_data;
 };
 
 typedef void (*obd_import_callback)(struct obd_import *imp, void *closure,
index 2d53a04..e089d26 100644 (file)
@@ -138,17 +138,31 @@ struct mds_client_data {
         __u8 mcd_padding[MDS_LR_CLIENT_SIZE - 88];
 };
 
-/* simple uid/gid mapping hash table */
-struct mds_idmap_item {
-        struct list_head        hash;
-        __u32                   id1;
-        __u32                   id2;
+/* uid/gid mapping support for remote client, some of them
+ * probably consume too much space??
+ */
+#define MDS_IDMAP_HASHSIZE      (32)
+#define MDS_IDMAP_HASHFUNC(id)  ((id) & (MDS_IDMAP_HASHSIZE - 1))
+
+#define MDS_RMT_UIDMAP_IDX      (0)
+#define MDS_LCL_UIDMAP_IDX      (1)
+#define MDS_RMT_GIDMAP_IDX      (2)
+#define MDS_LCL_GIDMAP_IDX      (3)
+#define MDS_IDMAP_N_HASHES      (4)
+
+#define MDS_IDMAP_NOTFOUND      (-1)
+
+struct mds_idmap_entry {
+        struct list_head rmt_hash; /* hash linkage, keyed by rmt_id */
+        struct list_head lcl_hash; /* hash linkage, keyed by lcl_id */
+        atomic_t         refcount; /* NOTE(review): holders not shown here */
+        uid_t            rmt_id;   /* remote uid/gid (uid_t used for both) */
+        uid_t            lcl_id;   /* local uid/gid (uid_t used for both) */
+};
 
-#define MDS_IDMAP_HASHSIZE      (32)
 struct mds_idmap_table {
-        struct list_head uidmap[MDS_IDMAP_HASHSIZE];
-        struct list_head gidmap[MDS_IDMAP_HASHSIZE];
+        spinlock_t       mit_lock;
+        struct list_head mit_idmaps[MDS_IDMAP_N_HASHES][MDS_IDMAP_HASHSIZE];
 };
 
 /* file data for open files on MDS */
@@ -187,14 +201,29 @@ struct mds_grp_hash {
         unsigned int            gh_allow_setgroups:1;
 };
 
+#ifdef PTL_NETID_ANY
+#error "remove this"
+#endif
+#define PTL_NETID_ANY   ((ptl_netid_t) -1)
+
+#define LSD_PERM_SETUID         0x00000001
+#define LSD_PERM_SETGID         0x00000002
+#define LSD_PERM_SETGRP         0x00000004
+
+struct lsd_permission {
+        ptl_nid_t       nid;    /* presumably the peer nid matched - confirm */
+        ptl_netid_t     netid;  /* presumably may be PTL_NETID_ANY - confirm */
+        __u32           perm;   /* presumably LSD_PERM_* bits - confirm */
+};
+
 /* lustre security descriptor */
 struct lustre_sec_desc {
+        unsigned int            lsd_invalid:1;
         uid_t                   lsd_uid;
         gid_t                   lsd_gid;
         struct group_info      *lsd_ginfo;
-        unsigned int            lsd_allow_setuid:1,
-                                lsd_allow_setgid:1,
-                                lsd_allow_setgrp:1;
+        __u32                   lsd_nperms;
+        struct lsd_permission  *lsd_perms;
 };
 
 struct lsd_cache_entry {
@@ -203,20 +232,23 @@ struct lsd_cache_entry {
 };
 
 struct lsd_downcall_args {
-        int     err;
-        uid_t   uid;
-        gid_t   gid;
-        __u32   ngroups;
-        gid_t  *groups;
-        __u32   allow_setuid;
-        __u32   allow_setgid;
-        __u32   allow_setgrp;
+        int                     err;
+        uid_t                   uid;
+        gid_t                   gid;
+        __u32                   ngroups;
+        gid_t                  *groups;
+        __u32                   nperms;
+        struct lsd_permission  *perms;       
 };
 
 /* mds/mds_reint.c  */
 int mds_reint_rec(struct mds_update_record *r, int offset,
                   struct ptlrpc_request *req, struct lustre_handle *);
 
+/* mds/mds_lsd.c */
+__u32 mds_lsd_get_perms(struct lustre_sec_desc *lsd, __u32 is_remote,
+                        ptl_netid_t netid, ptl_nid_t nid);
+
 /* mds/handler.c */
 #ifdef __KERNEL__
 struct dentry *
@@ -264,8 +296,6 @@ int mdc_enqueue(struct obd_export *exp,
                 void *cb_data);
 
 /* mdc/mdc_request.c */
-int mdc_get_secdesc_size(void);
-void mdc_pack_secdesc(struct ptlrpc_request *req, int size);
 int mdc_req2lustre_md(struct obd_export *exp_lmv, struct ptlrpc_request *req, 
                       unsigned int offset, struct obd_export *exp_lov, 
                       struct lustre_md *md);
index e3fd354..85ddfca 100644 (file)
@@ -317,8 +317,9 @@ struct ptlrpc_request {
         struct ptlrpc_svcsec *rq_svcsec;      /* server side security */
         /* XXX temporarily put here XXX */
         void                 *rq_sec_svcdata; /* server security data */
-        unsigned int          rq_remote;      /* from remote client */
+        unsigned int          rq_remote_realm;/* from remote realm */
         uid_t                 rq_auth_uid;
+        uid_t                 rq_mapped_uid;
 
         char *rq_reqbuf;       /* backend request buffer */
         int   rq_reqbuf_len;   /* backend request buffer length */
@@ -716,6 +717,8 @@ int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
 /* ptlrpc/pack_generic.c */
 int lustre_msg_swabbed(struct lustre_msg *msg);
 int lustre_msg_check_version(struct lustre_msg *msg, __u32 version);
+int lustre_secdesc_size(void);
+void lustre_pack_secdesc(struct ptlrpc_request *req, int size);
 int lustre_pack_request(struct ptlrpc_request *, int count, int *lens,
                         char **bufs);
 int lustre_pack_reply(struct ptlrpc_request *, int count, int *lens,
@@ -853,7 +856,9 @@ mdc_prepare_mdc_data(struct mdc_op_data *data, struct inode *i1,
 int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf);
 int client_obd_cleanup(struct obd_device * obddev, int flags);
 int client_connect_import(struct lustre_handle *conn, struct obd_device *obd,
-                          struct obd_uuid *cluuid, unsigned long);
+                          struct obd_uuid *cluuid,
+                          struct obd_connect_data *conn_data,
+                          unsigned long);
 int client_disconnect_export(struct obd_export *exp, unsigned long);
 
 int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
index 68e37db..ac89de5 100644 (file)
@@ -49,8 +49,9 @@ struct upcall_cache {
 
         char                   *uc_name;
         char                    uc_upcall[UC_CACHE_UPCALL_MAXPATH];
-        unsigned long           uc_acquire_expire;
-        unsigned long           uc_entry_expire;
+        unsigned long           uc_acquire_expire;   /* max acquire time */
+        unsigned long           uc_entry_expire;     /* max entry life time */
+        unsigned long           uc_err_entry_expire; /* err entry life time */
 
         /* functions */
         unsigned int                (*hash)(struct upcall_cache *, __u64);
@@ -70,8 +71,7 @@ void upcall_cache_init_entry(struct upcall_cache *cache,
 struct upcall_cache_entry *
 upcall_cache_get_entry(struct upcall_cache *cache, __u64 key);
 void upcall_cache_put_entry(struct upcall_cache_entry *entry);
-int upcall_cache_downcall(struct upcall_cache *cache, __u64 key,
-                          int err, void *args);
+int upcall_cache_downcall(struct upcall_cache *cache, __u64 key, void *args);
 void upcall_cache_flush_one(struct upcall_cache *cache, __u64 key);
 void upcall_cache_flush_idle(struct upcall_cache *cache);
 void upcall_cache_flush_all(struct upcall_cache *cache);
index 008fcb7..bdf94ff 100644 (file)
@@ -47,11 +47,12 @@ struct mds_grp_hash_entry;
 struct lvfs_ucred {
         struct lustre_sec_desc *luc_lsd;
         struct group_info      *luc_ginfo;
-        __u32 luc_fsuid;
-        __u32 luc_fsgid;
-        __u32 luc_cap;
-        __u32 luc_uid;
-        __u32 luc_umask;
+        __u32                   luc_uid;
+        __u32                   luc_gid;
+        __u32                   luc_fsuid;
+        __u32                   luc_fsgid;
+        __u32                   luc_cap;
+        __u32                   luc_umask;
 };
 
 struct lvfs_callback_ops {
index 9727313..684f0f7 100644 (file)
@@ -278,8 +278,6 @@ struct client_obd {
         /* security flavors */
         __u32                    cl_sec_flavor;
         __u32                    cl_sec_subflavor;
-        __u32                    cl_nllu; /* non lustre local user */
-        __u32                    cl_nllg; /* non lustre local group */
 
         //struct llog_canceld_ctxt *cl_llcd; /* it's included by obd_llog_ctxt */
         void                    *cl_llcd_offset;
@@ -523,6 +521,7 @@ struct lmv_obd {
         int                     server_timeout;
         int                     connect_flags;
         struct semaphore        init_sem;
+        struct obd_connect_data conn_data;
 };
 
 struct niobuf_local {
@@ -716,7 +715,8 @@ struct obd_ops {
                           int priority);
         int (*o_del_conn)(struct obd_import *imp, struct obd_uuid *uuid);
         int (*o_connect)(struct lustre_handle *conn, struct obd_device *src,
-                         struct obd_uuid *cluuid, unsigned long flags);
+                         struct obd_uuid *cluuid, struct obd_connect_data *data,
+                         unsigned long flags);
         int (*o_connect_post)(struct obd_export *exp, unsigned long flags);
         int (*o_disconnect)(struct obd_export *exp, unsigned long flags);
 
index 66cecd7..2c1e13c 100644 (file)
@@ -594,6 +594,7 @@ static inline int obd_del_conn(struct obd_import *imp,
 static inline int obd_connect(struct lustre_handle *conn,
                               struct obd_device *obd,
                               struct obd_uuid *cluuid,
+                              struct obd_connect_data *data,
                               unsigned long flags)
 {
         int rc;
@@ -603,7 +604,7 @@ static inline int obd_connect(struct lustre_handle *conn,
         OBD_CHECK_OP(obd, connect, -EOPNOTSUPP);
         OBD_COUNTER_INCREMENT(obd, connect);
 
-        rc = OBP(obd, connect)(conn, obd, cluuid, flags);
+        rc = OBP(obd, connect)(conn, obd, cluuid, data, flags);
         RETURN(rc);
 }
 
index d87e551..ed82823 100644 (file)
@@ -374,6 +374,7 @@ int client_obd_cleanup(struct obd_device *obddev, int flags)
 int client_connect_import(struct lustre_handle *dlm_handle,
                           struct obd_device *obd,
                           struct obd_uuid *cluuid,
+                          struct obd_connect_data *conn_data,
                           unsigned long connect_flags)
 {
         struct client_obd *cli = &obd->u.cli;
@@ -409,6 +410,9 @@ int client_connect_import(struct lustre_handle *dlm_handle,
                 GOTO(out_ldlm, rc);
 
         imp->imp_connect_flags = connect_flags;
+        if (conn_data)
+                memcpy(&imp->imp_connect_data, conn_data, sizeof(*conn_data));
+
         rc = ptlrpc_connect_import(imp, NULL);
         if (rc != 0) {
                 LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
@@ -541,17 +545,21 @@ int target_handle_connect(struct ptlrpc_request *req)
         struct obd_uuid cluuid;
         struct obd_uuid remote_uuid;
         struct list_head *p;
+        struct obd_connect_data *conn_data;
+        int conn_data_size = sizeof(*conn_data);
         char *str, *tmp;
         int rc = 0;
         unsigned long flags;
         int initial_conn = 0;
         char peer_str[PTL_NALFMT_SIZE];
+        const int offset = 1;
         ENTRY;
 
         OBD_RACE(OBD_FAIL_TGT_CONN_RACE); 
 
-        LASSERT_REQSWAB (req, 0);
-        str = lustre_msg_string(req->rq_reqmsg, 0, sizeof(tgtuuid) - 1);
+        LASSERT_REQSWAB (req, offset + 0);
+        str = lustre_msg_string(req->rq_reqmsg, offset + 0,
+                                sizeof(tgtuuid) - 1);
         if (str == NULL) {
                 CERROR("bad target UUID for connect\n");
                 GOTO(out, rc = -EINVAL);
@@ -568,8 +576,8 @@ int target_handle_connect(struct ptlrpc_request *req)
                 GOTO(out, rc = -ENODEV);
         }
 
-        LASSERT_REQSWAB (req, 1);
-        str = lustre_msg_string(req->rq_reqmsg, 1, sizeof(cluuid) - 1);
+        LASSERT_REQSWAB (req, offset + 1);
+        str = lustre_msg_string(req->rq_reqmsg, offset + 1, sizeof(cluuid) - 1);
         if (str == NULL) {
                 CERROR("bad client UUID for connect\n");
                 GOTO(out, rc = -EINVAL);
@@ -592,17 +600,22 @@ int target_handle_connect(struct ptlrpc_request *req)
                 LBUG();
         }
 
-        tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn);
+        tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, sizeof conn);
         if (tmp == NULL)
                 GOTO(out, rc = -EPROTO);
 
         memcpy(&conn, tmp, sizeof conn);
 
-        cfp = lustre_msg_buf(req->rq_reqmsg, 3, sizeof(unsigned long));
+        cfp = lustre_msg_buf(req->rq_reqmsg, offset + 3, sizeof(unsigned long));
         LASSERT(cfp != NULL);
         connect_flags = *cfp;
 
-        rc = lustre_pack_reply(req, 0, NULL, NULL);
+        conn_data = lustre_swab_reqbuf(req, offset + 4, sizeof(*conn_data),
+                                       lustre_swab_connect);
+        if (!conn_data)
+                GOTO(out, rc = -EPROTO);
+
+        rc = lustre_pack_reply(req, 1, &conn_data_size, NULL);
         if (rc)
                 GOTO(out, rc);
         
@@ -677,9 +690,17 @@ int target_handle_connect(struct ptlrpc_request *req)
                         rc = -EBUSY;
                 } else {
  dont_check_exports:
-                        rc = obd_connect(&conn, target, &cluuid, connect_flags);
+                        rc = obd_connect(&conn, target, &cluuid, conn_data,
+                                         connect_flags);
                 }
         }
+
+        /* Return only the parts of obd_connect_data that we understand, so the
+         * client knows that we don't understand the rest. */
+        conn_data->ocd_connect_flags &= OBD_CONNECT_SUPPORTED;
+        memcpy(lustre_msg_buf(req->rq_repmsg, 0, sizeof(*conn_data)), conn_data,
+               sizeof(*conn_data));
+
         /* Tell the client if we support replayable requests */
         if (target->obd_replayable)
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
@@ -747,7 +768,7 @@ int target_handle_connect(struct ptlrpc_request *req)
         if (target->obd_recovering)
                 target->obd_connected_clients++;
 
-        memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof(conn)),
+        memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, offset + 2, sizeof(conn)),
                sizeof(conn));
 
         if (export->exp_imp_reverse != NULL) {
index 395eb3f..44eb936 100644 (file)
@@ -153,7 +153,7 @@ int liblustre_process_log(struct config_llog_instance *cfg, int allow_recov)
                            strlen("initial_recov"), "initial_recov",
                            sizeof(allow_recov), &allow_recov);
 
-        err = obd_connect(&mdc_conn, obd, &mdc_uuid, 0);
+        err = obd_connect(&mdc_conn, obd, &mdc_uuid, NULL, 0);
         if (err) {
                 CERROR("cannot connect to %s: rc = %d\n",
                         g_zconf_mdsname, err);
index 26a9720..2bd60ac 100644 (file)
@@ -1488,7 +1488,7 @@ llu_fsswop_mount(const char *source,
                 GOTO(out_free, err = -EINVAL);
 #endif
         /* setup mdc */
-        err = obd_connect(&lmv_conn, obd, &sbi->ll_sb_uuid, 0);
+        err = obd_connect(&lmv_conn, obd, &sbi->ll_sb_uuid, NULL, 0);
         if (err) {
                 CERROR("cannot connect to %s: rc = %d\n", lmv, err);
                 GOTO(out_free, err);
@@ -1512,7 +1512,7 @@ llu_fsswop_mount(const char *source,
         obd_set_info(obd->obd_self_export, strlen("async"), "async",
                      sizeof(async), &async);
 
-        err = obd_connect(&lov_conn, obd, &sbi->ll_sb_uuid, 0);
+        err = obd_connect(&lov_conn, obd, &sbi->ll_sb_uuid, NULL, 0);
         if (err) {
                 CERROR("cannot connect to %s: rc = %d\n", lov, err);
                 GOTO(out_lmv, err);
index 90a41d3..096d91e 100644 (file)
@@ -482,18 +482,19 @@ revalidate_finish:
 
         GOTO(out, rc);
 out:
-        if (req != NULL && rc == 1) {
+        /* If we had a successful intent lookup on the MDS, but it happened to
+           be negative, we do not free the request as it will be reused during
+           lookup (see comment in mdc/mdc_locks.c::mdc_intent_lock()). But if
+           the request was not completed, we need to free it. (bug 5154) */
+        if (req != NULL && (rc == 1 || !it_disposition(it, DISP_ENQ_COMPLETE))) {
                 ptlrpc_req_finished(req);
                 req = NULL;
         }
 
         if (rc == 0) {
-                if (it == &lookup_it) {
+                if (it == &lookup_it)
                         ll_intent_release(it);
-                        if (req) /* special case: We did lookup and it failed,
-                                    need to free request */
-                                ptlrpc_req_finished(req);
-                }
+
                 ll_unhash_aliases(de->d_inode);
                 return 0;
         }
index 3c4cadb..4df7fd7 100644 (file)
@@ -1955,94 +1955,65 @@ int ll_listxattr(struct dentry *dentry, char *list, size_t size)
                                     OBD_MD_FLEALIST);
 }
 
-int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
+/*
+ * XXX We could choose not to check DLM lock. Leave the decision
+ * to remote acl handling.
+ */
+static int
+lustre_check_acl(struct inode *inode, int mask)
 {
         struct lookup_intent it = { .it_op = IT_GETATTR };
-        int mode = inode->i_mode;
-        struct dentry de;
+        struct dentry de = { .d_inode = inode };
         struct ll_sb_info *sbi;
         struct lustre_id id;
         struct ptlrpc_request *req = NULL;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct posix_acl *acl;
         int rc;
         ENTRY;
 
         sbi = ll_i2sbi(inode);
         ll_inode2id(&id, inode);
 
-        /* Nobody gets write access to a read-only fs */
-        if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
-            (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
-                return -EROFS;
-        /* Nobody gets write access to an immutable file */
-        if ((mask & MAY_WRITE) && IS_IMMUTABLE(inode))
+        if (ll_intent_alloc(&it))
                 return -EACCES;
-        if (current->fsuid == inode->i_uid) {
-                mode >>= 6;
-        } else if (1) {
-                struct ll_inode_info *lli = ll_i2info(inode);
-                struct posix_acl *acl;
-
-                /* The access ACL cannot grant access if the group class
-                   permission bits don't contain all requested permissions. */
-                if (((mode >> 3) & mask & S_IRWXO) != mask)
-                        goto check_groups;
 
-                if (ll_intent_alloc(&it))
-                        return -EACCES;
-
-                de.d_inode = inode;
-                rc = md_intent_lock(sbi->ll_md_exp, &id, NULL, 0, NULL, 0, &id,
-                                    &it, 0, &req, ll_mdc_blocking_ast);
-                if (rc < 0) {
-                        ll_intent_free(&it);
-                        GOTO(out, rc);
-                }
+        rc = md_intent_lock(sbi->ll_md_exp, &id, NULL, 0, NULL, 0, &id,
+                            &it, 0, &req, ll_mdc_blocking_ast);
+        if (rc < 0) {
+                ll_intent_free(&it);
+                GOTO(out, rc);
+        }
 
-                rc = revalidate_it_finish(req, 1, &it, &de);
-                if (rc) {
-                        ll_intent_release(&it);
-                        GOTO(out, rc);
-                }
+        rc = revalidate_it_finish(req, 1, &it, &de);
+        if (rc) {
+                ll_intent_release(&it);
+                GOTO(out, rc);
+        }
 
-                ll_lookup_finish_locks(&it, &de);
-                ll_intent_free(&it);
+        ll_lookup_finish_locks(&it, &de);
+        ll_intent_free(&it);
 
-                spin_lock(&lli->lli_lock);
-                acl = posix_acl_dup(ll_i2info(inode)->lli_acl_access);
-                spin_unlock(&lli->lli_lock);
+        spin_lock(&lli->lli_lock);
+        acl = posix_acl_dup(ll_i2info(inode)->lli_acl_access);
+        spin_unlock(&lli->lli_lock);
 
-                if (!acl)
-                        goto check_groups;
+        if (!acl)
+                GOTO(out, rc = -EAGAIN);
 
-                rc = posix_acl_permission(inode, acl, mask);
-                posix_acl_release(acl);
-                if (rc == -EACCES)
-                        goto check_capabilities;
-                GOTO(out, rc);
-        } else {
-check_groups:
-                if (in_group_p(inode->i_gid))
-                        mode >>= 3;
-        }
-        if ((mode & mask & S_IRWXO) == mask)
-                GOTO(out, rc = 0);
+        rc = posix_acl_permission(inode, acl, mask);
+        posix_acl_release(acl);
 
-check_capabilities:
-        rc = -EACCES; 
-        /* Allowed to override Discretionary Access Control? */
-        if (!(mask & MAY_EXEC) ||
-            (inode->i_mode & S_IXUGO) || S_ISDIR(inode->i_mode))
-                if (capable(CAP_DAC_OVERRIDE))
-                        GOTO(out, rc = 0);
-       /* Read and search granted if capable(CAP_DAC_READ_SEARCH) */
-        if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
-            (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
-                GOTO(out, rc = 0);
 out:
         if (req)
                 ptlrpc_req_finished(req);
 
-        return rc;
+        RETURN(rc);
+}
+
+int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
+{
+        return generic_permission(inode, mask, lustre_check_acl);
 }
 
 struct file_operations ll_file_operations = {
index 367348d..971fb04 100644 (file)
@@ -72,7 +72,9 @@ struct ll_sb_info {
         struct list_head          ll_pglist;
 
         struct ll_ra_info         ll_ra_info;
-                                                                                                                                                                                                     
+
+        unsigned int              ll_remote;    /* remote client? */
+
         /* times spent waiting for locks in each call site.  These are
          * all protected by the ll_lock */
         struct obd_service_time   ll_read_stime;
index 03c8d4d..c4b54a2 100644 (file)
@@ -126,18 +126,21 @@ int lustre_init_dt_desc(struct ll_sb_info *sbi)
 extern struct dentry_operations ll_d_ops;
 
 int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov,
-                             char *security, __u32 *nllu, int async)
+                             int async, char *security, __u32 *nllu,
+                             __u64 *remote)
 {
         struct ll_sb_info *sbi = ll_s2sbi(sb);
         struct ptlrpc_request *request = NULL;
         struct lustre_handle dt_conn = {0, };
         struct lustre_handle md_conn = {0, };
+        struct obd_connect_data *data;
         struct inode *root = NULL;
         struct obd_device *obd;
         struct obd_statfs osfs;
         struct lustre_md md;
         kdev_t devno;
         int err;
+        __u32 valsize;
         ENTRY;
 
         obd = class_name2obd(lmv);
@@ -148,6 +151,20 @@ int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov,
         obd_set_info(obd->obd_self_export, strlen("async"), "async",
                      sizeof(async), &async);
 
+        if ((*remote & (OBD_CONNECT_LOCAL | OBD_CONNECT_REMOTE)) ==
+            (OBD_CONNECT_LOCAL | OBD_CONNECT_REMOTE)) {
+                CERROR("wrong remote flag "LPX64"\n", *remote);
+                RETURN(-EINVAL);
+        }
+
+        OBD_ALLOC(data, sizeof(*data));
+        if (!data)
+                RETURN(-ENOMEM);
+
+        data->ocd_connect_flags |= *remote & (OBD_CONNECT_LOCAL |
+                                              OBD_CONNECT_REMOTE);
+        memcpy(data->ocd_nllu, nllu, sizeof(data->ocd_nllu));
+
         if (security == NULL)
                 security = "null";
 
@@ -156,14 +173,7 @@ int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov,
         if (err) {
                 CERROR("LMV %s: failed to set security %s, err %d\n",
                         lmv, security, err);
-                RETURN(err);
-        }
-
-        err = obd_set_info(obd->obd_self_export, strlen("nllu"), "nllu",
-                           sizeof(__u32) * 2, nllu);
-        if (err) {
-                CERROR("LMV %s: failed to set NLLU, err %d\n",
-                        lmv, err);
+                OBD_FREE(data, sizeof(*data));
                 RETURN(err);
         }
 
@@ -174,7 +184,8 @@ int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov,
                         CERROR("could not register mount in /proc/lustre");
         }
 
-        err = obd_connect(&md_conn, obd, &sbi->ll_sb_uuid, OBD_OPT_REAL_CLIENT);
+        err = obd_connect(&md_conn, obd, &sbi->ll_sb_uuid, data,
+                          OBD_OPT_REAL_CLIENT);
         if (err == -EBUSY) {
                 CERROR("An MDS (lmv %s) is performing recovery, of which this"
                        " client is not a part.  Please wait for recovery to "
@@ -204,6 +215,17 @@ int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov,
 
         sb->s_dev = devno;
 
+        /* After statfs the MDS connections should be up, so the negotiated
+         * remote flag can be read.  Pass the size of the flag itself (not of
+         * a pointer to it); on failure drop the MD connection via out_lmv. */
+        valsize = sizeof(sbi->ll_remote);
+        err = obd_get_info(sbi->ll_md_exp, strlen("remote_flag"), "remote_flag",
+                           &valsize, &sbi->ll_remote);
+        if (err) {
+                CERROR("fail to obtain remote flag\n");
+                GOTO(out_lmv, err);
+        }
+
         obd = class_name2obd(lov);
         if (!obd) {
                 CERROR("OSC %s: not setup or attached\n", lov);
@@ -212,7 +234,8 @@ int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov,
         obd_set_info(obd->obd_self_export, strlen("async"), "async",
                      sizeof(async), &async);
 
-        err = obd_connect(&dt_conn, obd, &sbi->ll_sb_uuid, OBD_OPT_REAL_CLIENT);
+        err = obd_connect(&dt_conn, obd, &sbi->ll_sb_uuid, data,
+                          OBD_OPT_REAL_CLIENT);
         if (err == -EBUSY) {
                 CERROR("An OST (lov %s) is performing recovery, of which this"
                        " client is not a part.  Please wait for recovery to "
@@ -289,11 +312,14 @@ int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov,
         sb->s_root = d_alloc_root(root);
         sb->s_root->d_op = &ll_d_ops;
 
+        sb->s_flags |= MS_POSIXACL;
 #ifdef S_PDIROPS
         CWARN("Enabling PDIROPS\n");
         sb->s_flags |= S_PDIROPS;
 #endif
 
+        if (data != NULL)
+                OBD_FREE(data, sizeof(*data));
         RETURN(err);
 out_root:
         if (root)
@@ -303,6 +329,8 @@ out_lov:
 out_lmv:
         obd_disconnect(sbi->ll_md_exp, 0);
 out:
+        if (data != NULL)
+                OBD_FREE(data, sizeof(*data));
         lprocfs_unregister_mountpoint(sbi);
         return err;
 }
@@ -440,7 +468,8 @@ int ll_fill_super(struct super_block *sb, void *data, int silent)
         char *lmv = NULL;
         int async, err;
         char *sec = NULL;
-        __u32 nllu[2] = { 99, 99 };
+        __u32 nllu[2] = { NOBODY_UID, NOBODY_GID };
+        __u64 remote_flag = 0;    
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb);
@@ -461,8 +490,9 @@ int ll_fill_super(struct super_block *sb, void *data, int silent)
                 CERROR("no mdc\n");
                 GOTO(out, err = -EINVAL);
         }
-
-        err = lustre_common_fill_super(sb, lmv, lov, sec, nllu, async);
+        
+        err = lustre_common_fill_super(sb, lmv, lov, async, sec, nllu,
+                                       &remote_flag);
         EXIT;
 out:
         if (err)
@@ -566,7 +596,7 @@ static int lustre_process_log(struct lustre_mount_data *lmd, char *profile,
         if (rc)
                 GOTO(out_cleanup, rc);
 
-        rc = obd_connect(&md_conn, obd, &lmv_uuid, 0);
+        rc = obd_connect(&md_conn, obd, &lmv_uuid, NULL, 0);
         if (rc) {
                 CERROR("cannot connect to %s: rc = %d\n", lmd->lmd_mds, rc);
                 GOTO(out_cleanup, rc);
@@ -733,8 +763,9 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent)
                 GOTO(out_free, err = -EINVAL);
         }
 
-        err = lustre_common_fill_super(sb, lmv, lov, lmd->lmd_security,
-                                       &lmd->lmd_nllu, lmd->lmd_async);
+        err = lustre_common_fill_super(sb, lmv, lov, lmd->lmd_async,
+                                       lmd->lmd_security, &lmd->lmd_nllu,
+                                       &lmd->lmd_remote_flag);
 
         if (err)
                 GOTO(out_free, err);
index 3e4bcde..0a39316 100644 (file)
@@ -193,7 +193,8 @@ int lmv_detach(struct obd_device *dev)
 /* this is fake connect function. Its purpose is to initialize lmv and say
  * caller that everything is okay. Real connection will be performed later. */
 static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd,
-                       struct obd_uuid *cluuid, unsigned long flags)
+                       struct obd_uuid *cluuid, struct obd_connect_data *data,
+                       unsigned long flags)
 {
 #ifdef __KERNEL__
         struct proc_dir_entry *lmv_proc_dir;
@@ -224,6 +225,8 @@ static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd,
         lmv->cluuid = *cluuid;
         lmv->connect_flags = flags;
         sema_init(&lmv->init_sem, 1);
+        if (data)
+                memcpy(&lmv->conn_data, data, sizeof(*data));
 
 #ifdef __KERNEL__
         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
@@ -338,7 +341,7 @@ int lmv_check_connect(struct obd_device *obd)
                         GOTO(out_disc, rc = -EINVAL);
                 }
                 
-                rc = obd_connect(&conn, tgt_obd, &lmv_mdc_uuid,
+                rc = obd_connect(&conn, tgt_obd, &lmv_mdc_uuid, &lmv->conn_data,
                                  lmv->connect_flags);
                 if (rc) {
                         CERROR("target %s connect error %d\n",
@@ -1809,6 +1812,26 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
                 struct lmv_desc *desc_ret = val;
                 *desc_ret = lmv->desc;
                 RETURN(0);
+        } else if (keylen == strlen("remote_flag") &&
+                   !strcmp(key, "remote_flag")) {
+                struct lmv_tgt_desc *tgts;
+                int i;
+
+                LASSERT(*vallen == sizeof(__u32));
+                for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
+                     i++, tgts++) {
+
+                        /* all tgts should be connected when this get called. */
+                        if (!tgts || !tgts->ltd_exp) {
+                                CERROR("target not setup?\n");
+                                continue;
+                        }
+
+                        if (!obd_get_info(tgts->ltd_exp, keylen, key,
+                                          vallen, val))
+                                RETURN(0);
+                }
+                RETURN(-EINVAL);
         }
 
         CDEBUG(D_IOCTL, "invalid key\n");
index 218a518..b2ae65e 100644 (file)
@@ -54,7 +54,8 @@
 /* obd methods */
 #define MAX_STRING_SIZE 128
 static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
-                           int activate, unsigned long connect_flags)
+                           int activate, struct obd_connect_data *conn_data,
+                           unsigned long connect_flags)
 {
         struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
         struct obd_uuid *tgt_uuid = &tgt->uuid;
@@ -98,7 +99,8 @@ static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
                 RETURN(0);
         }
 
-        rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, connect_flags);
+        rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, conn_data,
+                         connect_flags);
         if (rc) {
                 CERROR("Target %s connect error %d\n", tgt_uuid->uuid, rc);
                 RETURN(rc);
@@ -148,7 +150,8 @@ static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
 }
 
 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
-                       struct obd_uuid *cluuid, unsigned long flags)
+                       struct obd_uuid *cluuid, struct obd_connect_data *data,
+                       unsigned long flags)
 {
 #ifdef __KERNEL__
         struct proc_dir_entry *lov_proc_dir;
@@ -188,7 +191,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
                 if (obd_uuid_empty(&tgt->uuid))
                         continue;
-                rc = lov_connect_obd(obd, tgt, 0, flags);
+                rc = lov_connect_obd(obd, tgt, 0, data, flags);
                 if (rc)
                         GOTO(out_disc, rc);
         }
@@ -536,7 +539,7 @@ lov_add_obd(struct obd_device *obd, struct obd_uuid *uuidp, int index, int gen)
                         osc_obd->obd_no_recov = 0;
         }
 
-        rc = lov_connect_obd(obd, tgt, 1, lov->lov_connect_flags);
+        rc = lov_connect_obd(obd, tgt, 1, NULL, lov->lov_connect_flags);
         if (rc)
                 GOTO(out, rc);
 
index 5c11665..b981fe9 100644 (file)
@@ -148,10 +148,14 @@ void push_ctxt(struct lvfs_run_ctxt *save, struct lvfs_run_ctxt *new_ctx,
         LASSERT(new_ctx->pwdmnt);
 
         if (uc) {
+                save->luc.luc_uid = current->uid;
+                save->luc.luc_gid = current->gid;
                 save->luc.luc_fsuid = current->fsuid;
                 save->luc.luc_fsgid = current->fsgid;
                 save->luc.luc_cap = current->cap_effective;
 
+                current->uid = uc->luc_uid;
+                current->gid = uc->luc_gid;
                 current->fsuid = uc->luc_fsuid;
                 current->fsgid = uc->luc_fsgid;
                 current->cap_effective = uc->luc_cap;
@@ -207,6 +211,8 @@ void pop_ctxt(struct lvfs_run_ctxt *saved, struct lvfs_run_ctxt *new_ctx,
         mntput(saved->pwdmnt);
         current->fs->umask = saved->luc.luc_umask;
         if (uc) {
+                current->uid = saved->luc.luc_uid;
+                current->gid = saved->luc.luc_gid;
                 current->fsuid = saved->luc.luc_fsuid;
                 current->fsgid = saved->luc.luc_fsgid;
                 current->cap_effective = saved->luc.luc_cap;
index 436a155..f84882c 100644 (file)
@@ -229,7 +229,7 @@ int mdc_enqueue(struct obd_export *exp,
 //        LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
 //                          ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
 
-        reqsize[0] = mdc_get_secdesc_size();
+        reqsize[0] = lustre_secdesc_size();
 
         if (it->it_op & IT_OPEN) {
                 it->it_create_mode |= S_IFREG;
@@ -326,7 +326,7 @@ int mdc_enqueue(struct obd_export *exp,
                 RETURN(-EINVAL);
         }
 
-        mdc_pack_secdesc(req, reqsize[0]);
+        lustre_pack_secdesc(req, reqsize[0]);
 
         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
         rc = ldlm_cli_enqueue(exp, req, obddev->obd_namespace, res_id,
index 3b5d382..6cd50d2 100644 (file)
@@ -81,7 +81,7 @@ int mdc_setattr(struct obd_export *exp, struct mdc_op_data *data,
 
         LASSERT(iattr != NULL);
 
-        size[0] = mdc_get_secdesc_size();
+        size[0] = lustre_secdesc_size();
         if (ealen > 0) {
                 bufcount++;
                 if (ea2len > 0)
@@ -93,7 +93,7 @@ int mdc_setattr(struct obd_export *exp, struct mdc_op_data *data,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_pack_secdesc(req, size[0]);
+        lustre_pack_secdesc(req, size[0]);
 
         if (iattr->ia_valid & ATTR_FROM_OPEN) {
                 req->rq_request_portal = MDS_SETATTR_PORTAL; //XXX FIXME bug 249
@@ -130,7 +130,7 @@ int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data,
         int level, bufcount = 3;
         ENTRY;
 
-        size[0] = mdc_get_secdesc_size();
+        size[0] = lustre_secdesc_size();
         if (data && datalen) {
                 size[bufcount] = datalen;
                 bufcount++;
@@ -141,7 +141,7 @@ int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_pack_secdesc(req, size[0]);
+        lustre_pack_secdesc(req, size[0]);
 
         /*
          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
@@ -180,14 +180,14 @@ int mdc_unlink(struct obd_export *exp, struct mdc_op_data *data,
         ENTRY;
         LASSERT(req == NULL);
 
-        size[0] = mdc_get_secdesc_size();
+        size[0] = lustre_secdesc_size();
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
                               MDS_REINT, 4, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_pack_secdesc(req, size[0]);
+        lustre_pack_secdesc(req, size[0]);
         *request = req;
 
         size[0] = sizeof(struct mds_body);
@@ -212,14 +212,14 @@ int mdc_link(struct obd_export *exp, struct mdc_op_data *data,
         int rc, size[3] = {0, sizeof(struct mds_rec_link), data->namelen + 1};
         ENTRY;
 
-        size[0] = mdc_get_secdesc_size();
+        size[0] = lustre_secdesc_size();
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
                               MDS_REINT, 3, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_pack_secdesc(req, size[0]);
+        lustre_pack_secdesc(req, size[0]);
 
         mdc_link_pack(req->rq_reqmsg, 1, data);
 
@@ -244,14 +244,14 @@ int mdc_rename(struct obd_export *exp, struct mdc_op_data *data,
                            newlen + 1, obd->u.cli.cl_max_mds_cookiesize};
         ENTRY;
 
-        size[0] = mdc_get_secdesc_size();
+        size[0] = lustre_secdesc_size();
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
                               MDS_REINT, 5, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_pack_secdesc(req, size[0]);
+        lustre_pack_secdesc(req, size[0]);
 
         mdc_rename_pack(req->rq_reqmsg, 1, data, old, oldlen, new, newlen);
 
index 8bd309a..3da3334 100644 (file)
 
 static int mdc_cleanup(struct obd_device *obd, int flags);
 
-int mdc_get_secdesc_size(void)
-{
-#ifdef __KERNEL__
-        int ngroups = current_ngroups;
-
-        if (ngroups > LUSTRE_MAX_GROUPS)
-                ngroups = LUSTRE_MAX_GROUPS;
-
-        return sizeof(struct mds_req_sec_desc) +
-                sizeof(__u32) * ngroups;
-#else
-        return 0;
-#endif
-}
-
-/*
- * because group info might have changed since last time we call
- * get_secdesc_size(), so here we did more sanity check to prevent garbage gids
- */
-void mdc_pack_secdesc(struct ptlrpc_request *req, int size)
-{
-#ifdef __KERNEL__
-        struct mds_req_sec_desc *rsd;
-        
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,4)
-        struct group_info *ginfo;
-#endif
-
-        rsd = lustre_msg_buf(req->rq_reqmsg,
-                             MDS_REQ_SECDESC_OFF, size);
-        
-        rsd->rsd_uid = current->uid;
-        rsd->rsd_gid = current->gid;
-        rsd->rsd_fsuid = current->fsuid;
-        rsd->rsd_fsgid = current->fsgid;
-        rsd->rsd_cap = current->cap_effective;
-        rsd->rsd_ngroups = (size - sizeof(*rsd)) / sizeof(__u32);
-        LASSERT(rsd->rsd_ngroups <= LUSTRE_MAX_GROUPS);
-
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,4)
-        task_lock(current);
-        get_group_info(current->group_info);
-        ginfo = current->group_info;
-        task_unlock(current);
-        if (rsd->rsd_ngroups > ginfo->ngroups)
-                rsd->rsd_ngroups = ginfo->ngroups;
-        memcpy(rsd->rsd_groups, ginfo->blocks[0],
-               rsd->rsd_ngroups * sizeof(__u32));
-#else
-        LASSERT(rsd->rsd_ngroups <= NGROUPS);
-        if (rsd->rsd_ngroups > current->ngroups)
-                rsd->rsd_ngroups = current->ngroups;
-        memcpy(rsd->rsd_groups, current->groups,
-               rsd->rsd_ngroups * sizeof(__u32));
-#endif
-#endif
-}
-
 extern int mds_queue_req(struct ptlrpc_request *);
 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
@@ -114,14 +56,14 @@ static int send_getstatus(struct obd_import *imp, struct lustre_id *rootid,
         int rc, size[2] = {0, sizeof(*body)};
         ENTRY;
 
-        //size[0] = mdc_get_secdesc_size();
+        //size[0] = lustre_secdesc_size();
 
         req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS,
                               2, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
-        //mdc_pack_secdesc(req, size[0]);
+        //lustre_pack_secdesc(req, size[0]);
 
         body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
         req->rq_send_state = level;
@@ -233,7 +175,7 @@ int mdc_getattr(struct obd_export *exp, struct lustre_id *id,
         /* XXX do we need to make another request here?  We just did a getattr
          *     to do the lookup in the first place.
          */
-        size[0] = mdc_get_secdesc_size();
+        size[0] = lustre_secdesc_size();
 
         LASSERT((ea_name != NULL) == (ea_namelen != 0));
         if (valid & (OBD_MD_FLEA | OBD_MD_FLEALIST)) {
@@ -246,7 +188,7 @@ int mdc_getattr(struct obd_export *exp, struct lustre_id *id,
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
-        mdc_pack_secdesc(req, size[0]);
+        lustre_pack_secdesc(req, size[0]);
 
         body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
         memcpy(&body->id1, id, sizeof(*id));
@@ -279,14 +221,14 @@ int mdc_getattr_lock(struct obd_export *exp, struct lustre_id *id,
         int rc, size[3] = {0, sizeof(*body), namelen};
         ENTRY;
 
-        size[0] = mdc_get_secdesc_size();
+        size[0] = lustre_secdesc_size();
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
                               MDS_GETATTR_LOCK, 3, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
-        mdc_pack_secdesc(req, size[0]);
+        lustre_pack_secdesc(req, size[0]);
 
         body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
         memcpy(&body->id1, id, sizeof(*id));
@@ -643,8 +585,8 @@ int mdc_close(struct obd_export *exp, struct obdo *oa,
         if (req == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        //reqsize[0] = mdc_get_secdesc_size();
-        //mdc_pack_secdesc(req, reqsize[0]);
+        //reqsize[0] = lustre_secdesc_size();
+        //lustre_pack_secdesc(req, reqsize[0]);
 
         /* Ensure that this close's handle is fixed up during replay. */
         LASSERT(och != NULL);
@@ -720,14 +662,14 @@ int mdc_done_writing(struct obd_export *exp, struct obdo *obdo)
         int rc, size[2] = {0, sizeof(*body)};
         ENTRY;
 
-        size[0] = mdc_get_secdesc_size();
+        size[0] = lustre_secdesc_size();
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
                               MDS_DONE_WRITING, 2, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_pack_secdesc(req, size[0]);
+        lustre_pack_secdesc(req, size[0]);
 
         body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, 
                               sizeof(*body));
@@ -761,7 +703,7 @@ int mdc_readpage(struct obd_export *exp,
 
         CDEBUG(D_INODE, "inode: %ld\n", (long)id->li_stc.u.e3s.l3s_ino);
 
-        size[0] = mdc_get_secdesc_size();
+        size[0] = lustre_secdesc_size();
 
         req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_READPAGE,
                               2, size, NULL);
@@ -770,7 +712,7 @@ int mdc_readpage(struct obd_export *exp,
         /* XXX FIXME bug 249 */
         req->rq_request_portal = MDS_READPAGE_PORTAL;
 
-        mdc_pack_secdesc(req, size[0]);
+        lustre_pack_secdesc(req, size[0]);
 
         desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL);
         if (desc == NULL)
@@ -919,13 +861,6 @@ int mdc_set_info(struct obd_export *exp, obd_count keylen,
                 }
                 CERROR("unrecognized security type %s\n", (char*) val);
                 rc = -EINVAL;
-        } else if (keylen == strlen("nllu") && memcmp(key, "nllu", keylen) == 0) {
-                struct client_obd *cli = &exp->exp_obd->u.cli;
-
-                LASSERT(vallen == sizeof(__u32) * 2);
-                cli->cl_nllu = ((__u32 *) val)[0];
-                cli->cl_nllg = ((__u32 *) val)[1];
-                RETURN(0);
         } else if (keylen == strlen("async") && memcmp(key, "async", keylen) == 0) {
                 struct client_obd *cl = &exp->exp_obd->u.cli;
                 if (vallen != sizeof(int))
@@ -935,7 +870,6 @@ int mdc_set_info(struct obd_export *exp, obd_count keylen,
                        exp->exp_obd->obd_name, cl->cl_async);
                 RETURN(0);
         }
-
         RETURN(rc);
 }
 
@@ -993,14 +927,14 @@ static int mdc_pin(struct obd_export *exp, obd_id ino, __u32 gen, int type,
         int rc, size[2] = {0, sizeof(*body)};
         ENTRY;
 
-        //size[0] = mdc_get_secdesc_size();
+        //size[0] = lustre_secdesc_size();
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
                               MDS_PIN, 2, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        //mdc_pack_secdesc(req, size[0]);
+        //lustre_pack_secdesc(req, size[0]);
 
         body = lustre_msg_buf(req->rq_reqmsg, 
                               MDS_REQ_REC_OFF, sizeof(*body));
@@ -1050,14 +984,14 @@ static int mdc_unpin(struct obd_export *exp,
         if (handle->och_magic != OBD_CLIENT_HANDLE_MAGIC)
                 RETURN(0);
 
-        //size[0] = mdc_get_secdesc_size();
+        //size[0] = lustre_secdesc_size();
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
                               MDS_CLOSE, 2, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        //mdc_pack_secdesc(req, size[0]);
+        //lustre_pack_secdesc(req, size[0]);
 
         body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof(*body));
         memcpy(&body->handle, &handle->och_fh, sizeof(body->handle));
@@ -1086,14 +1020,14 @@ int mdc_sync(struct obd_export *exp, struct lustre_id *id,
         int rc;
         ENTRY;
 
-        //size[0] = mdc_get_secdesc_size();
+        //size[0] = lustre_secdesc_size();
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
                               MDS_SYNC, 2, size, NULL);
         if (!req)
                 RETURN(rc = -ENOMEM);
 
-        //mdc_pack_secdesc(req, size[0]);
+        //lustre_pack_secdesc(req, size[0]);
 
         if (id) {
                 body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
@@ -1288,6 +1222,33 @@ static int mdc_get_info(struct obd_export *exp, obd_count keylen,
         if (!valsize || !val)
                 RETURN(-EFAULT);
 
+        if (keylen == strlen("remote_flag") && !strcmp(key, "remote_flag")) {
+                struct obd_import *imp;
+                struct obd_connect_data *data;
+
+                imp = class_exp2cliimp(exp);
+                if (!imp) {
+                        LBUG();
+                        RETURN(-EINVAL);
+                }
+
+                if (imp->imp_state != LUSTRE_IMP_FULL) {
+                        CERROR("import state not full\n");
+                        RETURN(-EINVAL);
+                }
+
+                data = &imp->imp_connect_data;
+                if (data->ocd_connect_flags & OBD_CONNECT_REMOTE) {
+                        *((int *)val) = 1;
+                        RETURN(0);
+                } else if (data->ocd_connect_flags & OBD_CONNECT_LOCAL) {
+                        *((int *)val) = 0;
+                        RETURN(0);
+                }
+                CERROR("no remote flag set?\n");
+                RETURN(-EINVAL);
+        }
+
         if ((keylen < strlen("mdsize") || strcmp(key, "mdsize") != 0) &&
             (keylen < strlen("mdsnum") || strcmp(key, "mdsnum") != 0) &&
             (keylen < strlen("rootid") || strcmp(key, "rootid") != 0))
index 3e8e70b..6d7f7e3 100644 (file)
@@ -55,7 +55,6 @@
 #include <linux/lustre_fsfilt.h>
 #include <linux/lprocfs_status.h>
 #include <linux/lustre_commit_confd.h>
-
 #include <linux/lustre_acl.h>
 #include "mds_internal.h"
 
@@ -360,6 +359,145 @@ struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id,
         RETURN(result);
 }
 
+/*
+ * Add an id mapping for the sender of @req to the idmap table of the
+ * export described by @med.
+ *
+ * The uid/gid found in the request's security descriptor are paired with
+ * the uid/gid of the LSD looked up by req->rq_mapped_uid.
+ *
+ * Returns 0 without doing anything when the export is not marked remote,
+ * -EPERM when no mapped uid was found or the LSD cannot be obtained,
+ * -EPROTO when the security descriptor cannot be unpacked, and otherwise
+ * whatever mds_idmap_add() returns.
+ */
+static int mds_req_add_idmapping(struct ptlrpc_request *req,
+                                 struct mds_export_data *med)
+{
+        struct mds_req_sec_desc *secdesc;
+        struct lustre_sec_desc *local;
+        int status;
+
+        if (!med->med_remote)
+                return 0;
+
+        /* TODO: maybe this should go further and invalidate the gss ctxt */
+        if (req->rq_mapped_uid == MDS_IDMAP_NOTFOUND) {
+                CWARN("didn't find mapped uid\n");
+                return -EPERM;
+        }
+
+        secdesc = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF);
+        if (secdesc == NULL) {
+                CERROR("Can't unpack security desc\n");
+                return -EPROTO;
+        }
+
+        local = mds_get_lsd(req->rq_mapped_uid);
+        if (local == NULL) {
+                CERROR("can't get LSD(%u), no mapping added\n",
+                       req->rq_mapped_uid);
+                return -EPERM;
+        }
+
+        /* release the LSD obtained above once the mapping call is done */
+        status = mds_idmap_add(med->med_idmap, secdesc->rsd_uid,
+                               local->lsd_uid, secdesc->rsd_gid,
+                               local->lsd_gid);
+        mds_put_lsd(local);
+
+        return status;
+}
+
+/*
+ * Remove the id mapping for the sender of @req from the idmap table of the
+ * export described by @med.
+ *
+ * The uid/gid found in the request's security descriptor and the uid/gid of
+ * the LSD looked up by req->rq_mapped_uid identify the mapping to delete.
+ *
+ * Returns 0 without doing anything when the export is not marked remote,
+ * -EPROTO when the security descriptor cannot be unpacked, -EPERM when the
+ * LSD cannot be obtained, and otherwise whatever mds_idmap_del() returns.
+ */
+static
+int mds_req_del_idmapping(struct ptlrpc_request *req,
+                          struct mds_export_data *med)
+{
+        struct mds_req_sec_desc *rsd;
+        struct lustre_sec_desc  *lsd;
+        int rc;
+
+        if (!med->med_remote)
+                return 0;
+
+        rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF);
+        if (!rsd) {
+                CERROR("Can't unpack security desc\n");
+                return -EPROTO;
+        }
+
+        /* same "no mapped uid" sentinel that mds_req_add_idmapping() tests,
+         * spelled with its name instead of a bare -1 */
+        LASSERT(req->rq_mapped_uid != MDS_IDMAP_NOTFOUND);
+        lsd = mds_get_lsd(req->rq_mapped_uid);
+        if (!lsd) {
+                CERROR("can't get LSD(%u), no idmapping deleted\n",
+                       req->rq_mapped_uid);
+                return -EPERM;
+        }
+
+        rc = mds_idmap_del(med->med_idmap, rsd->rsd_uid, lsd->lsd_uid,
+                           rsd->rsd_gid, lsd->lsd_gid);
+        /* release the LSD obtained above */
+        mds_put_lsd(lsd);
+        return rc;
+}
+
+/*
+ * Decide whether the connecting client is handled as "remote" or "local"
+ * and report that decision in the connect reply.
+ *
+ * The client's obd_connect_data is read from request buffer 5 and the reply
+ * copy from reply buffer 0.  On the first call for an export the decision
+ * and the client's nllu/nllg ids are stored in @med and med_initialized is
+ * set; when med_initialized is already set (possibly a reconnect) the stored
+ * decision is kept and only the reply flags are refreshed.
+ *
+ * For a remote client an idmap table is allocated if none exists yet; an
+ * allocation failure is only logged.  Always returns 0.
+ */
+static int mds_init_export_data(struct ptlrpc_request *req,
+                                struct mds_export_data *med)
+{
+        struct obd_connect_data *data, *reply;
+        int ask_remote, ask_local;
+        ENTRY;
+
+        data = lustre_msg_buf(req->rq_reqmsg, 5, sizeof(*data));
+        reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
+        /* NOTE(review): data is taken from the client's request message;
+         * confirm the caller has validated buffer 5 before relying on an
+         * assertion here. */
+        LASSERT(data && reply);
+
+        if (med->med_initialized) {
+                CWARN("med already initialized, reconnect?\n");
+                goto fill_reply;
+        }
+
+        /* collapse the masked flag word to 0/1 so that nothing is lost when
+         * it is stored in an int */
+        ask_remote = (data->ocd_connect_flags & OBD_CONNECT_REMOTE) != 0;
+        ask_local = (data->ocd_connect_flags & OBD_CONNECT_LOCAL) != 0;
+
+        /* current policy is simple: honour the client's request whenever
+         * possible */
+        if (req->rq_auth_uid == -1) {
+                /* per the warning below this is the null security case,
+                 * where the client is always treated as local */
+                if (ask_remote)
+                        CWARN("null sec is used, force to be local\n");
+                med->med_remote = 0;
+        } else {
+                if (ask_remote) {
+                        if (!req->rq_remote_realm)
+                                CWARN("local realm asked to be remote\n");
+                        med->med_remote = 1;
+                } else if (ask_local) {
+                        if (req->rq_remote_realm)
+                                CWARN("remote realm asked to be local\n");
+                        med->med_remote = 0;
+                } else
+                        /* no preference given: follow the realm */
+                        med->med_remote = (req->rq_remote_realm != 0);
+        }
+
+        med->med_nllu = data->ocd_nllu[0];
+        med->med_nllg = data->ocd_nllu[1];
+
+        med->med_initialized = 1;
+fill_reply:
+        /* exactly one of REMOTE/LOCAL is set in the reply */
+        reply->ocd_connect_flags &= ~(OBD_CONNECT_REMOTE | OBD_CONNECT_LOCAL);
+        if (med->med_remote) {
+                if (!med->med_idmap)
+                        med->med_idmap = mds_idmap_alloc();
+
+                if (!med->med_idmap)
+                        CERROR("Failed to alloc idmap, following request from "
+                               "this client will be refused\n");
+
+                reply->ocd_connect_flags |= OBD_CONNECT_REMOTE;
+                CDEBUG(D_SEC, "set client as remote\n");
+        } else {
+                reply->ocd_connect_flags |= OBD_CONNECT_LOCAL;
+                CDEBUG(D_SEC, "set client as local\n");
+        }
+
+        RETURN(0);
+}
+
+/*
+ * Release the idmap table attached to @med, if there is one, and clear the
+ * pointer.  Having an idmap on an export that is not marked remote is
+ * treated as a bug and asserted against before freeing.
+ */
+static void mds_free_export_data(struct mds_export_data *med)
+{
+        if (med->med_idmap != NULL) {
+                LASSERT(med->med_remote);
+                mds_idmap_free(med->med_idmap);
+                med->med_idmap = NULL;
+        }
+}
 
 /* Establish a connection to the MDS.
  *
@@ -368,7 +506,8 @@ struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id,
  * etc.
  */
 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
-                       struct obd_uuid *cluuid, unsigned long flags)
+                       struct obd_uuid *cluuid, struct obd_connect_data *data,
+                       unsigned long flags)
 {
         struct mds_export_data *med;
         struct mds_client_data *mcd;
@@ -456,12 +595,12 @@ static int mds_init_export(struct obd_export *exp)
 static int mds_destroy_export(struct obd_export *export)
 {
         struct obd_device *obd = export->exp_obd;
-        struct mds_export_data *med;
+        struct mds_export_data *med = &export->exp_mds_data;
         struct lvfs_run_ctxt saved;
         int rc = 0;
         ENTRY;
 
-        med = &export->exp_mds_data;
+        mds_free_export_data(med);
         target_destroy_export(export);
 
         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
@@ -880,31 +1019,46 @@ int mds_pack_acl(struct obd_device *obd, struct lustre_msg *repmsg, int offset,
 }
 
 /* 
- * we only take care of fsuid/fsgid.
+ * Root squash for the security descriptor of an incoming request.
+ *
+ * Nothing is changed and 0 is returned when mds_squash_uid is 0, when
+ * *peernid equals mds_nosquash_nid, or when both rsd_uid and rsd_fsuid are
+ * non-zero.
+ *
+ * Otherwise the rule is simple: a zero rsd_uid is replaced by the squash
+ * uid together with rsd_gid by the squash gid, and rsd_fsuid/rsd_fsgid are
+ * handled the same way, independently.  The gid is replaced whatever its
+ * previous value, i.e. setuid/setgid attributes are not considered.
+ * CAP_FS_MASK is then cleared from rsd_cap and 1 is returned.
  */
-void mds_squash_root(struct mds_obd *mds, struct mds_req_sec_desc *rsd,
-                     ptl_nid_t *peernid)
+int mds_squash_root(struct mds_obd *mds, struct mds_req_sec_desc *rsd,
+                    ptl_nid_t *peernid)
 {
-        if (!mds->mds_squash_uid || rsd->rsd_fsuid)
-                return;
+        if (!mds->mds_squash_uid || *peernid == mds->mds_nosquash_nid)
+                return 0;
 
-        if (*peernid == mds->mds_nosquash_nid)
-                return;
+        if (rsd->rsd_uid && rsd->rsd_fsuid)
+                return 0;
 
-        CDEBUG(D_SEC, "squash req from 0x%llx, (%d:%d/%x)=>(%d:%d/%x)\n",
-                *peernid, rsd->rsd_fsuid, rsd->rsd_fsgid, rsd->rsd_cap,
-                mds->mds_squash_uid, mds->mds_squash_gid,
-                (rsd->rsd_cap & ~CAP_FS_MASK));
+        CDEBUG(D_SEC, "squash req from "LPX64":"
+               "(%u:%u-%u:%u/%x)=>(%u:%u-%u:%u/%x)\n", *peernid,
+                rsd->rsd_uid, rsd->rsd_gid,
+                rsd->rsd_fsuid, rsd->rsd_fsgid, rsd->rsd_cap,
+                rsd->rsd_uid ? rsd->rsd_uid : mds->mds_squash_uid,
+                rsd->rsd_uid ? rsd->rsd_gid : mds->mds_squash_gid,
+                rsd->rsd_fsuid ? rsd->rsd_fsuid : mds->mds_squash_uid,
+                rsd->rsd_fsuid ? rsd->rsd_fsgid : mds->mds_squash_gid,
+                rsd->rsd_cap & ~CAP_FS_MASK);
 
-        rsd->rsd_fsuid = mds->mds_squash_uid;
-        rsd->rsd_fsgid = mds->mds_squash_gid;
+        if (rsd->rsd_uid == 0) {
+                rsd->rsd_uid = mds->mds_squash_uid;
+                rsd->rsd_gid = mds->mds_squash_gid;
+        }
+        if (rsd->rsd_fsuid == 0) {
+                rsd->rsd_fsuid = mds->mds_squash_uid;
+                rsd->rsd_fsgid = mds->mds_squash_gid;
+        }
         rsd->rsd_cap &= ~CAP_FS_MASK;
+
+        return 1;
 }
 
 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
                                 struct ptlrpc_request *req, int req_off,
                                 struct mds_body *reqbody, int reply_off)
 {
+        struct mds_export_data *med = &req->rq_export->u.eu_mds_data;
         struct inode *inode = dentry->d_inode;
         struct mds_body *body;
         int rc = 0;
@@ -952,9 +1106,8 @@ static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
                                   body, inode);
         }                
 
-        /* do reverse uid/gid mapping if needed */
-        if (rc == 0 && req->rq_remote)
-                mds_reverse_map_ugid(req, body);
+        if (rc == 0)
+                mds_body_do_reverse_map(med, body);
 
         RETURN(rc);
 }
@@ -1167,7 +1320,6 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
 
         rc = mds_init_ucred(&uc, req, rsd);
         if (rc) {
-                CERROR("can't init ucred\n");
                 GOTO(cleanup, rc);
         }
 
@@ -1376,7 +1528,6 @@ static int mds_getattr(struct ptlrpc_request *req, int offset)
         rc = mds_init_ucred(&uc, req, rsd);
         if (rc) {
                 mds_exit_ucred(&uc);
-                CERROR("can't init ucred\n");
                 RETURN(rc);
         }
 
@@ -1539,7 +1690,6 @@ static int mds_readpage(struct ptlrpc_request *req, int offset)
 
         rc = mds_init_ucred(&uc, req, rsd);
         if (rc) {
-                CERROR("can't init ucred\n");
                 GOTO(out, rc);
         }
 
@@ -1702,29 +1852,12 @@ int mds_reint(struct ptlrpc_request *req, int offset,
 
         rc = mds_init_ucred(&rec->ur_uc, req, rsd);
         if (rc) {
-                CERROR("can't init ucred\n");
                 GOTO(out, rc);
         }
 
         /* rc will be used to interrupt a for loop over multiple records */
         rc = mds_reint_rec(rec, offset, req, lockh);
 
-        /* do reverse uid/gid mapping if needed */
-        if (rc == 0 && req->rq_remote &&
-            (rec->ur_opcode == REINT_SETATTR ||
-             rec->ur_opcode == REINT_OPEN)) {
-                struct mds_body *body;
-                int bodyoff;
-
-                if (rec->ur_opcode == REINT_SETATTR)
-                        bodyoff = 0;
-                else /* open */
-                        bodyoff = (offset == 3 ? 1 : 0);
-                body = lustre_msg_buf(req->rq_repmsg, bodyoff, sizeof(*body));
-                LASSERT(body);
-
-                mds_reverse_map_ugid(req, body);
-        }
  out:
         mds_exit_ucred(&rec->ur_uc);
         OBD_FREE(rec, sizeof(*rec));
@@ -1865,13 +1998,10 @@ static int mdt_obj_create(struct ptlrpc_request *req)
 
         MDS_CHECK_RESENT(req, reconstruct_create(req));
 
-        /*
-         * this only serve to inter-mds request, don't need check group database
-         * here. --ericm.
-         */
         uc.luc_lsd = NULL;
         uc.luc_ginfo = NULL;
         uc.luc_uid = body->oa.o_uid;
+        uc.luc_gid = body->oa.o_gid;
         uc.luc_fsuid = body->oa.o_uid;
         uc.luc_fsgid = body->oa.o_gid;
 
@@ -2224,36 +2354,41 @@ static int mdt_set_info(struct ptlrpc_request *req)
         RETURN(-EINVAL);
 }
 
-static int mds_init_export_data(struct ptlrpc_request *req)
+static void mds_revoke_export_locks(struct obd_export *exp)
 {
-        struct mds_export_data *med = &req->rq_export->u.eu_mds_data;
-        __u32 *nllu;
+        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+        struct list_head *locklist = &exp->exp_ldlm_data.led_held_locks;
+        struct ldlm_lock *lock, *next;
+        struct ldlm_lock_desc desc;
 
-        nllu = lustre_msg_buf(req->rq_reqmsg, 4, sizeof(__u32) * 2);
-        if (nllu == NULL) {
-                CERROR("failed to extract nllu, use 99:99\n");
-                med->med_nllu = 99;
-                med->med_nllg = 99;
-        } else {
-                if (lustre_msg_swabbed(req->rq_reqmsg)) {
-                        __swab32s(&nllu[0]);
-                        __swab32s(&nllu[1]);
-                }
-                med->med_nllu = nllu[0];
-                med->med_nllg = nllu[1];
-        }
+        if (!exp->u.eu_mds_data.med_remote)
+                return;
 
-        if (req->rq_remote) {
-                CWARN("exp %p, peer "LPX64": set as remote\n",
-                       req->rq_export, req->rq_peer.peer_id.nid);
-                med->med_local = 0;
-        } else
-                med->med_local = 1;
+        ENTRY;
+        l_lock(&ns->ns_lock);
+        list_for_each_entry_safe(lock, next, locklist, l_export_chain) {
+                if (lock->l_req_mode != lock->l_granted_mode)
+                        continue;
 
-        LASSERT(med->med_idmap == NULL);
-        spin_lock_init(&med->med_idmap_lock);
+                LASSERT(lock->l_resource);
+                if (lock->l_resource->lr_type != LDLM_IBITS &&
+                    lock->l_resource->lr_type != LDLM_PLAIN)
+                        continue;
 
-        return 0;
+                if (lock->l_flags & LDLM_FL_AST_SENT)
+                        continue;
+
+                lock->l_flags |= LDLM_FL_AST_SENT;
+
+                /* the desc just pretends to be an exclusive request */
+                ldlm_lock2desc(lock, &desc);
+                desc.l_req_mode = LCK_EX;
+                desc.l_granted_mode = 0;
+
+                lock->l_blocking_ast(lock, &desc, NULL, LDLM_CB_BLOCKING);
+        }
+        l_unlock(&ns->ns_lock);
+        EXIT;
 }
 
 static int mds_msg_check_version(struct lustre_msg *msg)
@@ -2346,8 +2481,19 @@ int mds_handle(struct ptlrpc_request *req)
 
         /* Security opc should NOT trigger any recovery events */
         if (req->rq_reqmsg->opc == SEC_INIT ||
-            req->rq_reqmsg->opc == SEC_INIT_CONTINUE ||
-            req->rq_reqmsg->opc == SEC_FINI) {
+            req->rq_reqmsg->opc == SEC_INIT_CONTINUE) {
+                if (!req->rq_export)
+                        GOTO(out, rc = 0);
+
+                mds_req_add_idmapping(req, &req->rq_export->exp_mds_data);
+                mds_revoke_export_locks(req->rq_export);
+                GOTO(out, rc = 0);
+        } else if (req->rq_reqmsg->opc == SEC_FINI) {
+                if (!req->rq_export)
+                        GOTO(out, rc = 0);
+
+                mds_req_del_idmapping(req, &req->rq_export->exp_mds_data);
+                mds_revoke_export_locks(req->rq_export);
                 GOTO(out, rc = 0);
         }
 
@@ -2408,9 +2554,16 @@ int mds_handle(struct ptlrpc_request *req)
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
                 rc = target_handle_connect(req);
                 if (!rc) {
+                        struct mds_export_data *med;
+
+                        LASSERT(req->rq_export);
+                        med = &req->rq_export->u.eu_mds_data;
+                        mds_init_export_data(req, med);
+                        mds_req_add_idmapping(req, med);
+
                         /* Now that we have an export, set mds. */
+                        obd = req->rq_export->exp_obd;
                         mds = mds_req2mds(req);
-                        mds_init_export_data(req);
                 }
                 break;
 
@@ -2953,7 +3106,7 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
          * here we use "iopen_nopriv" hardcoded, because it affects MDS utility
          * and the rest of options are passed by mount options. Probably this
          * should be moved to somewhere else like startup scripts or lconf. */
-        sprintf(options, "iopen_nopriv,acl,user_xattr");
+        sprintf(options, "iopen_nopriv");
         if (lcfg->lcfg_inllen4 > 0 && lcfg->lcfg_inlbuf4)
                 sprintf(options + strlen(options), ",%s",
                         lcfg->lcfg_inlbuf4);
index 289b9ad..f658434 100644 (file)
@@ -166,11 +166,11 @@ static int lprocfs_wr_lsd_downcall(struct file *file, const char *buffer,
 
         if (count != sizeof(param)) {
                 CERROR("invalid data size %lu\n", count);
-                return count;
+                goto do_err_downcall;
         }
         if (copy_from_user(&param, buffer, count)) {
                 CERROR("broken downcall\n");
-                return count;
+                goto do_err_downcall;
         }
 
         if (param.err) {
@@ -179,9 +179,8 @@ static int lprocfs_wr_lsd_downcall(struct file *file, const char *buffer,
         }
 
         if (param.ngroups > NGROUPS_MAX) {
-                CERROR("%d groups?\n", param.ngroups);
-                param.err = -EINVAL;
-                goto do_downcall;
+                CERROR("%d groups too big\n", param.ngroups);
+                goto do_err_downcall;
         }
 
         if (param.ngroups <= NGROUPS_SMALL)
@@ -191,25 +190,27 @@ static int lprocfs_wr_lsd_downcall(struct file *file, const char *buffer,
                 if (!gids) {
                         CERROR("fail to alloc memory for %d gids\n",
                                 param.ngroups);
-                        param.err = -ENOMEM;
-                        goto do_downcall;
+                        goto do_err_downcall;
                 }
         }
         if (copy_from_user(gids, param.groups,
                            param.ngroups * sizeof(gid_t))) {
                 CERROR("broken downcall\n");
-                param.err = -EFAULT;
-                goto do_downcall;
+                goto do_err_downcall;
         }
 
         param.groups = gids;
 
 do_downcall:
-        upcall_cache_downcall(cache, (__u64) param.uid, param.err, &param);
+        upcall_cache_downcall(cache, (__u64) param.uid, &param);
 
         if (gids && gids != gids_local)
                 OBD_FREE(gids, param.ngroups * sizeof(gid_t));
         return count;
+
+do_err_downcall:
+        param.err = -EINVAL;
+        goto do_downcall;
 }
 
 static int lprocfs_rd_lsd_expire(char *page, char **start, off_t off, int count,
index 20c0145..391654a 100644 (file)
@@ -130,8 +130,6 @@ int mds_client_free(struct obd_export *exp, int clear_client)
         struct lvfs_run_ctxt saved;
         int rc;
 
-        mds_idmap_cleanup(med);
-
         if (!med->med_mcd)
                 RETURN(0);
 
index 98271df..2dc75a7 100644 (file)
@@ -99,13 +99,18 @@ int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
 void groups_from_buffer(struct group_info *ginfo, __u32 *gids);
 int mds_update_unpack(struct ptlrpc_request *, int offset,
                       struct mds_update_record *);
-int mds_idmap_set(struct mds_export_data *med, __u32 id1, __u32 id2,
-                  int is_uid_mapping);
-__u32 mds_idmap_get(struct mds_export_data *med, __u32 id,
-                    int is_uid_mapping);
-void mds_idmap_cleanup(struct mds_export_data *med);
-void mds_reverse_map_ugid(struct ptlrpc_request *req,
-                          struct mds_body *body);
+int mds_idmap_add(struct mds_idmap_table *tbl,
+                  uid_t rmt_uid, uid_t lcl_uid,
+                  gid_t rmt_gid, gid_t lcl_gid);
+int mds_idmap_del(struct mds_idmap_table *tbl,
+                  uid_t rmt_uid, uid_t lcl_uid,
+                  gid_t rmt_gid, gid_t lcl_gid);
+int mds_idmap_lookup_uid(struct mds_idmap_table *tbl, int reverse, uid_t uid);
+int mds_idmap_lookup_gid(struct mds_idmap_table *tbl, int reverse, gid_t gid);
+struct mds_idmap_table *mds_idmap_alloc(void);
+void mds_idmap_free(struct mds_idmap_table *tbl);
+void mds_body_do_reverse_map(struct mds_export_data *med,
+                             struct mds_body *body);
 int mds_init_ucred(struct lvfs_ucred *ucred, struct ptlrpc_request *req,
                    struct mds_req_sec_desc *rsd);
 void mds_exit_ucred(struct lvfs_ucred *ucred);
@@ -165,8 +170,8 @@ int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
                     struct lov_stripe_md *ea, struct obd_trans_info *oti);
 
 /* mds/handler.c */
-void mds_squash_root(struct mds_obd *mds, struct mds_req_sec_desc *rsd,
-                     ptl_nid_t *peernid);
+int mds_squash_root(struct mds_obd *mds, struct mds_req_sec_desc *rsd,
+                    ptl_nid_t *peernid);
 int mds_handle(struct ptlrpc_request *req);
 extern struct lvfs_callback_ops mds_lvfs_ops;
 int mds_dt_clean(struct obd_device *obd);
index cdc1425..a073ec7 100644 (file)
@@ -47,6 +47,7 @@
 
 #include <linux/obd_support.h>
 #include <linux/lustre_lib.h>
+#include <linux/lustre_ucache.h>
 #include "mds_internal.h"
 
 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,4)
@@ -508,195 +509,347 @@ int mds_update_unpack(struct ptlrpc_request *req, int offset,
         RETURN(rc);
 }
 
+/********************************
+ * MDS uid/gid mapping handling *
+ ********************************/
+
 static
-struct mds_idmap_table *__get_idmap_table(struct mds_export_data *med,
-                                          int create)
+struct mds_idmap_entry* idmap_alloc_entry(__u32 rmt_id, __u32 lcl_id)
 {
-        struct mds_idmap_table *new;
-        int i;
+        struct mds_idmap_entry *e;
 
-        if (!create || med->med_idmap)
-                return med->med_idmap;
+        OBD_ALLOC(e, sizeof(*e));
+        if (!e)
+                return NULL;
 
-        spin_unlock(&med->med_idmap_lock);
-        OBD_ALLOC(new, sizeof(*new));
-        spin_lock(&med->med_idmap_lock);
+        INIT_LIST_HEAD(&e->rmt_hash);
+        INIT_LIST_HEAD(&e->lcl_hash);
+        atomic_set(&e->refcount, 1);
+        e->rmt_id = rmt_id;
+        e->lcl_id = lcl_id;
 
-        if (!new) {
-                CERROR("fail to alloc %d\n", sizeof(*new));
-                return NULL;
-        }
+        return e;
+}
 
-        if (med->med_idmap) {
-                OBD_FREE(new, sizeof(*new));
-                return med->med_idmap;
-        }
+void idmap_free_entry(struct mds_idmap_entry *e)
+{
+        if (!list_empty(&e->rmt_hash))
+                list_del(&e->rmt_hash);
+        if (!list_empty(&e->lcl_hash))
+                list_del(&e->lcl_hash);
+        OBD_FREE(e, sizeof(*e));
+}
 
-        for (i = 0; i < MDS_IDMAP_HASHSIZE; i++) {
-                INIT_LIST_HEAD(&new->uidmap[i]);
-                INIT_LIST_HEAD(&new->gidmap[i]);
+static
+int idmap_insert_entry(struct list_head *rmt_hash, struct list_head *lcl_hash,
+                       struct mds_idmap_entry *new, const char *warn_msg)
+{
+        struct list_head *rmt_head = &rmt_hash[MDS_IDMAP_HASHFUNC(new->rmt_id)];
+        struct list_head *lcl_head = &lcl_hash[MDS_IDMAP_HASHFUNC(new->lcl_id)];
+        struct mds_idmap_entry *e;
+
+        list_for_each_entry(e, rmt_head, rmt_hash) {
+                if (e->rmt_id == new->rmt_id &&
+                    e->lcl_id == new->lcl_id) {
+                        atomic_inc(&e->refcount);
+                        return 1; /* same mapping exists: reference it */
+                }
+                if (e->rmt_id == new->rmt_id && warn_msg)
+                        CWARN("%s: rmt id %u already map to %u (new %u)\n",
+                              warn_msg, e->rmt_id, e->lcl_id, new->lcl_id);
+                if (e->lcl_id == new->lcl_id && warn_msg)
+                        CWARN("%s: lcl id %u already be mapped from %u "
+                              "(new %u)\n", warn_msg,
+                              e->lcl_id, e->rmt_id, new->rmt_id);
         }
 
-        CDEBUG(D_SEC, "allocate idmap table for med %p\n", med);
-        med->med_idmap = new;
-        return new;
+        list_add_tail(&new->rmt_hash, rmt_head); /* args: (new, head) */
+        list_add_tail(&new->lcl_hash, lcl_head); /* args: (new, head) */
+        return 0; /* new entry linked into both hash chains */
 }
 
-static void __flush_mapping_table(struct list_head *table)
+static
+int idmap_remove_entry(struct list_head *rmt_hash, struct list_head *lcl_hash,
+                       __u32 rmt_id, __u32 lcl_id)
 {
-        struct mds_idmap_item *item;
-        int i;
-
-        for (i = 0; i < MDS_IDMAP_HASHSIZE; i++) {
-                while (!list_empty(&table[i])) {
-                        item = list_entry(table[i].next, struct mds_idmap_item,
-                                          hash);
-                        list_del(&item->hash);
-                        OBD_FREE(item, sizeof(*item));
+        struct list_head *rmt_head = &rmt_hash[MDS_IDMAP_HASHFUNC(rmt_id)];
+        struct mds_idmap_entry *e;
+
+        list_for_each_entry(e, rmt_head, rmt_hash) {
+                if (e->rmt_id == rmt_id && e->lcl_id == lcl_id) {
+                        if (atomic_dec_and_test(&e->refcount)) {
+                                list_del(&e->rmt_hash);
+                                list_del(&e->lcl_hash);
+                                OBD_FREE(e, sizeof(*e));
+                                return 0;
+                        } else
+                                return 1;
                 }
         }
+        return -ENOENT;
 }
 
-void mds_idmap_cleanup(struct mds_export_data *med)
+int mds_idmap_add(struct mds_idmap_table *tbl,
+                  uid_t rmt_uid, uid_t lcl_uid,
+                  gid_t rmt_gid, gid_t lcl_gid)
 {
+        struct mds_idmap_entry *ue, *ge;
         ENTRY;
 
-        if (!med->med_idmap) {
-                EXIT;
-                return;
+        if (!tbl)
+                RETURN(-EPERM);
+
+        ue = idmap_alloc_entry(rmt_uid, lcl_uid);
+        if (!ue)
+                RETURN(-ENOMEM);
+        ge = idmap_alloc_entry(rmt_gid, lcl_gid);
+        if (!ge) {
+                idmap_free_entry(ue);
+                RETURN(-ENOMEM);
+        }
+
+        spin_lock(&tbl->mit_lock);
+
+        if (idmap_insert_entry(tbl->mit_idmaps[MDS_RMT_UIDMAP_IDX],
+                               tbl->mit_idmaps[MDS_LCL_UIDMAP_IDX],
+                               ue, "UID mapping")) {
+                idmap_free_entry(ue);
+        }
+
+        if (idmap_insert_entry(tbl->mit_idmaps[MDS_RMT_GIDMAP_IDX],
+                               tbl->mit_idmaps[MDS_LCL_GIDMAP_IDX],
+                               ge, "GID mapping")) {
+                idmap_free_entry(ge);
         }
 
-        spin_lock(&med->med_idmap_lock);
-        __flush_mapping_table(med->med_idmap->uidmap);
-        __flush_mapping_table(med->med_idmap->gidmap);
-        OBD_FREE(med->med_idmap, sizeof(struct mds_idmap_table));
-        spin_unlock(&med->med_idmap_lock);
+        spin_unlock(&tbl->mit_lock);
+        RETURN(0);
 }
 
-static inline int idmap_hash(__u32 id)
+int mds_idmap_del(struct mds_idmap_table *tbl,
+                  uid_t rmt_uid, uid_t lcl_uid,
+                  gid_t rmt_gid, gid_t lcl_gid)
 {
-        return (id & (MDS_IDMAP_HASHSIZE - 1));
+        ENTRY;
+
+        if (!tbl)
+                RETURN(0);
+
+        spin_lock(&tbl->mit_lock);
+        idmap_remove_entry(tbl->mit_idmaps[MDS_RMT_UIDMAP_IDX],
+                           tbl->mit_idmaps[MDS_LCL_UIDMAP_IDX],
+                           rmt_uid, lcl_uid);
+        idmap_remove_entry(tbl->mit_idmaps[MDS_RMT_GIDMAP_IDX],
+                           tbl->mit_idmaps[MDS_LCL_GIDMAP_IDX],
+                           rmt_gid, lcl_gid);
+        spin_unlock(&tbl->mit_lock);
+        RETURN(0);
 }
 
 static
-int __idmap_set_item(struct mds_export_data *med,
-                     struct list_head *table,
-                     __u32 id1, __u32 id2)
-{
-        struct list_head *head;
-        struct mds_idmap_item *item, *new = NULL;
-        int found = 0;
-
-        head = table + idmap_hash(id1);
-again:
-        list_for_each_entry(item, head, hash) {
-                if (item->id1 == id1) {
-                        found = 1;
-                        break;
-                }
-        }
+__u32 idmap_lookup_id(struct list_head *hash, int reverse, __u32 id)
+{
+        struct list_head *head = &hash[MDS_IDMAP_HASHFUNC(id)];
+        struct mds_idmap_entry *e;
 
-        if (!found) {
-                if (new == NULL) {
-                        spin_unlock(&med->med_idmap_lock);
-                        OBD_ALLOC(new, sizeof(*new));
-                        spin_lock(&med->med_idmap_lock);
-                        if (!new) {
-                                CERROR("fail to alloc %d\n", sizeof(*new));
-                                return -ENOMEM;
-                        }
-                        goto again;
+        if (!reverse) {
+                list_for_each_entry(e, head, rmt_hash) {
+                        if (e->rmt_id == id)
+                                return e->lcl_id;
                 }
-                new->id1 = id1;
-                new->id2 = id2;
-                list_add(&new->hash, head);
+                return MDS_IDMAP_NOTFOUND;
         } else {
-                if (new)
-                        OBD_FREE(new, sizeof(*new));
-                if (item->id2 != id2) {
-                        CWARN("mapping changed: %u ==> (%u -> %u)\n",
-                               id1, item->id2, id2);
-                        item->id2 = id2;
+                list_for_each_entry(e, head, lcl_hash) {
+                        if (e->lcl_id == id)
+                                return e->rmt_id;
                 }
-                list_move(&item->hash, head);
+                return MDS_IDMAP_NOTFOUND;
         }
-
-        return 0;
 }
 
-int mds_idmap_set(struct mds_export_data *med, __u32 id1, __u32 id2,
-                  int is_uid_mapping)
+int mds_idmap_lookup_uid(struct mds_idmap_table *tbl, int reverse, uid_t uid)
 {
-        struct mds_idmap_table *idmap;
-        int rc;
-        ENTRY;
+        struct list_head *hash;
+
+        if (!tbl)
+                return MDS_IDMAP_NOTFOUND;
+
+        if (!reverse)
+                hash = tbl->mit_idmaps[MDS_RMT_UIDMAP_IDX];
+        else
+                hash = tbl->mit_idmaps[MDS_LCL_UIDMAP_IDX];
+
+        spin_lock(&tbl->mit_lock);
+        uid = idmap_lookup_id(hash, reverse, uid);
+        spin_unlock(&tbl->mit_lock);
 
-        spin_lock(&med->med_idmap_lock);
+        return uid;
+}
+
+int mds_idmap_lookup_gid(struct mds_idmap_table *tbl, int reverse, gid_t gid)
+{
+        struct list_head *hash;
 
-        idmap = __get_idmap_table(med, 1);
-        if (!idmap)
-                GOTO(out, rc = -ENOMEM);
+        if (!tbl)
+                return MDS_IDMAP_NOTFOUND;
 
-        if (is_uid_mapping)
-                rc = __idmap_set_item(med, idmap->uidmap, id1, id2);
+        if (!reverse)
+                hash = tbl->mit_idmaps[MDS_RMT_GIDMAP_IDX];
         else
-                rc = __idmap_set_item(med, idmap->gidmap, id1, id2);
+                hash = tbl->mit_idmaps[MDS_LCL_GIDMAP_IDX];
 
-out:
-        spin_unlock(&med->med_idmap_lock);
-        RETURN(rc);
+        spin_lock(&tbl->mit_lock);
+        gid = idmap_lookup_id(hash, reverse, gid);
+        spin_unlock(&tbl->mit_lock);
+
+        return gid;
 }
 
-__u32 mds_idmap_get(struct mds_export_data *med, __u32 id,
-                    int is_uid_mapping)
+struct mds_idmap_table *mds_idmap_alloc(void)
 {
-        struct mds_idmap_table *idmap;
-        struct list_head *table;
-        struct list_head *head;
-        struct mds_idmap_item *item;
-        int found = 0;
-        __u32 res;
+        struct mds_idmap_table *tbl;
+        int i, j;
 
-        spin_lock(&med->med_idmap_lock);
-        idmap = __get_idmap_table(med, 0);
-        if (!idmap)
-                goto nllu;
+        OBD_ALLOC(tbl, sizeof(*tbl));
+        if (!tbl)
+                return NULL;
 
-        table = is_uid_mapping ? idmap->uidmap : idmap->gidmap;
-        head = table + idmap_hash(id);
+        spin_lock_init(&tbl->mit_lock);
+        for (i = 0; i < MDS_IDMAP_N_HASHES; i++)
+                for (j = 0; j < MDS_IDMAP_HASHSIZE; j++)
+                        INIT_LIST_HEAD(&tbl->mit_idmaps[i][j]);
+
+        return tbl;
+}
 
-        list_for_each_entry(item, head, hash) {
-                if (item->id1 == id) {
-                        found = 1;
-                        break;
+static void idmap_clear_rmt_hash(struct list_head *list)
+{
+        struct mds_idmap_entry *e;
+        int i;
+
+        for (i = 0; i < MDS_IDMAP_HASHSIZE; i++) {
+                while (!list_empty(&list[i])) {
+                        e = list_entry(list[i].next, struct mds_idmap_entry,
+                                       rmt_hash);
+                        idmap_free_entry(e);
                 }
         }
-        if (!found)
-                goto nllu;
+}
+
+void mds_idmap_free(struct mds_idmap_table *tbl)
+{
+        int i;
+
+        spin_lock(&tbl->mit_lock);
+        idmap_clear_rmt_hash(tbl->mit_idmaps[MDS_RMT_UIDMAP_IDX]);
+        idmap_clear_rmt_hash(tbl->mit_idmaps[MDS_RMT_GIDMAP_IDX]);
 
-        res = item->id2;
-out:
-        spin_unlock(&med->med_idmap_lock);
-        return res;
-nllu:
-        res = is_uid_mapping ? med->med_nllu : med->med_nllg;
-        goto out;
+        /* paranoid checking */
+        for (i = 0; i < MDS_IDMAP_HASHSIZE; i++) {
+                LASSERT(list_empty(&tbl->mit_idmaps[MDS_LCL_UIDMAP_IDX][i]));
+                LASSERT(list_empty(&tbl->mit_idmaps[MDS_LCL_GIDMAP_IDX][i]));
+        }
+        spin_unlock(&tbl->mit_lock);
+
+        OBD_FREE(tbl, sizeof(*tbl));
 }
 
-void mds_reverse_map_ugid(struct ptlrpc_request *req,
-                          struct mds_body *body)
+/*********************************
+ * helpers doing mapping for MDS *
+ *********************************/
+
+/*
+ * we allow remote setuid/setgid to an "authenticated" one;
+ * this policy will probably change later.
+ */
+static
+int mds_req_secdesc_do_map(struct mds_export_data *med,
+                           struct mds_req_sec_desc *rsd)
 {
-        struct mds_export_data *med = &req->rq_export->u.eu_mds_data;
+        struct mds_idmap_table *idmap = med->med_idmap;
+        uid_t uid, fsuid;
+        gid_t gid, fsgid;
+
+        uid = mds_idmap_lookup_uid(idmap, 0, rsd->rsd_uid);
+        if (uid == MDS_IDMAP_NOTFOUND) {
+                CERROR("can't find map for uid %u\n", rsd->rsd_uid);
+                return -EPERM;
+        }
 
-        LASSERT(req->rq_remote);
+        if (rsd->rsd_uid == rsd->rsd_fsuid)
+                fsuid = uid;
+        else {
+                fsuid = mds_idmap_lookup_uid(idmap, 0, rsd->rsd_fsuid);
+                if (fsuid == MDS_IDMAP_NOTFOUND) {
+                        CERROR("can't find map for fsuid %u\n", rsd->rsd_fsuid);
+                        return -EPERM;
+                }
+        }
 
-        if (body->valid & OBD_MD_FLUID)
-                body->uid = mds_idmap_get(med, body->uid, 1);
+        gid = mds_idmap_lookup_gid(idmap, 0, rsd->rsd_gid);
+        if (gid == MDS_IDMAP_NOTFOUND) {
+                CERROR("can't find map for gid %u\n", rsd->rsd_gid);
+                return -EPERM;
+        }
 
-        if (body->valid & OBD_MD_FLGID)
-                body->gid = mds_idmap_get(med, body->gid, 0);
+        if (rsd->rsd_gid == rsd->rsd_fsgid)
+                fsgid = gid;
+        else {
+                fsgid = mds_idmap_lookup_gid(idmap, 0, rsd->rsd_fsgid);
+                if (fsgid == MDS_IDMAP_NOTFOUND) {
+                        CERROR("can't find map for fsgid %u\n", rsd->rsd_fsgid);
+                        return -EPERM;
+                }
+        }
+
+        rsd->rsd_uid = uid;
+        rsd->rsd_gid = gid;
+        rsd->rsd_fsuid = fsuid;
+        rsd->rsd_fsgid = fsgid;
+
+        return 0;
+}
+
+void mds_body_do_reverse_map(struct mds_export_data *med,
+                             struct mds_body *body)
+{
+        uid_t uid;
+        gid_t gid;
+
+        if (!med->med_remote)
+                return;
+
+        ENTRY;
+        if (body->valid & OBD_MD_FLUID) {
+                uid = mds_idmap_lookup_uid(med->med_idmap, 1, body->uid);
+                if (uid == MDS_IDMAP_NOTFOUND) {
+                        uid = med->med_nllu;
+                        if (body->valid & OBD_MD_FLMODE) {
+                                body->mode = (body->mode & ~S_IRWXU) |
+                                             ((body->mode & S_IRWXO) << 6);
+                        }
+                }
+                body->uid = uid;
+        }
+        if (body->valid & OBD_MD_FLGID) {
+                gid = mds_idmap_lookup_gid(med->med_idmap, 1, body->gid);
+                if (gid == MDS_IDMAP_NOTFOUND) {
+                        gid = med->med_nllg;
+                        if (body->valid & OBD_MD_FLMODE) {
+                                body->mode = (body->mode & ~S_IRWXG) |
+                                             ((body->mode & S_IRWXO) << 3);
+                        }
+                }
+                body->gid = gid;
+        }
+
+        EXIT;
 }
 
+/**********************
+ * MDS ucred handling *
+ **********************/
+
 static inline void drop_ucred_ginfo(struct lvfs_ucred *ucred)
 {
         if (ucred->luc_ginfo) {
@@ -729,7 +882,8 @@ int mds_init_ucred(struct lvfs_ucred *ucred,
         struct lustre_sec_desc *lsd;
         ptl_nid_t peernid = req->rq_peer.peer_id.nid;
         struct group_info *gnew;
-        unsigned int setuid, setgid, strong_sec;
+        unsigned int setuid, setgid, strong_sec, root_squashed;
+        __u32 lsd_perms;
         ENTRY;
 
         LASSERT(ucred);
@@ -737,162 +891,118 @@ int mds_init_ucred(struct lvfs_ucred *ucred,
         LASSERT(rsd->rsd_ngroups <= LUSTRE_MAX_GROUPS);
 
         strong_sec = (req->rq_auth_uid != -1);
-        LASSERT(!(req->rq_remote && !strong_sec));
-
-        /* sanity check & set local/remote flag */
-        if (req->rq_remote) {
-                if (med->med_local) {
-                        CWARN("exp %p: client on nid "LPX64" was local, "
-                              "set to remote\n", req->rq_export, peernid);
-                        med->med_local = 0;
-                }
-        } else {
-                if (!med->med_local) {
-                        CWARN("exp %p: client on nid "LPX64" was remote, "
-                              "set to local\n", req->rq_export, peernid);
-                        med->med_local = 1;
-                }
-        }
-
-        setuid = (rsd->rsd_fsuid != rsd->rsd_uid);
-        setgid = (rsd->rsd_fsgid != rsd->rsd_gid);
-
-        /* deny setuid/setgid for remote client */
-        if ((setuid || setgid) && !med->med_local) {
-                CWARN("deny setxid (%u/%u) from remote client "LPX64"\n",
-                      setuid, setgid, peernid);
-                RETURN(-EPERM);
-        }
+        LASSERT(!(req->rq_remote_realm && !strong_sec));
 
-        /* take care of uid/gid mapping for client in remote realm */
-        if (req->rq_remote) {
-                /* record the uid mapping here */
-                mds_idmap_set(med, req->rq_auth_uid, rsd->rsd_uid, 1);
-
-                /* now we act as the authenticated user */
-                rsd->rsd_uid = rsd->rsd_fsuid = req->rq_auth_uid;
-        } else if (strong_sec && req->rq_auth_uid != rsd->rsd_uid) {
-                /* if we use strong authentication on this request, we
-                 * expect the uid which client claimed is true.
-                 *
-                 * FIXME root's machine_credential in krb5 will be interpret
-                 * as "nobody", which is not good for mds-mds and mds-ost
-                 * connection.
-                 */
+        /* if we use strong authentication for a local client, we
+         * expect the uid which client claimed is true.
+         */
+        if (!med->med_remote && strong_sec &&
+            req->rq_auth_uid != rsd->rsd_uid) {
                 CWARN("nid "LPX64": UID %u was authenticated while client "
-                      "claimed %u, set %u by force\n",
+                      "claimed %u, enforce to be %u\n",
                       peernid, req->rq_auth_uid, rsd->rsd_uid,
                       req->rq_auth_uid);
-                rsd->rsd_uid = req->rq_auth_uid;
+                if (rsd->rsd_uid != rsd->rsd_fsuid)
+                        rsd->rsd_uid = req->rq_auth_uid;
+                else
+                        rsd->rsd_uid = rsd->rsd_fsuid = req->rq_auth_uid;
+        }
+
+        if (med->med_remote) {
+                int rc;
+
+                if (req->rq_mapped_uid == MDS_IDMAP_NOTFOUND) {
+                        CWARN("no mapping found, deny\n");
+                        RETURN(-EPERM);
+                }
+
+                rc = mds_req_secdesc_do_map(med, rsd);
+                if (rc)
+                        RETURN(rc);
         }
 
         /* now lsd come into play */
         ucred->luc_ginfo = NULL;
         ucred->luc_lsd = lsd = mds_get_lsd(rsd->rsd_uid);
 
-#if CRAY_PORTALS
-        ucred->luc_fsuid = req->rq_uid;
-#else
-        ucred->luc_fsuid = rsd->rsd_fsuid;
-#endif
-        if (lsd) {
-                if (req->rq_remote) {
-                        /* record the gid mapping here */
-                        mds_idmap_set(med, lsd->lsd_gid, rsd->rsd_gid, 0);
-                        /* now we act as the authenticated group */
-                        rsd->rsd_gid = rsd->rsd_fsgid = lsd->lsd_gid;
-                } else if (rsd->rsd_gid != lsd->lsd_gid) {
-                        /* verify gid which client declared is true */
-                        CWARN("GID: %u while client declare %u, "
-                              "set %u by force\n",
-                              lsd->lsd_gid, rsd->rsd_gid,
-                              lsd->lsd_gid);
-                        rsd->rsd_gid = lsd->lsd_gid;
-                }
+        if (!lsd) {
+                CERROR("Deny access without LSD: uid %d\n", rsd->rsd_uid);
+                RETURN(-EPERM);
+        }
 
-                if (lsd->lsd_ginfo) {
-                        ucred->luc_ginfo = lsd->lsd_ginfo;
-                        get_group_info(ucred->luc_ginfo);
-                }
+        /* find out the setuid/setgid attempt */
+        setuid = (rsd->rsd_uid != rsd->rsd_fsuid);
+        setgid = (rsd->rsd_gid != rsd->rsd_fsgid ||
+                  rsd->rsd_gid != lsd->lsd_gid);
 
-                /* check permission of setuid */
-                if (setuid) {
-                        if (!lsd->lsd_allow_setuid) {
-                                CWARN("mds blocked setuid attempt: %u -> %u\n",
-                                      rsd->rsd_uid, rsd->rsd_fsuid);
-                                RETURN(-EPERM);
-                        }
-                }
+        lsd_perms = mds_lsd_get_perms(lsd, med->med_remote, 0, peernid);
 
-                /* check permission of setgid */
-                if (setgid) {
-                        if (!lsd->lsd_allow_setgid) {
-                                CWARN("mds blocked setgid attempt: %u -> %u\n",
-                                      rsd->rsd_gid, rsd->rsd_fsgid);
-                                RETURN(-EPERM);
-                        }
-                }
-        } else {
-                /* failed to get lsd, right now we simply deny any access
-                 * if strong authentication is used,
-                 */
-                if (strong_sec) {
-                        CWARN("mds deny access without LSD\n");
-                        RETURN(-EPERM);
-                }
+        /* check permission of setuid */
+        if (setuid && !(lsd_perms & LSD_PERM_SETUID)) {
+                CWARN("mds blocked setuid attempt: %u -> %u\n",
+                      rsd->rsd_uid, rsd->rsd_fsuid);
+                RETURN(-EPERM);
+        }
 
-                /* and otherwise deny setuid/setgid attempt */
-                if (setuid || setgid) {
-                        CWARN("mds deny setuid/setgid without LSD\n");
-                        RETURN(-EPERM);
-                }
+        /* check permission of setgid */
+        if (setgid && !(lsd_perms & LSD_PERM_SETGID)) {
+                CWARN("mds blocked setgid attempt: %u -> %u\n",
+                      rsd->rsd_gid, rsd->rsd_fsgid);
+                RETURN(-EPERM);
         }
 
-        /* NOTE: we have already obtained supplementary groups,
-         * it will be retained across root_squash. will it be a
-         * security problem??
-         */
-        mds_squash_root(mds, rsd, &peernid); 
+        root_squashed = mds_squash_root(mds, rsd, &peernid); 
 
         /* remove privilege for non-root user */
         if (rsd->rsd_fsuid)
                 rsd->rsd_cap &= ~CAP_FS_MASK;
 
-        /* by now every fields in rsd have been granted */
+        /* by now every fields other than groups in rsd have been granted */
+        ucred->luc_uid = rsd->rsd_uid;
+        ucred->luc_gid = rsd->rsd_gid;
+        ucred->luc_fsuid = rsd->rsd_fsuid;
         ucred->luc_fsgid = rsd->rsd_fsgid;
         ucred->luc_cap = rsd->rsd_cap;
-        ucred->luc_uid = rsd->rsd_uid;
+
+        /* don't use any supplementary group for remote client or
+         * we squashed root */
+        if (med->med_remote || root_squashed)
+                RETURN(0);
+
+        /* install groups from LSD */
+        if (lsd->lsd_ginfo) {
+                ucred->luc_ginfo = lsd->lsd_ginfo;
+                get_group_info(ucred->luc_ginfo);
+        }
 
         /* everything is done if we don't allow setgroups */
-        if (!lsd || !lsd->lsd_allow_setgrp)
+        if (!(lsd_perms & LSD_PERM_SETGRP))
                 RETURN(0);
 
+        /* root could set any groups as he want (if allowed), normal
+         * users only could reduce his group array.
+         */
         if (ucred->luc_uid == 0) {
-                if (rsd->rsd_ngroups == 0) {
-                        drop_ucred_ginfo(ucred);
+                drop_ucred_ginfo(ucred);
+
+                if (rsd->rsd_ngroups == 0)
                         RETURN(0);
-                }
 
                 gnew = groups_alloc(rsd->rsd_ngroups);
                 if (!gnew) {
                         CERROR("out of memory\n");
-                        drop_ucred_ginfo(ucred);
                         drop_ucred_lsd(ucred);
                         RETURN(-ENOMEM);
                 }
                 groups_from_buffer(gnew, rsd->rsd_groups);
-                groups_sort(gnew); /* can't rely on client */
+                groups_sort(gnew); /* don't rely on client doing this */
 
-                drop_ucred_ginfo(ucred);
                 ucred->luc_ginfo = gnew;
         } else {
                 __u32 set = 0, cur = 0;
-                struct group_info *ginfo;
+                struct group_info *ginfo = ucred->luc_ginfo;
 
-                /* if no group info in hash, we don't
-                 * bother createing new
-                 */
-                if (!ucred->luc_ginfo)
+                if (!ginfo)
                         RETURN(0);
 
                 /* Note: freeing a group_info count on 'nblocks' instead of
@@ -907,7 +1017,6 @@ int mds_init_ucred(struct lvfs_ucred *ucred,
                         RETURN(-ENOMEM);
                 }
 
-                ginfo = ucred->luc_ginfo;
                 while (cur < rsd->rsd_ngroups) {
                         if (groups_search(ginfo, rsd->rsd_groups[cur])) {
                                 GROUP_AT(gnew, set) = rsd->rsd_groups[cur];
index 23d18d5..d988f6a 100644 (file)
@@ -75,8 +75,8 @@ int mds_md_connect(struct obd_device *obd, char *md_name)
                 GOTO(err_last, rc = -ENOTCONN);
         }
 
-        rc = obd_connect(&conn, mds->mds_md_obd,
-                         &obd->obd_uuid, OBD_OPT_MDS_CONNECTION);
+        rc = obd_connect(&conn, mds->mds_md_obd, &obd->obd_uuid, NULL,
+                         OBD_OPT_MDS_CONNECTION);
         if (rc) {
                 CERROR("MDS cannot connect to MD(LMV) %s (%d)\n",
                        md_name, rc);
@@ -1179,7 +1179,6 @@ int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
 
         rc = mds_init_ucred(&uc, req, rsd);
         if (rc) {
-                CERROR("can't init ucred\n");
                 GOTO(cleanup, rc);
         }
 
index 9c58e41..d5070b2 100644 (file)
@@ -255,7 +255,7 @@ int mds_dt_connect(struct obd_device *obd, char * lov_name)
         CDEBUG(D_HA, "obd: %s osc: %s lov_name: %s\n",
                obd->obd_name, mds->mds_dt_obd->obd_name, lov_name);
 
-        rc = obd_connect(&conn, mds->mds_dt_obd, &obd->obd_uuid,
+        rc = obd_connect(&conn, mds->mds_dt_obd, &obd->obd_uuid, NULL,
                          mds->mds_num + FILTER_GROUP_FIRST_MDS);
         if (rc) {
                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
index dea07fe..9a4066e 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  Copyright (c) 2004 Cluster File Systems, Inc.
+ *  Copyright (c) 2004-2005 Cluster File Systems, Inc.
  *
  *   This file is part of Lustre, http://www.lustre.org.
  *
 static struct upcall_cache _lsd_cache;
 static struct list_head _lsd_hashtable[MDS_LSD_HASHSIZE];
 
+#define MDS_LSD_UPCALL_PATH             "/usr/sbin/lsd_upcall"
+#define MDS_LSD_ACQUIRE_EXPIRE          (5)
+#define MDS_LSD_ENTRY_EXPIRE            (5 * 60)
+#define MDS_LSD_ERR_ENTRY_EXPIRE        (30)
+
 struct upcall_cache *__mds_get_global_lsd_cache()
 {
         return &_lsd_cache;
@@ -97,6 +102,11 @@ static void lsd_free_entry(struct upcall_cache *cache,
         lentry = container_of(entry, struct lsd_cache_entry, base);
         if (lentry->lsd.lsd_ginfo)
                 put_group_info(lentry->lsd.lsd_ginfo);
+        if (lentry->lsd.lsd_perms) {
+                LASSERT(lentry->lsd.lsd_nperms);
+                OBD_FREE(lentry->lsd.lsd_perms, lentry->lsd.lsd_nperms *
+                                                sizeof(struct lsd_permission));
+        }
         OBD_FREE(lentry, sizeof(*lentry));
 }
 
@@ -140,6 +150,7 @@ static int lsd_parse_downcall(struct upcall_cache *cache,
         struct lsd_cache_entry *lentry;
         struct lsd_downcall_args *lsd_args;
         struct group_info *ginfo;
+        int size;
         ENTRY;
 
         LASSERT(args);
@@ -147,30 +158,52 @@ static int lsd_parse_downcall(struct upcall_cache *cache,
         lentry = container_of(entry, struct lsd_cache_entry, base);
         lsd = &lentry->lsd;
         lsd_args = (struct lsd_downcall_args *) args;
-        LASSERT(lsd_args->err == 0);
         LASSERT(lsd_args->ngroups <= NGROUPS_MAX);
 
+        if (lsd_args->err)
+                GOTO(err_ret, lsd_args->err);
+
         ginfo = groups_alloc(lsd_args->ngroups);
         if (!ginfo) {
-                CERROR("can't alloc group_info for %d groups\n",
-                        lsd_args->ngroups);
-                RETURN(-ENOMEM);
+                CERROR("failed to alloc %d groups\n", lsd_args->ngroups);
+                GOTO(err_ret, -ENOMEM);
         }
         groups_from_buffer(ginfo, lsd_args->groups);
         groups_sort(ginfo);
 
+        if (lsd_args->nperms) {
+                size = lsd_args->nperms * sizeof(struct lsd_permission);
+                OBD_ALLOC(lsd->lsd_perms, size);
+                if (!lsd->lsd_perms) {
+                        CERROR("failed to alloc %d\n", size);
+                        GOTO(err_group, -ENOMEM);
+                }
+                if (copy_from_user(lsd->lsd_perms, lsd_args->perms, size)) {
+                        CERROR("error copy from user space\n");
+                        GOTO(err_free, -EFAULT);
+                }
+        }
+
+        lsd->lsd_invalid = 0;
         lsd->lsd_uid = lsd_args->uid;
         lsd->lsd_gid = lsd_args->gid;
         lsd->lsd_ginfo = ginfo;
-        lsd->lsd_allow_setuid = lsd_args->allow_setuid;
-        lsd->lsd_allow_setgid = lsd_args->allow_setgid;
-        lsd->lsd_allow_setgrp = lsd_args->allow_setgrp;
-
-        CWARN("LSD: uid %u gid %u ngroups %u, perm (%d/%d/%d)\n",
-              lsd->lsd_uid, lsd->lsd_gid, ginfo->ngroups,
-              lsd->lsd_allow_setuid, lsd->lsd_allow_setgid,
-              lsd->lsd_allow_setgrp);
+        lsd->lsd_nperms = lsd_args->nperms;
+
+        CWARN("LSD: %d:%d, ngrps %u, nperms %u\n", lsd->lsd_uid, lsd->lsd_gid,
+              lsd->lsd_ginfo ? lsd->lsd_ginfo->ngroups : 0, lsd->lsd_nperms);
+
         RETURN(0);
+err_free:
+        OBD_FREE(lsd->lsd_perms, size);
+        lsd->lsd_perms = NULL;
+err_group:
+        put_group_info(ginfo);
+err_ret:
+        CERROR("LSD downcall error, disable this user for %lus\n",
+               cache->uc_err_entry_expire);
+        lsd->lsd_invalid = 1;
+        RETURN(1);
 }
 
 struct lustre_sec_desc * mds_get_lsd(__u32 uid)
@@ -184,6 +217,11 @@ struct lustre_sec_desc * mds_get_lsd(__u32 uid)
                 return NULL;
 
         lentry = container_of(entry, struct lsd_cache_entry, base);
+        if (lentry->lsd.lsd_invalid) {
+                upcall_cache_put_entry(&lentry->base);
+                return NULL;
+        }
+
         return &lentry->lsd;
 }
 
@@ -211,9 +249,10 @@ int mds_init_lsd_cache()
         cache->uc_name = "LSD_CACHE";
 
         /* set default value, proc tunable */
-        sprintf(cache->uc_upcall, "%s", "/usr/sbin/lsd_upcall");
-        cache->uc_entry_expire = 5 * 60;
-        cache->uc_acquire_expire = 5;
+        sprintf(cache->uc_upcall, MDS_LSD_UPCALL_PATH);
+        cache->uc_acquire_expire = MDS_LSD_ACQUIRE_EXPIRE;
+        cache->uc_entry_expire = MDS_LSD_ENTRY_EXPIRE;
+        cache->uc_err_entry_expire = MDS_LSD_ERR_ENTRY_EXPIRE;
 
         cache->hash = lsd_hash;
         cache->alloc_entry = lsd_alloc_entry;
@@ -238,3 +277,24 @@ void mds_cleanup_lsd_cache()
 {
         upcall_cache_flush_all(&_lsd_cache);
 }
+
+/*
+ * Return the permission bits an LSD grants for a given peer.
+ *
+ * The lsd_nperms entries of lsd->lsd_perms are examined in order and the
+ * perm field of the first entry whose netid and nid both match the
+ * arguments is returned; an entry holding PTL_NETID_ANY or PTL_NID_ANY
+ * matches any netid or nid respectively.
+ *
+ * When no entry matches, a remote client (is_remote != 0) gets no
+ * permission at all and a local client gets LSD_PERM_SETGRP only.
+ */
+__u32 mds_lsd_get_perms(struct lustre_sec_desc *lsd, __u32 is_remote,
+                        ptl_netid_t netid, ptl_nid_t nid)
+{
+        struct lsd_permission *perm = lsd->lsd_perms;
+        __u32 i;
+
+        /* perm has to advance together with i; otherwise only the first
+         * entry would ever be compared and later entries are unreachable */
+        for (i = 0; i < lsd->lsd_nperms; i++, perm++) {
+                if (perm->netid != PTL_NETID_ANY && perm->netid != netid)
+                        continue;
+                if (perm->nid != PTL_NID_ANY && perm->nid != nid)
+                        continue;
+                return perm->perm;
+        }
+
+        /* default */
+        if (is_remote)
+                return 0;
+        else
+                return LSD_PERM_SETGRP;
+}
index 12eb36b..28bc8e9 100644 (file)
@@ -841,11 +841,11 @@ int mds_open(struct mds_update_record *rec, int offset,
 {
         /* XXX ALLOCATE _something_ - 464 bytes on stack here */
         struct obd_device *obd = req->rq_export->exp_obd;
+        struct mds_export_data *med = &req->rq_export->u.eu_mds_data;
         struct mds_obd *mds = mds_req2mds(req);
         struct ldlm_reply *rep = NULL;
         struct mds_body *body = NULL;
         struct dentry *dchild = NULL, *dparent = NULL;
-        struct mds_export_data *med;
         struct lustre_handle parent_lockh[2] = {{0}, {0}};
         int rc = 0, cleanup_phase = 0, acc_mode, created = 0;
         int parent_mode = LCK_PR;
@@ -894,8 +894,10 @@ int mds_open(struct mds_update_record *rec, int offset,
 
                 rc = mds_open_by_id(req, rec->ur_id2, body, rec->ur_flags,
                                     rec, rep);
-                if (rc != -ENOENT)
+                if (rc != -ENOENT) {
+                        mds_body_do_reverse_map(med, body);
                         RETURN(rc);
+                }
 
                 /* We didn't find the correct inode on disk either, so we
                  * need to re-create it via a regular replay. */
@@ -906,7 +908,6 @@ int mds_open(struct mds_update_record *rec, int offset,
 
         LASSERT(offset == 3); /* If we got here, we must be called via intent */
 
-        med = &req->rq_export->exp_mds_data;
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
                 CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
                 RETURN(-ENOMEM);
@@ -939,9 +940,15 @@ int mds_open(struct mds_update_record *rec, int offset,
                                        rec->ur_namelen - 1, MDS_INODELOCK_UPDATE);
         if (IS_ERR(dparent)) {
                 rc = PTR_ERR(dparent);
-                if (rc != -ENOENT)
+                if (rc != -ENOENT) {
                         CERROR("parent lookup for "DLID4" failed, error %d\n",
                                OLID4(rec->ur_id1), rc);
+                } else {
+                        /* Just cannot find parent - make it look like
+                           usual negative lookup to avoid extra MDS RPC */
+                        intent_set_disposition(rep, DISP_LOOKUP_EXECD);
+                        intent_set_disposition(rep, DISP_LOOKUP_NEG);
+                }
                 GOTO(cleanup, rc);
         }
         LASSERT(dparent->d_inode != NULL);
@@ -1291,6 +1298,8 @@ cleanup_no_trans:
         if (rc == 0)
                 atomic_inc(&mds->mds_open_count);
 
+        mds_body_do_reverse_map(med, body);
+
         /*
          * If we have not taken the "open" lock, we may not return 0 here,
          * because caller expects 0 to mean "lock is taken", and it needs
@@ -1300,6 +1309,7 @@ cleanup_no_trans:
          */
         if ((cleanup_phase != 3) && !rc)
                 rc = ENOLCK;
+
         RETURN(rc);
 }
 
index f9637b7..cd49c3c 100644 (file)
@@ -403,6 +403,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
 {
         struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
+        struct mds_export_data *med = &req->rq_export->u.eu_mds_data;
         struct mds_body *body;
         struct dentry *de;
         struct inode *inode = NULL;
@@ -423,6 +424,25 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
         MD_COUNTER_INCREMENT(obd, setattr);
 
+        if (med->med_remote) {
+                if (rec->ur_iattr.ia_valid & ATTR_GID) {
+                        CWARN("Deny chgrp from remote client\n");
+                        GOTO(cleanup, rc = -EPERM);
+                }
+                if (rec->ur_iattr.ia_valid & ATTR_UID) {
+                        uid_t uid;
+
+                        uid = mds_idmap_lookup_uid(med->med_idmap, 0,
+                                                   rec->ur_iattr.ia_uid);
+                        if (uid == MDS_IDMAP_NOTFOUND) {
+                                CWARN("Deny chown to uid %u\n",
+                                      rec->ur_iattr.ia_uid);
+                                GOTO(cleanup, rc = -EPERM);
+                        }
+                        rec->ur_iattr.ia_uid = uid;
+                }
+        }
+
         if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
                 de = mds_id2dentry(obd, rec->ur_id1, NULL);
                 if (IS_ERR(de))
@@ -522,6 +542,8 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
                 body->valid |= OBD_MD_FLATIME;
 
+        mds_body_do_reverse_map(med, body);
+
         /* The logcookie should be no use anymore, why nobody remove
          * following code block?
          */
@@ -824,12 +846,16 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                                 oa->o_easize = *((u16 *) rec->ur_eadata);
                         }
 
-                        obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
-                                        OBD_MD_FLMTIME | OBD_MD_FLCTIME |
-                                        OBD_MD_FLUID | OBD_MD_FLGID);
-                        
-                        oa->o_mode = dir->i_mode;
-                        
+                        obdo_from_inode(oa, dir, OBD_MD_FLATIME |
+                                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+
+                        /* adjust the uid/gid/mode bits */
+                        oa->o_mode = rec->ur_mode;
+                        oa->o_uid = current->fsuid;
+                        oa->o_gid = (dir->i_mode & S_ISGID) ?
+                                                dir->i_gid : current->fsgid;
+                        oa->o_valid |= OBD_MD_FLTYPE|OBD_MD_FLUID|OBD_MD_FLGID;
+
                         CDEBUG(D_OTHER, "%s: create dir on MDS %u\n",
                                obd->obd_name, i);
 
@@ -937,6 +963,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 CDEBUG(D_INODE, "error during create: %d\n", rc);
                 GOTO(cleanup, rc);
         } else if (dchild->d_inode) {
+                struct mds_export_data *med = &req->rq_export->u.eu_mds_data;
                 struct iattr iattr;
                 struct mds_body *body;
                 struct inode *inode = dchild->d_inode;
@@ -1052,6 +1079,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 
                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
                 mds_pack_inode2body(obd, body, inode, 1);
+                mds_body_do_reverse_map(med, body);
         }
 
         EXIT;
index c947925..18efa90 100644 (file)
@@ -465,7 +465,7 @@ static int llog_test_6(struct obd_device *obd, char *name)
                 RETURN(-ENOENT);
         }
 
-        rc = obd_connect(&exph, mdc_obd, &uuid, 0);
+        rc = obd_connect(&exph, mdc_obd, &uuid, NULL, 0);
         if (rc) {
                 CERROR("6: failed to connect to MDC: %s\n", mdc_obd->obd_name);
                 RETURN(rc);
index 6b579e1..cdd8845 100644 (file)
@@ -59,7 +59,8 @@ enum {
 };
 
 static int echo_connect(struct lustre_handle *conn, struct obd_device *obd,
-                        struct obd_uuid *cluuid, unsigned long connect_flags)
+                        struct obd_uuid *cluuid, struct obd_connect_data *data,
+                        unsigned long connect_flags)
 {
         return class_connect(conn, obd, cluuid);
 }
index f28232d..b52d52b 100644 (file)
@@ -1347,7 +1347,7 @@ echo_client_setup(struct obd_device *obddev, obd_count len, void *buf)
         INIT_LIST_HEAD (&ec->ec_objects);
         ec->ec_unique = 0;
 
-        rc = obd_connect(&conn, tgt, &echo_uuid, FILTER_GROUP_ECHO);
+        rc = obd_connect(&conn, tgt, &echo_uuid, NULL, FILTER_GROUP_ECHO);
         if (rc) {
                 CERROR("fail to connect to device %s\n", lcfg->lcfg_inlbuf1);
                 return (rc);
@@ -1391,6 +1391,7 @@ static int echo_client_cleanup(struct obd_device *obddev, int flags)
 static int echo_client_connect(struct lustre_handle *conn,
                                struct obd_device *src, 
                               struct obd_uuid *cluuid,
+                               struct obd_connect_data *data,
                                unsigned long flags)
 {
         struct obd_export *exp;
index 0e9bc1b..386d02c 100644 (file)
@@ -1667,7 +1667,9 @@ static int filter_connect_post(struct obd_export *exp, unsigned long connect_fla
 
 /* nearly identical to mds_connect */
 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
-                          struct obd_uuid *cluuid, unsigned long connect_flags)
+                          struct obd_uuid *cluuid,
+                          struct obd_connect_data *data,
+                          unsigned long connect_flags)
 {
         struct obd_export *exp;
         struct filter_export_data *fed;
index 880a59d..6ffede7 100644 (file)
@@ -2991,11 +2991,12 @@ static int osc_llog_finish(struct obd_device *obd,
 
 static int osc_connect(struct lustre_handle *exph,
                        struct obd_device *obd, struct obd_uuid *cluuid,
+                       struct obd_connect_data *data,
                        unsigned long connect_flags)
 {
         int rc;
         ENTRY;
-        rc = client_connect_import(exph, obd, cluuid, connect_flags);
+        rc = client_connect_import(exph, obd, cluuid, data, connect_flags);
         RETURN(rc);
 }
 
index 42d3156..1090944 100644 (file)
@@ -963,11 +963,6 @@ int ost_msg_check_version(struct lustre_msg *msg)
         case OST_CONNECT:
         case OST_DISCONNECT:
         case OBD_PING:
-                rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
-                if (rc)
-                        CERROR("bad opc %u version %08x, expecting %08x\n",
-                               msg->opc, msg->version, LUSTRE_OBD_VERSION);
-                break;
         case OST_CREATE:
         case OST_DESTROY:
         case OST_GETATTR:
@@ -984,7 +979,7 @@ int ost_msg_check_version(struct lustre_msg *msg)
                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
                 if (rc)
                         CERROR("bad opc %u version %08x, expecting %08x\n",
-                               msg->opc, msg->version, LUSTRE_OST_VERSION);
+                               msg->opc, msg->version, LUSTRE_OBD_VERSION);
                 break;
         case LDLM_ENQUEUE:
         case LDLM_CONVERT:
index cdef863..b1d21e1 100644 (file)
@@ -236,7 +236,7 @@ int ptlbd_do_connect(struct ptlbd_obd *ptlbd)
         ENTRY;
 
         memset(&conn, 0, sizeof(conn));
-        rc = obd_connect(&conn, obd, &ptlbd->bd_server_uuid, 0);
+        rc = obd_connect(&conn, obd, &ptlbd->bd_server_uuid, NULL, 0);
         if (rc < 0)
                 RETURN(rc);
         ptlbd->bd_exp = class_conn2export(&conn);
index b8bcf5a..12e63b3 100644 (file)
@@ -305,16 +305,20 @@ int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
         int rc;
         __u64 committed_before_reconnect = 0;
         struct ptlrpc_request *request;
-        int size[] = {sizeof(imp->imp_target_uuid),
+        int size[] = {0,
+                      sizeof(imp->imp_target_uuid),
                       sizeof(obd->obd_uuid),
                       sizeof(imp->imp_dlm_handle),
-                      sizeof(unsigned long),
-                      sizeof(__u32) * 2};
-        char *tmp[] = {imp->imp_target_uuid.uuid,
+                      sizeof(imp->imp_connect_flags),
+                      sizeof(imp->imp_connect_data)};
+        char *tmp[] = {NULL,
+                       imp->imp_target_uuid.uuid,
                        obd->obd_uuid.uuid,
                        (char *)&imp->imp_dlm_handle,
                        (char *)&imp->imp_connect_flags, /* XXX: make this portable! */
-                       (char*) &obd->u.cli.cl_nllu};
+                       (char*) &imp->imp_connect_data};
+        int repsize = sizeof(struct obd_connect_data);
+                        
         struct ptlrpc_connect_async_args *aa;
         unsigned long flags;
 
@@ -362,11 +366,14 @@ int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
 
         LASSERT(imp->imp_sec);
 
+        size[0] = lustre_secdesc_size();
         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION,
-                                  imp->imp_connect_op, 5, size, tmp);
+                                  imp->imp_connect_op, 6, size, tmp);
         if (!request)
                 GOTO(out, rc = -ENOMEM);
 
+        lustre_pack_secdesc(request, size[0]);
+
 #ifndef __KERNEL__
         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
 #endif
@@ -375,7 +382,7 @@ int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
         }
 
         request->rq_send_state = LUSTRE_IMP_CONNECTING;
-        request->rq_replen = lustre_msg_size(0, NULL);
+        request->rq_replen = lustre_msg_size(1, &repsize);
         request->rq_interpret_reply = ptlrpc_connect_interpret;
 
         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
@@ -423,10 +430,22 @@ static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                 GOTO(out, rc);
         LASSERT(imp->imp_conn_current);
         imp->imp_conn_current->oic_last_attempt = 0;
-
+/*
+        remote_flag = lustre_msg_buf(request->rq_repmsg, 0, sizeof(int));
+        LASSERT(remote_flag != NULL);
+        imp->imp_obd->u.cli.cl_remote = *remote_flag;
+*/
         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
 
         if (aa->pcaa_initial_connect) {
+                struct obd_connect_data *conn_data;
+
+                conn_data = lustre_swab_repbuf(request, 0, sizeof(*conn_data),
+                                               lustre_swab_connect);
+                LASSERT(conn_data);
+                imp->imp_connect_data.ocd_connect_flags =
+                                        conn_data->ocd_connect_flags;
+
                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
                         CDEBUG(D_HA, "connected to replayable target: %s\n",
                                imp->imp_target_uuid.uuid);
index 8a7179b..af48920 100644 (file)
@@ -604,7 +604,7 @@ int ptlrpc_do_rawrpc(struct obd_import *imp,
         LASSERT(imp);
         class_import_get(imp);
         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
-                CWARN("raw rpc on closed imp(=>%s)? send anyway\n",
+                CDEBUG(D_SEC, "raw rpc on closed imp(=>%s)? send anyway\n",
                        imp->imp_target_uuid.uuid);
         }
 
index db9a38d..5d4fbb9 100644 (file)
@@ -74,6 +74,64 @@ void lustre_init_msg (struct lustre_msg *msg, int count, int *lens, char **bufs)
         }
 }
 
+/*
+ * Size in bytes of the security descriptor for the current task: the fixed
+ * mds_req_sec_desc header followed by one __u32 per group counted by
+ * current_ngroups, with that count capped at LUSTRE_MAX_GROUPS.
+ * Returns 0 when built without __KERNEL__.
+ */
+int lustre_secdesc_size(void)
+{
+#ifndef __KERNEL__
+        return 0;
+#else
+        /* read the group count once so the cap applies to a stable value */
+        int count = current_ngroups;
+
+        if (count > LUSTRE_MAX_GROUPS)
+                count = LUSTRE_MAX_GROUPS;
+
+        return sizeof(struct mds_req_sec_desc) + count * sizeof(__u32);
+#endif
+}
+
+/*
+ * Fill the security descriptor buffer of @req (message buffer
+ * MDS_REQ_SECDESC_OFF) with the uid/gid, fsuid/fsgid, effective capabilities
+ * and group list of the current task.  @size is the size of that buffer and
+ * is expected to come from an earlier lustre_secdesc_size() call.
+ *
+ * The group list may have changed since lustre_secdesc_size() was called,
+ * so the number of groups packed is clamped to what the task holds now;
+ * this prevents garbage gids from being sent.
+ *
+ * Does nothing when built without __KERNEL__.
+ */
+void lustre_pack_secdesc(struct ptlrpc_request *req, int size)
+{
+#ifdef __KERNEL__
+        struct mds_req_sec_desc *rsd;
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,4)
+        struct group_info *ginfo;
+#endif
+
+        rsd = lustre_msg_buf(req->rq_reqmsg,
+                             MDS_REQ_SECDESC_OFF, size);
+        /* fail loudly instead of dereferencing a missing buffer */
+        LASSERT(rsd != NULL);
+
+        rsd->rsd_uid = current->uid;
+        rsd->rsd_gid = current->gid;
+        rsd->rsd_fsuid = current->fsuid;
+        rsd->rsd_fsgid = current->fsgid;
+        rsd->rsd_cap = current->cap_effective;
+        /* whatever follows the fixed header is room for gids */
+        rsd->rsd_ngroups = (size - sizeof(*rsd)) / sizeof(__u32);
+        LASSERT(rsd->rsd_ngroups <= LUSTRE_MAX_GROUPS);
+
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,4)
+        /* take a reference so the group_info stays valid while we copy */
+        task_lock(current);
+        get_group_info(current->group_info);
+        ginfo = current->group_info;
+        task_unlock(current);
+        if (rsd->rsd_ngroups > ginfo->ngroups)
+                rsd->rsd_ngroups = ginfo->ngroups;
+        /*
+         * NOTE(review): only blocks[0] is copied; this assumes
+         * LUSTRE_MAX_GROUPS does not exceed NGROUPS_PER_BLOCK -- confirm.
+         */
+        memcpy(rsd->rsd_groups, ginfo->blocks[0],
+               rsd->rsd_ngroups * sizeof(__u32));
+        /* drop the reference taken above; without this group_info leaks */
+        put_group_info(ginfo);
+#else
+        LASSERT(rsd->rsd_ngroups <= NGROUPS);
+        if (rsd->rsd_ngroups > current->ngroups)
+                rsd->rsd_ngroups = current->ngroups;
+        memcpy(rsd->rsd_groups, current->groups,
+               rsd->rsd_ngroups * sizeof(__u32));
+#endif
+#endif
+}
+
 int lustre_pack_request (struct ptlrpc_request *req,
                          int count, int *lens, char **bufs)
 {
@@ -370,6 +428,13 @@ void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, int min_size,
  * lustre_idl.h implemented here.
  */
 
+/*
+ * Byte-swap an obd_connect_data in place: the 64-bit connect flags and the
+ * first two 32-bit ocd_nllu entries.
+ */
+void lustre_swab_connect(struct obd_connect_data *ocd)
+{
+        int idx;
+
+        __swab64s(&ocd->ocd_connect_flags);
+        for (idx = 0; idx < 2; idx++)
+                __swab32s(&ocd->ocd_nllu[idx]);
+}
+
 void lustre_swab_obdo (struct obdo  *o)
 {
         __swab64s (&o->o_id);
index c42c47c..2e5ba9d 100644 (file)
@@ -149,6 +149,8 @@ EXPORT_SYMBOL(ptlrpc_daemonize);
 /* pack_generic.c */
 EXPORT_SYMBOL(lustre_msg_swabbed);
 EXPORT_SYMBOL(lustre_msg_check_version);
+EXPORT_SYMBOL(lustre_secdesc_size);
+EXPORT_SYMBOL(lustre_pack_secdesc);
 EXPORT_SYMBOL(lustre_pack_request);
 EXPORT_SYMBOL(lustre_pack_reply);
 EXPORT_SYMBOL(lustre_free_reply_state);
index edf9f5f..234a063 100644 (file)
@@ -459,6 +459,9 @@ ptlrpc_server_handle_request (struct ptlrpc_service *svc)
 
         /* go through security check/transform */
         request->rq_auth_uid = -1;
+        request->rq_mapped_uid = -1;
+        request->rq_remote_realm = 0;
+
         secrc = svcsec_accept(request, &sec_err);
         switch(secrc) {
         case SVC_OK:
@@ -467,11 +470,11 @@ ptlrpc_server_handle_request (struct ptlrpc_service *svc)
         case SVC_COMPLETE:
                 target_send_reply(request, 0, OBD_FAIL_MDS_ALL_REPLY_NET);
                 goto put_conn;
-        case SVC_DROP:
-                goto out;
         case SVC_LOGIN:
         case SVC_LOGOUT:
                 break;
+        case SVC_DROP:
+                goto out;
         default:
                 LBUG();
         }
index 94f57ef..4fe4557 100644 (file)
@@ -15,7 +15,6 @@
  * Bruce Fields <bfields@umich.edu>
  * Copyright (c) 2000 The Regents of the University of Michigan
  *
- * $Id: gss_api.h,v 1.3 2005/04/04 13:12:39 yury Exp $
  */
 
 #ifndef __SEC_GSS_GSS_API_H_
index 6b46e7e..2475e8e 100644 (file)
@@ -108,6 +108,11 @@ typedef struct rawobj_buf_s {
         __u8           *buf;
 } rawobj_buf_t;
 
+/*
+ * version of the interface between the kernel and lgssd/lsvcgssd
+ */
+#define GSSD_INTERFACE_VERSION  (1)
+
 #define MAXSEQ 0x80000000 /* maximum legal sequence number, from rfc 2203 */
 
 enum rpc_gss_proc {
index 81ebfc8..d82e3ae 100644 (file)
@@ -43,7 +43,6 @@
  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  *
- * $Id: sec_gss.c,v 1.4 2005/04/13 09:49:50 yury Exp $
  */
 
 #ifndef EXPORT_SYMTAB
@@ -93,32 +92,24 @@ struct rpc_clnt;
 
 static int secinit_compose_request(struct obd_import *imp,
                                    char *buf, int bufsize,
+                                   uid_t uid, gid_t gid,
+                                   long token_size,
                                    char __user *token)
 {
         struct ptlrpcs_wire_hdr *hdr;
         struct lustre_msg       *lmsg;
-        char __user             *token_buf;
-        __u64                    token_size;
+        struct mds_req_sec_desc *secdesc;
+        __u32                    size = sizeof(*secdesc);
         __u32                    lmsg_size, *p;
-        int rc;
+        int                      rc;
 
-        lmsg_size = lustre_msg_size(0, NULL);
+        lmsg_size = lustre_msg_size(1, &size);
 
-        if (copy_from_user(&token_size, token, sizeof(token_size))) {
-                CERROR("read token error\n");
-                return -EFAULT;
-        }
         if (sizeof(*hdr) + lmsg_size + size_round(token_size) > bufsize) {
-                CERROR("token size "LPU64" too large\n", token_size);
+                CERROR("token size %ld too large\n", token_size);
                 return -EINVAL;
         }
 
-        if (copy_from_user(&token_buf, (token + sizeof(token_size)),
-                           sizeof(void*))) {
-                CERROR("read token buf pointer error\n");
-                return -EFAULT;
-        }
-
         /* security wire hdr */
         hdr = buf_to_sec_hdr(buf);
         hdr->flavor  = cpu_to_le32(PTLRPC_SEC_GSS);
@@ -126,9 +117,15 @@ static int secinit_compose_request(struct obd_import *imp,
         hdr->msg_len = cpu_to_le32(lmsg_size);
         hdr->sec_len = cpu_to_le32(7 * 4 + token_size);
 
-        /* lustre message */
+        /* lustre message & secdesc */
         lmsg = buf_to_lustre_msg(buf);
-        lustre_init_msg(lmsg, 0, NULL, NULL);
+
+        lustre_init_msg(lmsg, 1, &size, NULL);
+        secdesc = lustre_msg_buf(lmsg, 0, size);
+        secdesc->rsd_uid = secdesc->rsd_fsuid = uid;
+        secdesc->rsd_gid = secdesc->rsd_fsgid = gid;
+        secdesc->rsd_cap = secdesc->rsd_ngroups = 0;
+
         lmsg->handle   = imp->imp_remote_handle;
         lmsg->type     = PTL_RPC_MSG_REQUEST;
         lmsg->opc      = SEC_INIT;
@@ -146,10 +143,10 @@ static int secinit_compose_request(struct obd_import *imp,
         *p++ = cpu_to_le32(0);                          /* context handle */
 
         /* now the token part */
-        *p++ = (__u32)(cpu_to_le64(token_size));
+        *p++ = cpu_to_le32((__u32) token_size);
         LASSERT(((char *)p - buf) + token_size <= bufsize);
 
-        rc = copy_from_user(p, token_buf, token_size);
+        rc = copy_from_user(p, token, token_size);
         if (rc) {
                 CERROR("can't copy token\n");
                 return -EFAULT;
@@ -160,38 +157,44 @@ static int secinit_compose_request(struct obd_import *imp,
 }
 
 static int secinit_parse_reply(char *repbuf, int replen,
-                               char __user *outbuf, int outlen)
+                               char __user *outbuf, long outlen)
 {
-        __u32 *p = (__u32 *)repbuf;
-        __u32 lmsg_len, sec_len, status, major, minor, seq, obj_len, round_len;
-        __u32 effective = 0;
+        __u32                   *p = (__u32 *)repbuf;
+        struct ptlrpcs_wire_hdr *hdr = (struct ptlrpcs_wire_hdr *) repbuf;
+        __u32                    lmsg_len, sec_len, status;
+        __u32                    major, minor, seq, obj_len, round_len;
+        __u32                    effective = 0;
 
         if (replen <= (4 + 6) * 4) {
                 CERROR("reply size %d too small\n", replen);
                 return -EINVAL;
         }
 
+        hdr->flavor = le32_to_cpu(hdr->flavor);
+        hdr->sectype = le32_to_cpu(hdr->sectype);
+        hdr->msg_len = le32_to_cpu(hdr->msg_len);
+        hdr->sec_len = le32_to_cpu(hdr->sec_len);
+
         lmsg_len = le32_to_cpu(p[2]);
         sec_len = le32_to_cpu(p[3]);
 
         /* sanity checks */
-        if (p[0] != cpu_to_le32(PTLRPC_SEC_GSS) ||
-            p[1] != cpu_to_le32(PTLRPC_SEC_TYPE_NONE)) {
+        if (hdr->flavor != PTLRPC_SEC_GSS ||
+            hdr->sectype != PTLRPC_SEC_TYPE_NONE) {
                 CERROR("unexpected reply\n");
                 return -EINVAL;
         }
-        if (lmsg_len % 8 ||
-            4 * 4 + lmsg_len + sec_len > replen) {
+        if (hdr->msg_len % 8 ||
+            sizeof(*hdr) + hdr->msg_len + hdr->sec_len > replen) {
                 CERROR("unexpected reply\n");
                 return -EINVAL;
         }
-        if (sec_len > outlen) {
+        if (hdr->sec_len > outlen) {
                 CERROR("outbuf too small\n");
                 return -EINVAL;
         }
 
-        p += 4;                 /* skip hdr */
-        p += lmsg_len / 4;      /* skip lmsg */
+        p = (__u32 *) buf_to_sec_data(repbuf);
         effective = 0;
 
         status = le32_to_cpu(*p++);
@@ -200,29 +203,37 @@ static int secinit_parse_reply(char *repbuf, int replen,
         seq = le32_to_cpu(*p++);
         effective += 4 * 4;
 
-        copy_to_user(outbuf, &status, 4);
+        if (copy_to_user(outbuf, &status, 4))
+                return -EFAULT;
         outbuf += 4;
-        copy_to_user(outbuf, &major, 4);
+        if (copy_to_user(outbuf, &major, 4))
+                return -EFAULT;
         outbuf += 4;
-        copy_to_user(outbuf, &minor, 4);
+        if (copy_to_user(outbuf, &minor, 4))
+                return -EFAULT;
         outbuf += 4;
-        copy_to_user(outbuf, &seq, 4);
+        if (copy_to_user(outbuf, &seq, 4))
+                return -EFAULT;
         outbuf += 4;
 
         obj_len = le32_to_cpu(*p++);
         round_len = (obj_len + 3) & ~ 3;
-        copy_to_user(outbuf, &obj_len, 4);
+        if (copy_to_user(outbuf, &obj_len, 4))
+                return -EFAULT;
         outbuf += 4;
-        copy_to_user(outbuf, (char *)p, round_len);
+        if (copy_to_user(outbuf, (char *)p, round_len))
+                return -EFAULT;
         p += round_len / 4;
         outbuf += round_len;
         effective += 4 + round_len;
 
         obj_len = le32_to_cpu(*p++);
         round_len = (obj_len + 3) & ~ 3;
-        copy_to_user(outbuf, &obj_len, 4);
+        if (copy_to_user(outbuf, &obj_len, 4))
+                return -EFAULT;
         outbuf += 4;
-        copy_to_user(outbuf, (char *)p, round_len);
+        if (copy_to_user(outbuf, (char *)p, round_len))
+                return -EFAULT;
         p += round_len / 4;
         outbuf += round_len;
         effective += 4 + round_len;
@@ -230,45 +241,47 @@ static int secinit_parse_reply(char *repbuf, int replen,
         return effective;
 }
 
-/* input: 
- *   1. ptr to uuid
- *   2. ptr to send_token
- *   3. ptr to output buffer
- *   4. output buffer size
- * output:
- *   1. return code. 0 is success
- *   2. no meaning
- *   3. ptr output data
- *   4. output data size
- *
- * return:
- *   < 0: error
- *   = 0: success
- *
- * FIXME This interface looks strange, should be reimplemented
- */
+/*
+ * Parameter block for gss_send_secinit_rpc().  The whole structure is copied
+ * in from the user buffer (whose length must equal sizeof of this struct)
+ * and, on success, copied back to the same buffer with the "out" members
+ * filled in.
+ *
+ * NOTE(review): the layout uses long and pointer members, so it presumably
+ * has to match how the user-space side is built (e.g. 32- vs 64-bit) --
+ * confirm.
+ */
+/* XXX move to where lgssd could see */
+struct lgssd_ioctl_param {
+        /* in: must equal GSSD_INTERFACE_VERSION or the call is rejected */
+        int             version;        /* in   */
+        /* in: user pointer to the obd name string (copied into a 64-byte buffer) */
+        char           *uuid;           /* in   */
+        /* in: uid/gid stored in the security descriptor of the SEC_INIT request */
+        uid_t           uid;            /* in   */
+        gid_t           gid;            /* in   */
+        /* in: length in bytes of the token at send_token */
+        long            send_token_size;/* in   */
+        /* in: user pointer to the token copied into the request */
+        char           *send_token;     /* in   */
+        /* in: capacity of reply_buf; the raw reply must not be larger */
+        long            reply_buf_size; /* in   */
+        /* in: user pointer that receives the parsed reply */
+        char           *reply_buf;      /* in   */
+        /* out: set to 0 when the call succeeds */
+        long            status;         /* out  */
+        /* out: value returned by secinit_parse_reply() on success */
+        long            reply_length;   /* out  */
+};
+
 static int gss_send_secinit_rpc(__user char *buffer, unsigned long count)
 {
-        struct obd_import *imp;
-        const int reqbuf_size = 1024;
-        const int repbuf_size = 1024;
-        char *reqbuf, *repbuf;
-        struct obd_device *obd;
-        char obdname[64];
-        long inbuf[4], lsize;
-        int rc, reqlen, replen;
-
-        if (count != 4 * sizeof(long)) {
-                CERROR("count %lu\n", count);
+        struct obd_import        *imp;
+        struct lgssd_ioctl_param  param;
+        const int                 reqbuf_size = 1024;
+        const int                 repbuf_size = 1024;
+        char                     *reqbuf, *repbuf;
+        struct obd_device        *obd;
+        char                      obdname[64];
+        long                      lsize;
+        int                       rc, reqlen, replen;
+
+        if (count != sizeof(param)) {
+                CERROR("partial write\n");
                 RETURN(-EINVAL);
         }
-        if (copy_from_user(inbuf, buffer, count)) {
-                CERROR("Invalid pointer\n");
+        if (copy_from_user(&param, buffer, sizeof(param)))
                 RETURN(-EFAULT);
+
+        if (param.version != GSSD_INTERFACE_VERSION) {
+                CERROR("gssd interface version %d (expect %d)\n",
+                        param.version, GSSD_INTERFACE_VERSION);
+                RETURN(-EINVAL);
         }
 
         /* take name */
-        if (strncpy_from_user(obdname, (char *)inbuf[0],
+        if (strncpy_from_user(obdname, param.uuid,
                               sizeof(obdname)) <= 0) {
                 CERROR("Invalid obdname pointer\n");
                 RETURN(-EFAULT);
@@ -297,7 +310,9 @@ static int gss_send_secinit_rpc(__user char *buffer, unsigned long count)
 
         /* get token */
         reqlen = secinit_compose_request(imp, reqbuf, reqbuf_size,
-                                         (char *)inbuf[1]);
+                                         param.uid, param.gid,
+                                         param.send_token_size,
+                                         param.send_token);
         if (reqlen < 0)
                 GOTO(out_free, rc = reqlen);
 
@@ -307,21 +322,24 @@ static int gss_send_secinit_rpc(__user char *buffer, unsigned long count)
         if (rc)
                 GOTO(out_free, rc);
 
-        if (replen > inbuf[3]) {
+        if (replen > param.reply_buf_size) {
                 CERROR("output buffer size %ld too small, need %d\n",
-                        inbuf[3], replen);
+                        param.reply_buf_size, replen);
                 GOTO(out_free, rc = -EINVAL);
         }
 
         lsize = secinit_parse_reply(repbuf, replen,
-                                    (char *)inbuf[2], (int)inbuf[3]);
+                                    param.reply_buf, param.reply_buf_size);
         if (lsize < 0)
                 GOTO(out_free, rc = (int)lsize);
 
-        copy_to_user(buffer + 3 * sizeof(long), &lsize, sizeof(lsize));
-        lsize = 0;
-        copy_to_user((char*)buffer, &lsize, sizeof(lsize));
-        rc = 0;
+        param.status = 0;
+        param.reply_length = lsize;
+
+        if (copy_to_user(buffer, &param, sizeof(param)))
+                rc = -EFAULT;
+        else
+                rc = 0;
 out_free:
         class_import_put(imp);
         if (repbuf)
@@ -370,6 +388,14 @@ struct gss_sec {
 
 static rwlock_t gss_ctx_lock = RW_LOCK_UNLOCKED;
 
+/*
+ * Data identifying one gss upcall.  gss_init_upcall_msg() hands this
+ * structure to the rpc pipe message as its payload (data/len), and
+ * gss_find_upcall() matches pending upcalls with memcmp() over the whole of
+ * it.  gss_parse_init_downcall() reads the members back in declaration
+ * order.
+ *
+ * NOTE(review): because of the memcmp(), the structure presumably has to
+ * stay free of padding -- confirm when adding members.
+ */
+struct gss_upcall_msg_data {
+        /* uid of the cred being refreshed (cred->pc_uid) */
+        __u32                           gum_uid;
+        /* 0 when the import's obd type is "mdc", 1 when it is "osc" */
+        __u32                           gum_svc;
+        /* pni_number of the peer's network interface */
+        __u32                           gum_nal;
+        /* set to 0 by gss_cred_refresh() */
+        __u32                           gum_netid;
+        /* nid of the import connection's peer */
+        __u64                           gum_nid;
+};
+
 struct gss_upcall_msg {
         struct rpc_pipe_msg             gum_base;
         atomic_t                        gum_refcount;
@@ -377,10 +403,7 @@ struct gss_upcall_msg {
         struct gss_sec                 *gum_gsec;
         wait_queue_head_t               gum_waitq;
         char                            gum_obdname[64];
-        uid_t                           gum_uid;
-        __u32                           gum_ip; /* XXX IPv6? */
-        __u32                           gum_svc;
-        __u32                           gum_pad;
+        struct gss_upcall_msg_data      gum_data;
 };
 
 /**********************************************
@@ -439,30 +462,28 @@ gss_unhash_msg(struct gss_upcall_msg *gmsg)
 static
 struct gss_upcall_msg * gss_find_upcall(struct gss_sec *gsec,
                                         char *obdname,
-                                        uid_t uid, __u32 dest_ip)
+                                        struct gss_upcall_msg_data *gmd)
 {
         struct gss_upcall_msg *gmsg;
         ENTRY;
 
         list_for_each_entry(gmsg, &gsec->gs_upcalls, gum_list) {
-                if (gmsg->gum_uid != uid)
-                        continue;
-                if (gmsg->gum_ip != dest_ip)
+                if (memcmp(&gmsg->gum_data, gmd, sizeof(*gmd)))
                         continue;
                 if (strcmp(gmsg->gum_obdname, obdname))
                         continue;
                 atomic_inc(&gmsg->gum_refcount);
                 CDEBUG(D_SEC, "found gmsg at %p: obdname %s, uid %d, ref %d\n",
-                       gmsg, obdname, uid, atomic_read(&gmsg->gum_refcount));
+                       gmsg, obdname, gmd->gum_uid,
+                       atomic_read(&gmsg->gum_refcount));
                 RETURN(gmsg);
         }
         RETURN(NULL);
 }
 
 static void gss_init_upcall_msg(struct gss_upcall_msg *gmsg,
-                                struct gss_sec *gsec,
-                                char *obdname,
-                                uid_t uid, __u32 dest_ip, __u32 svc)
+                                struct gss_sec *gsec, char *obdname,
+                                struct gss_upcall_msg_data *gmd)
 {
         struct rpc_pipe_msg *rpcmsg;
         ENTRY;
@@ -473,14 +494,11 @@ static void gss_init_upcall_msg(struct gss_upcall_msg *gmsg,
         atomic_set(&gmsg->gum_refcount, 2);
         gmsg->gum_gsec = gsec;
         strncpy(gmsg->gum_obdname, obdname, sizeof(gmsg->gum_obdname));
-        gmsg->gum_uid = uid;
-        gmsg->gum_ip = dest_ip;
-        gmsg->gum_svc = svc;
+        memcpy(&gmsg->gum_data, gmd, sizeof(*gmd));
 
         rpcmsg = &gmsg->gum_base;
-        rpcmsg->data = &gmsg->gum_uid;
-        rpcmsg->len = sizeof(gmsg->gum_uid) + sizeof(gmsg->gum_ip) +
-                      sizeof(gmsg->gum_svc) + sizeof(gmsg->gum_pad);
+        rpcmsg->data = &gmsg->gum_data;
+        rpcmsg->len = sizeof(gmsg->gum_data);
         EXIT;
 }
 #endif /* __KERNEL__ */
@@ -594,8 +612,8 @@ simple_get_bytes(char **buf, __u32 *buflen, void *res, __u32 reslen)
  */
 static
 int gss_parse_init_downcall(struct gss_api_mech *gm, rawobj_t *buf,
-                            struct gss_cl_ctx **gc, struct vfs_cred *vcred,
-                            __u32 *dest_ip, int *gss_err)
+                            struct gss_cl_ctx **gc,
+                            struct gss_upcall_msg_data *gmd, int *gss_err)
 {
         char *p = buf->data;
         __u32 len = buf->len;
@@ -616,10 +634,15 @@ int gss_parse_init_downcall(struct gss_api_mech *gm, rawobj_t *buf,
         spin_lock_init(&ctx->gc_seq_lock);
         atomic_set(&ctx->gc_refcount,1);
 
-        if (simple_get_bytes(&p, &len, &vcred->vc_uid, sizeof(vcred->vc_uid)))
+        if (simple_get_bytes(&p, &len, &gmd->gum_uid, sizeof(gmd->gum_uid)))
+                GOTO(err_free_ctx, err);
+        if (simple_get_bytes(&p, &len, &gmd->gum_svc, sizeof(gmd->gum_svc)))
+                GOTO(err_free_ctx, err);
+        if (simple_get_bytes(&p, &len, &gmd->gum_nal, sizeof(gmd->gum_nal)))
                 GOTO(err_free_ctx, err);
-        vcred->vc_pag = vcred->vc_uid; /* FIXME */
-        if (simple_get_bytes(&p, &len, dest_ip, sizeof(*dest_ip)))
+        if (simple_get_bytes(&p, &len, &gmd->gum_netid, sizeof(gmd->gum_netid)))
+                GOTO(err_free_ctx, err);
+        if (simple_get_bytes(&p, &len, &gmd->gum_nid, sizeof(gmd->gum_nid)))
                 GOTO(err_free_ctx, err);
         /* FIXME: discarded timeout for now */
         if (simple_get_bytes(&p, &len, &timeout, sizeof(timeout)))
@@ -632,6 +655,10 @@ int gss_parse_init_downcall(struct gss_api_mech *gm, rawobj_t *buf,
                 /* in which case the next int is an error code: */
                 if (simple_get_bytes(&p, &len, gss_err, sizeof(*gss_err)))
                         GOTO(err_free_ctx, err);
+                if (*gss_err == 0) {
+                        CERROR("error downcall pass no gss error\n");
+                        GOTO(err_free_ctx, err);
+                }
                 GOTO(err_free_ctx, err = 0);
         }
         if (rawobj_extract_local(&tmp_buf, (__u32 **) ((void *)&p), &len))
@@ -664,17 +691,17 @@ err_free_ctx:
  * cred APIs                           *
  ***************************************/
 #ifdef __KERNEL__
+#define CRED_REFRESH_UPCALL_TIMEOUT     (20)
 static int gss_cred_refresh(struct ptlrpc_cred *cred)
 {
         struct obd_import          *import;
         struct gss_sec             *gsec;
         struct gss_upcall_msg      *gss_msg, *gss_new;
+        struct gss_upcall_msg_data  gmd;
         struct dentry              *dentry;
         char                       *obdname, *obdtype;
         wait_queue_t                wait;
         uid_t                       uid = cred->pc_uid;
-        ptl_nid_t                   peer_nid;
-        __u32                       dest_ip, svc;
         int                         res;
         ENTRY;
 
@@ -691,14 +718,16 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
                 RETURN(-EINVAL);
         }
 
-        peer_nid = import->imp_connection->c_peer.peer_id.nid;
-        dest_ip = (__u32) (peer_nid & 0xFFFFFFFF);
+        gmd.gum_uid = uid;
+        gmd.gum_nal = import->imp_connection->c_peer.peer_ni->pni_number;
+        gmd.gum_netid = 0;
+        gmd.gum_nid = import->imp_connection->c_peer.peer_id.nid;
 
         obdtype = import->imp_obd->obd_type->typ_name;
         if (!strcmp(obdtype, "mdc"))
-                svc = 0;
+                gmd.gum_svc = 0;
         else if (!strcmp(obdtype, "osc"))
-                svc = 1;
+                gmd.gum_svc = 1;
         else {
                 CERROR("gss on %s?\n", obdtype);
                 RETURN(-EINVAL);
@@ -716,7 +745,7 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
 
 again:
         spin_lock(&gsec->gs_lock);
-        gss_msg = gss_find_upcall(gsec, obdname, uid, dest_ip);
+        gss_msg = gss_find_upcall(gsec, obdname, &gmd);
         if (gss_msg) {
                 spin_unlock(&gsec->gs_lock);
                 GOTO(waiting, res);
@@ -731,7 +760,7 @@ again:
                 goto again;
         }
         /* so far we'v created gss_new */
-        gss_init_upcall_msg(gss_new, gsec, obdname, uid, dest_ip, svc);
+        gss_init_upcall_msg(gss_new, gsec, obdname, &gmd);
 
         if (gss_cred_is_uptodate_ctx(cred)) {
                 /* someone else had done it for us, simply cancel
@@ -763,13 +792,18 @@ waiting:
         set_current_state(TASK_INTERRUPTIBLE);
         spin_unlock(&gsec->gs_lock);
 
-        schedule();
+        res = schedule_timeout(CRED_REFRESH_UPCALL_TIMEOUT * HZ);
 
         remove_wait_queue(&gss_msg->gum_waitq, &wait);
         if (signal_pending(current)) {
-                CERROR("interrupted gss upcall %p\n", gss_msg);
+                CERROR("interrupted gss upcall: cred %p\n", cred);
                 res = -EINTR;
-        }
+        } else if (res == 0) {
+                CERROR("gss upcall timeout: cred %p\n", cred);
+                res = -ETIMEDOUT;
+        } else
+                res = 0;
+
         gss_release_msg(gss_msg);
         RETURN(res);
 }
@@ -838,6 +872,7 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
                 goto err_out;
         }
 
+        LASSERT(ctx);
         gss_cred_set_ctx(cred, ctx);
         LASSERT(gss_cred_is_uptodate_ctx(cred));
 
@@ -1373,8 +1408,8 @@ gss_pipe_downcall(struct file *filp, const char *src, size_t mlen)
         struct vfs_cred vcred = { 0 };
         struct ptlrpc_cred *cred;
         struct gss_upcall_msg *gss_msg;
+        struct gss_upcall_msg_data gmd = { 0 };
         struct gss_cl_ctx *ctx = NULL;
-        __u32  dest_ip;
         ssize_t left;
         int err, gss_err;
         ENTRY;
@@ -1406,19 +1441,21 @@ gss_pipe_downcall(struct file *filp, const char *src, size_t mlen)
         obdname = import->imp_obd->obd_name;
         mech = gsec->gs_mech;
 
-        err = gss_parse_init_downcall(mech, &obj, &ctx, &vcred, &dest_ip,
-                                      &gss_err);
-        if (err) {
-                CERROR("parse downcall err %d\n", err);
-                GOTO(err, err);
-        }
+        err = gss_parse_init_downcall(mech, &obj, &ctx, &gmd, &gss_err);
+        if (err)
+                CERROR("parse init downcall err %d\n", err);
+
+        vcred.vc_uid = gmd.gum_uid;
+        vcred.vc_pag = vcred.vc_uid; /* FIXME */
+
         cred = ptlrpcs_cred_lookup(sec, &vcred);
         if (!cred) {
-                CWARN("didn't find cred\n");
+                CWARN("didn't find cred for uid %u\n", vcred.vc_uid);
                 GOTO(err, err);
         }
-        if (gss_err) {
-                CERROR("got gss err %d, set cred %p dead\n", gss_err, cred);
+        if (err || gss_err) {
+                CERROR("got err %d, gss err %d, set cred %p dead\n",
+                        err, gss_err, cred);
                 cred->pc_flags |= PTLRPC_CRED_DEAD;
         } else {
                 CDEBUG(D_SEC, "get initial ctx:\n");
@@ -1426,7 +1463,7 @@ gss_pipe_downcall(struct file *filp, const char *src, size_t mlen)
         }
 
         spin_lock(&gsec->gs_lock);
-        gss_msg = gss_find_upcall(gsec, obdname, vcred.vc_uid, dest_ip);
+        gss_msg = gss_find_upcall(gsec, obdname, &gmd);
         if (gss_msg) {
                 gss_unhash_msg_nolock(gss_msg);
                 spin_unlock(&gsec->gs_lock);
index 1ac060e..14ddd73 100644 (file)
@@ -105,6 +105,9 @@ static inline unsigned long hash_mem(char *buf, int length, int bits)
 
 struct rsi {
         struct cache_head       h;
+        __u32                   naltype;
+        __u32                   netid;
+        __u64                   nid;
         rawobj_t                in_handle, in_token;
         rawobj_t                out_handle, out_token;
         int                     major_status, minor_status;
@@ -148,6 +151,9 @@ static void rsi_request(struct cache_detail *cd,
 {
         struct rsi *rsii = container_of(h, struct rsi, h);
 
+        qword_addhex(bpp, blen, (char *) &rsii->naltype, sizeof(rsii->naltype));
+        qword_addhex(bpp, blen, (char *) &rsii->netid, sizeof(rsii->netid));
+        qword_addhex(bpp, blen, (char *) &rsii->nid, sizeof(rsii->nid));
         qword_addhex(bpp, blen, rsii->in_handle.data, rsii->in_handle.len);
         qword_addhex(bpp, blen, rsii->in_token.data, rsii->in_token.len);
         (*bpp)[-1] = '\n';
@@ -364,7 +370,7 @@ static struct cache_detail rsi_cache = {
 #define RSC_HASHMAX     (1<<RSC_HASHBITS)
 #define RSC_HASHMASK    (RSC_HASHMAX-1)
 
-#define GSS_SEQ_WIN     128
+#define GSS_SEQ_WIN     512
 
 struct gss_svc_seq_data {
         /* highest seq number seen so far: */
@@ -378,8 +384,9 @@ struct gss_svc_seq_data {
 struct rsc {
         struct cache_head       h;
         rawobj_t                handle;
-        __u32                   remote;
+        __u32                   remote_realm;
         struct vfs_cred         cred;
+        uid_t                   mapped_uid;
         struct gss_svc_seq_data seqdata;
         struct gss_ctx         *mechctx;
 };
@@ -493,20 +500,27 @@ static int rsc_parse(struct cache_detail *cd,
                 goto out;
 
         /* remote flag */
-        rv = get_int(&mesg, &rsci->remote);
+        rv = get_int(&mesg, &rsci->remote_realm);
         if (rv) {
                 CERROR("fail to get remote flag\n");
                 goto out;
         }
 
+        /* mapped uid */
+        rv = get_int(&mesg, &rsci->mapped_uid);
+        if (rv) {
+                CERROR("fail to get mapped uid\n");
+                goto out;
+        }
+
         /* uid, or NEGATIVE */
         rv = get_int(&mesg, &rsci->cred.vc_uid);
         if (rv == -EINVAL)
                 goto out;
-        if (rv == -ENOENT)
+        if (rv == -ENOENT) {
+                CERROR("NOENT? set rsc entry negative\n");
                 set_bit(CACHE_NEGATIVE, &rsci->h.flags);
-        else {
-                int N, i;
+        } else {
                 struct gss_api_mech *gm;
                 rawobj_t tmp_buf;
                 __u64 ctx_expiry;
@@ -515,27 +529,6 @@ static int rsc_parse(struct cache_detail *cd,
                 if (get_int(&mesg, &rsci->cred.vc_gid))
                         goto out;
 
-                /* number of additional gid's */
-                if (get_int(&mesg, &N))
-                        goto out;
-                status = -ENOMEM;
-#if 0
-                rsci->cred.vc_ginfo = groups_alloc(N);
-                if (rsci->cred.vc_ginfo == NULL)
-                        goto out;
-#endif
-
-                /* gid's */
-                status = -EINVAL;
-                for (i=0; i<N; i++) {
-                        gid_t gid;
-                        if (get_int(&mesg, &gid))
-                                goto out;
-#if 0
-                        GROUP_AT(rsci->cred.vc_ginfo, i) = gid;
-#endif
-                }
-
                 /* mech name */
                 len = qword_get(&mesg, buf, mlen);
                 if (len < 0)
@@ -687,12 +680,17 @@ gss_check_seq_num(struct gss_svc_seq_data *sd, __u32 seq_num)
                 __set_bit(seq_num % GSS_SEQ_WIN, sd->sd_win);
                 goto exit;
         } else if (seq_num + GSS_SEQ_WIN <= sd->sd_max) {
+                CERROR("seq %u too low: max %u, win %d\n",
+                        seq_num, sd->sd_max, GSS_SEQ_WIN);
                 rc = 1;
                 goto exit;
         }
 
-        if (__test_and_set_bit(seq_num % GSS_SEQ_WIN, sd->sd_win))
+        if (__test_and_set_bit(seq_num % GSS_SEQ_WIN, sd->sd_win)) {
+                CERROR("seq %u is replay: max %u, win %d\n",
+                        seq_num, sd->sd_max, GSS_SEQ_WIN);
                 rc = 1;
+        }
 exit:
         spin_unlock(&sd->sd_lock);
         return rc;
@@ -912,6 +910,10 @@ gss_svcsec_handle_init(struct ptlrpc_request *req,
                 GOTO(out_rsikey, rc = SVC_DROP);
         }
 
+        rsikey->naltype = (__u32) req->rq_peer.peer_ni->pni_number;
+        rsikey->netid = 0;
+        rsikey->nid = (__u64) req->rq_peer.peer_id.nid;
+
         rsip = gssd_upcall(rsikey, &my_chandle);
         if (!rsip) {
                 CERROR("error in gssd_upcall.\n");
@@ -974,6 +976,10 @@ gss_svcsec_handle_init(struct ptlrpc_request *req,
 
         *res = PTLRPCS_OK;
 
+        req->rq_auth_uid = rsci->cred.vc_uid;
+        req->rq_remote_realm = rsci->remote_realm;
+        req->rq_mapped_uid = rsci->mapped_uid;
+
         /* This is simplified since right now we doesn't support
          * INIT_CONTINUE yet.
          */
@@ -1046,7 +1052,8 @@ gss_svcsec_handle_data(struct ptlrpc_request *req,
         }
 
         req->rq_auth_uid = rsci->cred.vc_uid;
-        req->rq_remote = rsci->remote;
+        req->rq_remote_realm = rsci->remote_realm;
+        req->rq_mapped_uid = rsci->mapped_uid;
 
         *res = PTLRPCS_OK;
         GOTO(out, rc = SVC_OK);
index 78a6276..4a7029c 100644 (file)
@@ -103,18 +103,14 @@ struct ptlrpc_svcsec * svcsec_get(struct ptlrpc_svcsec *sec)
 {
         int rc;
 
-//        spin_lock(&svcsecs_lock);
         rc = try_module_get(sec->pss_owner);
-//        spin_unlock(&svcsecs_lock);
         LASSERT(rc);
         return sec;
 }
 
 void svcsec_put(struct ptlrpc_svcsec *sec)
 {
-//        spin_lock(&svcsecs_lock);
         module_put(sec->pss_owner);
-//        spin_unlock(&svcsecs_lock);
 }
 
 /*
index 49e9522..059fc61 100644 (file)
@@ -206,7 +206,7 @@ find_again:
                 set_current_state(TASK_INTERRUPTIBLE);
                 write_unlock(&cache->uc_hashlock);
 
-                schedule_timeout(cache->uc_acquire_expire);
+                schedule_timeout(cache->uc_acquire_expire * HZ);
 
                 write_lock(&cache->uc_hashlock);
                 remove_wait_queue(&entry->ue_waitq, &wait);
@@ -222,6 +222,8 @@ find_again:
                                entry->ue_expire);
                         put_entry(entry);
                         write_unlock(&cache->uc_hashlock);
+                        CERROR("Interrupted? Or check whether %s is in place\n",
+                               cache->uc_upcall);
                         RETURN(NULL);
                 }
                 /* fall through */
@@ -284,8 +286,7 @@ void upcall_cache_put_entry(struct upcall_cache_entry *entry)
 }
 EXPORT_SYMBOL(upcall_cache_put_entry);
 
-int upcall_cache_downcall(struct upcall_cache *cache, __u64 key,
-                          int err, void *args)
+int upcall_cache_downcall(struct upcall_cache *cache, __u64 key, void *args)
 {
         struct list_head *head;
         struct upcall_cache_entry *entry;
@@ -312,11 +313,6 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u64 key,
                 RETURN(-EINVAL);
         }
 
-        if (err < 0) {
-                UC_CACHE_SET_INVALID(entry);
-                GOTO(out, rc = err);
-        }
-
         if (!UC_CACHE_IS_ACQUIRING(entry) ||
             UC_CACHE_IS_INVALID(entry) ||
             UC_CACHE_IS_EXPIRED(entry)) {
@@ -333,12 +329,17 @@ int upcall_cache_downcall(struct upcall_cache *cache, __u64 key,
         rc = cache->parse_downcall(cache, entry, args);
         write_lock(&cache->uc_hashlock);
         atomic_dec(&entry->ue_refcount);
-        if (rc) {
+
+        if (rc < 0) {
                 UC_CACHE_SET_INVALID(entry);
                 list_del_init(&entry->ue_hash);
                 GOTO(out, rc);
+        } else if (rc == 0) {
+                entry->ue_expire = get_seconds() + cache->uc_entry_expire;
+        } else {
+                entry->ue_expire = get_seconds() + cache->uc_err_entry_expire;
         }
-        entry->ue_expire = get_seconds() + cache->uc_entry_expire;
+
         UC_CACHE_SET_VALID(entry);
         CDEBUG(D_OTHER, "create ucache entry %p(key "LPU64")\n",
                entry, entry->ue_key);
index 07e9f8e..5414ec8 100755 (executable)
@@ -8,10 +8,12 @@
 export KDCHOST=${KDCHOST:-"localhost"}
 export KDCDIR=${KDCDIR:-"/usr/kerberos/sbin"}
 export KRB5DIR=${KRB5DIR:-"/usr/kerberos"}
-export LGSSD=${LGSSD:-"/sbin/lgssd"}
-export SVCGSSD=${SVCGSSD:-"/sbin/lsvcgssd"}
+export LGSSD=${LGSSD:-"/usr/sbin/lgssd"}
+export SVCGSSD=${SVCGSSD:-"/usr/sbin/lsvcgssd"}
 export PDSH=${PDSH:-"ssh"}
 
+# Honor CHECK_KDC from the environment (the name start_krb5_kdc tests);
+# the older CHECKKDC spelling is still accepted as a fallback.
+export CHECK_KDC=${CHECK_KDC:-${CHECKKDC:-"no"}}
+
 using_krb5_sec() {
     if [ "x$1" != "xkrb5i" -a "x$1" != "xkrb5p" ]; then
         echo "n"
@@ -25,6 +27,10 @@ start_krb5_kdc() {
         return 0
     fi
 
+    if [ "x$CHECK_KDC" == "xno" ]; then
+        return 0
+    fi
+
     num=`$PDSH $KDCHOST "PATH=\$PATH:$KDCDIR; ps ax | grep krb5kdc | grep -v "grep" | wc -l"`
     if [ $num -eq 1 ]; then
         return 0
index b8cbd9b..b02980d 100755 (executable)
@@ -120,6 +120,10 @@ test_0b() {
 run_test 0b "ensure object created after recover exists. (3284)"
 
 test_0c() {
+    if [ `using_krb5_sec $SECURITY` == 'n' ] ; then
+        echo "Skip 0c in non-gss mode"
+        return 0
+    fi
     # drop gss error notification
     replay_barrier mds1
     fail_drop mds1 0x760
@@ -1285,7 +1289,7 @@ test_56() {
     rm $DIR/$tfile
     return 0
 }
-run_test 56 "let MDS_CHECK_RESENT return the original return code instead of 0
+run_test 56 "let MDS_CHECK_RESENT return the original return code instead of 0"
 
 equals_msg test complete, cleaning up
 $CLEANUP
index 20ebdc1..190e718 100644 (file)
@@ -204,7 +204,6 @@ fail_drop() {
     local failcode=$2
     facet_failover $facet
     do_facet mds "echo $failcode > /proc/sys/lustre/fail_loc"
-    cat /proc/sys/lustre/fail_loc
     df $MOUNT || error "post-failover df: $?"
     do_facet mds "echo 0 > /proc/sys/lustre/fail_loc"
 }
index 62707cc..8a28fda 100644 (file)
@@ -34,6 +34,7 @@ lfs_SOURCES = lfs.c
 llmount_SOURCES = llmount.c 
 llmount_LDADD = $(LIBREADLINE) -lptlctl
 lsd_upcall_SOURCES = lsd_upcall.c
+lsd_upcall_LDADD = $(LIBREADLINE) -lptlctl
 
 EXTRA_DIST = $(bin_scripts) $(sbin_scripts)
 
index e70629f..a053084 100755 (executable)
@@ -1855,7 +1855,7 @@ class CONFDEV(Module):
         
        if self.target.get_class() == 'mds':
            if options:
-               options = "%s,iopen_nopriv" %(options)
+               options = "%s,acl,user_xattr,iopen_nopriv" %(options)
            else:
                options = "iopen_nopriv"
            
index 1ad7759..e50df3a 100644 (file)
@@ -38,6 +38,7 @@
 
 #include "obdctl.h"
 #include <portals/ptlctl.h>
+#include <linux/lustre_idl.h>
 
 int debug;
 int verbose;
@@ -120,8 +121,9 @@ init_options(struct lustre_mount_data *lmd)
         lmd->lmd_port = 988;    /* XXX define LUSTRE_DEFAULT_PORT */
         lmd->lmd_nal = SOCKNAL;
         lmd->lmd_async = 0;
-        lmd->lmd_nllu = 99;
-        lmd->lmd_nllg = 99;
+        lmd->lmd_remote_flag = 0;
+        lmd->lmd_nllu = NOBODY_UID;
+        lmd->lmd_nllg = NOBODY_GID;
         strncpy(lmd->lmd_security, "null", sizeof(lmd->lmd_security));
         return 0;
 }
@@ -344,15 +346,21 @@ int parse_options(char * options, struct lustre_mount_data *lmd)
                                 }
                         }
                 } else {
-                        val = 1;
-                        if (!strncmp(opt, "no", 2)) {
-                                val = 0;
-                                opt += 2;
-                        }
-                        if (!strcmp(opt, "debug")) {
-                                debug = val;
+                        if (!strcmp(opt, "remote")) {
+                                lmd->lmd_remote_flag = OBD_CONNECT_REMOTE;
+                        } else if (!strcmp(opt, "local")) {
+                                lmd->lmd_remote_flag = OBD_CONNECT_LOCAL;
                         } else if (!strcmp(opt, "async")) {
                                 lmd->lmd_async = 1;
+                        } else {
+                                val = 1;
+                                if (!strncmp(opt, "no", 2)) {
+                                        val = 0;
+                                        opt += 2;
+                                }
+                                if (!strcmp(opt, "debug")) {
+                                        debug = val;
+                                }
                         }
                 }
         }
index 8b55d45..2b110d8 100644 (file)
@@ -35,6 +35,9 @@
 #include <linux/obd.h>
 #include <linux/lustre_mds.h>
 
+#include <portals/types.h>
+#include <portals/ptlctl.h>
+
 /*
  * return:
  *  0:      fail to insert (found identical)
@@ -76,7 +79,7 @@ int get_groups_local(uid_t uid, gid_t *gid, int *ngroups, gid_t **groups)
 
         pw = getpwuid(uid);
         if (!pw)
-                return -errno;
+                return -ENOENT;
 
         *gid = pw->pw_gid;
 
@@ -97,34 +100,308 @@ int get_groups_local(uid_t uid, gid_t *gid, int *ngroups, gid_t **groups)
         return 0;
 }
 
-int main (int argc, char **argv)
+#define LINEBUF_SIZE    (1024)
+static char linebuf[LINEBUF_SIZE];
+
+/*
+ * Read one line from fp into buf (fgets, so at most bufsize - 1 characters)
+ * and cut it at the first '#' or newline.
+ *
+ * Returns the number of characters kept in buf, or -1 when fgets() returns
+ * NULL.
+ *
+ * NOTE(review): GNU readline exports a function with this same name and the
+ * Makefile links this tool against $(LIBREADLINE) -- confirm there is no
+ * symbol clash.
+ */
+int readline(FILE *fp, char *buf, int bufsize)
 {
-        char   *pathname = "/proc/fs/lustre/mds/lsd_downcall";
-        int     fd, rc;
-        struct lsd_downcall_args ioc_data;
+        int len;
+
+        if (!fgets(buf, bufsize, fp))
+                return -1;
+
+        /* stop at the comment marker or end-of-line, whichever comes first */
+        for (len = 0; buf[len] != '\0'; len++) {
+                if (buf[len] == '#' || buf[len] == '\n') {
+                        buf[len] = '\0';
+                        break;
+                }
+        }
+
+        return len;
+}
+
+#define IS_SPACE(c) ((c) == ' ' || (c) == '\t')
+
+/* Advance *buf past any leading blanks (IS_SPACE: space or tab). */
+void remove_space_head(char **buf)
+{
+        char *cur;
+
+        for (cur = *buf; IS_SPACE(*cur); cur++)
+                ;
+
+        *buf = cur;
+}
+
+/*
+ * Strip trailing blanks from the string at *buf by writing a terminator over
+ * the first character of the final run of spaces/tabs.  *buf itself is left
+ * unchanged.
+ */
+void remove_space_tail(char **buf)
+{
+        char *cur;
+        char *tail = NULL;      /* start of the current run of blanks, if any */
+
+        for (cur = *buf; *cur != '\0'; cur++) {
+                if (!IS_SPACE(*cur))
+                        tail = NULL;
+                else if (tail == NULL)
+                        tail = cur;
+        }
+
+        if (tail != NULL)
+                *tail = '\0';
+}
+
+/*
+ * Parse one decimal uid from s, allowing blanks on either side.
+ * Returns 0 and stores the value in *out, or -1 when s holds no digits,
+ * contains anything else, or the value does not round-trip through uid_t.
+ */
+static int parse_uid_token(const char *s, uid_t *out)
+{
+        unsigned long long val = 0;
+        int ndigits = 0;
+
+        while (*s == ' ' || *s == '\t')
+                s++;
+        for (; *s >= '0' && *s <= '9'; s++, ndigits++) {
+                val = val * 10 + (unsigned long long)(*s - '0');
+                if ((unsigned long long)(uid_t)val != val)
+                        return -1;
+        }
+        while (*s == ' ' || *s == '\t')
+                s++;
+
+        if (ndigits == 0 || *s != '\0')
+                return -1;
+
+        *out = (uid_t)val;
+        return 0;
+}
+
+/*
+ * Take the next "N" or "N-M" item from the comma-separated list at *buf.
+ *
+ * On success returns 0, stores the inclusive bounds in uid_range[0] and
+ * uid_range[1], and moves *buf past the consumed item.  Returns -1 when no
+ * usable item is left.  The list is edited in place (',' and '-' are
+ * overwritten with terminators).
+ *
+ * Items that are empty or not plain decimal numbers are skipped rather than
+ * being read as 0 (which is what atoi() would give), so a malformed entry
+ * can no longer match uid 0 by accident.
+ */
+int get_next_uid_range(char **buf, uid_t *uid_range)
+{
+        char *p = *buf;
+        char *comma, *sub, *next;
+        uid_t lo, hi;
+
+        for (;;) {
+                remove_space_head(&p);
+                if (*p == '\0')
+                        return -1;
+
+                comma = strchr(p, ',');
+                if (comma) {
+                        *comma = '\0';
+                        next = comma + 1;
+                } else
+                        next = p + strlen(p);
+                *buf = next;
+
+                sub = strchr(p, '-');
+                if (!sub) {
+                        if (parse_uid_token(p, &lo) == 0) {
+                                uid_range[0] = uid_range[1] = lo;
+                                return 0;
+                        }
+                } else {
+                        *sub++ = '\0';
+                        if (parse_uid_token(p, &lo) == 0 &&
+                            parse_uid_token(sub, &hi) == 0) {
+                                uid_range[0] = lo;
+                                uid_range[1] = hi;
+                                return 0;
+                        }
+                }
+
+                /* unusable item: move on to the next one on the line */
+                p = next;
+        }
+}
+
+/*
+ * Expect "[...]" at *buf, optionally followed by spaces/tabs only.
+ *
+ * return 0: ok -- *buf now points just after '[', with the ']' and any
+ * blanks before it cut off.  Otherwise -1.  The buffer is edited in place
+ * (the ']' is overwritten before the text after it is checked).
+ */
+int remove_bracket(char **buf)
+{
+        char *inner = *buf;
+        char *rbracket, *rest;
+
+        if (inner[0] != '[')
+                return -1;
+        inner++;
+
+        rbracket = strchr(inner, ']');
+        if (rbracket == NULL)
+                return -1;
+        *rbracket = '\0';
+
+        /* nothing but spaces/tabs may follow the closing bracket */
+        for (rest = rbracket + 1; *rest != '\0'; rest++) {
+                if (*rest != ' ' && *rest != '\t')
+                        return -1;
+        }
+
+        remove_space_tail(&inner);
+        *buf = inner;
+        return 0;
+}
+
+/*
+ * Scan fp for a "[ranges]" line whose uid ranges include uid.
+ *
+ * return 0: found a match (reading stops right after that line);
+ * otherwise the negative readline() result once input runs out.
+ */
+int search_uid(FILE *fp, uid_t uid)
+{
+        uid_t range[2];
+        char *cursor;
+        int len;
+
+        for (;;) {
+                len = readline(fp, linebuf, LINEBUF_SIZE);
+                if (len < 0)
+                        return len;
+
+                /* skip empty lines and lines that are not "[...]" */
+                cursor = linebuf;
+                if (len == 0 || remove_bracket(&cursor) != 0)
+                        continue;
+
+                while (get_next_uid_range(&cursor, range) == 0) {
+                        if (range[0] <= uid && uid <= range[1])
+                                return 0;
+                }
+        }
+}
+
+/* Permission keywords recognised by parse_perm() and the bit each one sets. */
+static struct {
+        char   *name;
+        __u32   bit;
+} perm_types[] =  {
+        {"setuid",      LSD_PERM_SETUID},
+        {"setgid",      LSD_PERM_SETGID},
+        {"setgrp",      LSD_PERM_SETGRP},
+};
+/* derived from the table so the count cannot drift from its contents */
+#define N_PERM_TYPES    ((int)(sizeof(perm_types) / sizeof(perm_types[0])))
+
+/*
+ * Convert a comma-separated list of permission names (matched against
+ * perm_types, case-insensitively) into a bit mask.
+ *
+ * str is edited in place: each ',' is overwritten with a terminator.
+ * Returns 0 on success.  On an unrecognised name a message is printed and
+ * -1 is returned; *perm then holds only the bits parsed before the failure.
+ */
+int parse_perm(__u32 *perm, char *str)
+{
+        char *p;
+        char *comma;
+        int i;
+
+        *perm = 0;
+
+        while (1) {
+                /* p is the current name; str moves on to the rest of the list */
+                p = str;
+                comma = strchr(str, ',');
+                if (comma) {
+                        *comma = '\0';
+                        str = comma + 1;
+                }
+
+                for (i = 0; i < N_PERM_TYPES; i++) {
+                        if (!strcasecmp(p, perm_types[i].name)) {
+                                *perm |= perm_types[i].bit;
+                                break;
+                        }
+                }
 
-        if (argc != 2) {
-                printf("bad parameter\n");
-                return -EINVAL;
+                if (i >= N_PERM_TYPES) {
+                        printf("unknown perm type: %s\n", p);
+                        return -1;
+                }
+
+                if (!comma)
+                        break;
         }
+        return 0;
+}
 
-        ioc_data.uid = atoi(argv[1]);
+/*
+ * Read the next non-empty line (after readline() has stripped comments) and
+ * parse it as "<nid> <perm-list>" into *perm; netid is set to 0.
+ *
+ * Returns 0 on success, or a negative value when input runs out or the line
+ * does not parse (fewer than two fields, bad nid, unknown permission name).
+ */
+int get_one_perm(FILE *fp, struct lsd_permission *perm)
+{
+        char nid_str[256], perm_str[256];
+        int rc;
 
-        fd = open(pathname, O_WRONLY);
-        if (fd < 0) {
-                rc = -errno;
-                printf("can't open device %s\n", pathname);
+again:
+        rc = readline(fp, linebuf, LINEBUF_SIZE);
+        if (rc < 0)
                 return rc;
+        if (rc == 0)
+                goto again;
+
+        /*
+         * linebuf can hold far more than 255 characters per field; the
+         * field widths keep a long token from overrunning the local buffers.
+         */
+        rc = sscanf(linebuf, "%255s %255s", nid_str, perm_str);
+        if (rc != 2)
+                return -1;
+
+        if (ptl_parse_nid(&perm->nid, nid_str))
+                return -1;
+
+        if (parse_perm(&perm->perm, perm_str))
+                return -1;
+
+        perm->netid = 0;
+        return 0;
+}
+
+#define MAX_PERMS       (50)
+
+/*
+ * Find the section of fp that covers uid and collect the permission lines
+ * following it, up to MAX_PERMS entries; collection stops at the first line
+ * get_one_perm() rejects.
+ *
+ * Returns -1 (outputs untouched) when no section matches.  Otherwise returns
+ * 0 with *nperms set to the entry count and *perms pointing at a static
+ * array, which is reused by every call.
+ */
+int get_perms(FILE *fp, uid_t uid, int *nperms, struct lsd_permission **perms)
+{
+        static struct lsd_permission _perms[MAX_PERMS];
+        int count;
+
+        if (search_uid(fp, uid) != 0)
+                return -1;
+
+        for (count = 0; count < MAX_PERMS; count++) {
+                if (get_one_perm(fp, &_perms[count]) != 0)
+                        break;
         }
+        *nperms = count;
+        *perms = _perms;
+        return 0;
+}
+
+/*
+ * Print the contents of a downcall to stdout: error code, uid/gid, the
+ * supplementary groups and the permission entries.  main() calls this in
+ * -t (testing) mode instead of writing the downcall.
+ */
+void show_result(struct lsd_downcall_args *dc)
+{
+        int i;
+
+        printf("err: %d, uid %u, gid %d\n"
+               "ngroups: %d\n",
+               dc->err, dc->uid, dc->gid, dc->ngroups);
+        for (i = 0; i < dc->ngroups; i++)
+                printf("\t%d\n", dc->groups[i]);
 
+        printf("nperms: %d\n", dc->nperms);
+        /*
+         * One line per entry: its index, then the entry's own netid, nid and
+         * permission bits.  (%u for netid assumes a 32-bit unsigned field,
+         * as the original format implied -- TODO confirm.)
+         */
+        for (i = 0; i < dc->nperms; i++)
+                printf("\t%d: netid %u, nid "LPX64", bits %x\n", i,
+                        dc->perms[i].netid, dc->perms[i].nid,
+                        dc->perms[i].perm);
+}
+
+/* Print the command-line synopsis to stdout and exit with status 1. */
+void usage(char *prog)
+{
+        printf("Usage: %s [-t] uid\n", prog);
+        exit(1);
+}
+
+/*
+ * Build the downcall data for one uid and deliver it.
+ *
+ * Usage: prog [-t] uid
+ *
+ * The uid's gid and groups come from get_groups_local(); if that succeeds,
+ * its permission entries are read from /etc/lustre/lsd.conf (a missing file
+ * simply leaves nperms at 0).  With -t the result is printed; otherwise it
+ * is written to /proc/fs/lustre/mds/lsd_downcall.  A lookup failure is still
+ * delivered, carried in ioc_data.err.
+ *
+ * Returns 0 on success, 1 on a short or failed write, or -errno when the
+ * downcall file cannot be opened.
+ */
+int main (int argc, char **argv)
+{
+        char   *dc_name = "/proc/fs/lustre/mds/lsd_downcall";
+        int     dc_fd;
+        char   *conf_name = "/etc/lustre/lsd.conf";
+        FILE   *conf_fp;
+        struct lsd_downcall_args ioc_data;
+        extern char *optarg;
+        int     opt, testing = 0, rc;
+
+        while ((opt = getopt(argc, argv, "t")) != -1) {
+                switch (opt) {
+                case 't':
+                        testing = 1;
+                        break;
+                default:
+                        usage(argv[0]);
+                }
+        }
+
+        if (optind >= argc)
+                usage(argv[0]);
+
+        memset(&ioc_data, 0, sizeof(ioc_data));
+        ioc_data.uid = atoi(argv[optind]);
+
+        /* read user/group database */
         ioc_data.err = get_groups_local(ioc_data.uid, &ioc_data.gid,
                                         &ioc_data.ngroups, &ioc_data.groups);
+        if (ioc_data.err)
+                goto do_downcall;
+
+        /* read lsd config database */
+        conf_fp = fopen(conf_name, "r");
+        if (conf_fp) {
+                get_perms(conf_fp, ioc_data.uid, &ioc_data.nperms,
+                          &ioc_data.perms);
+                fclose(conf_fp);
+        }
 
-        /* FIXME get these from config file */
-        ioc_data.allow_setuid = 1;
-        ioc_data.allow_setgid = 1;
-        ioc_data.allow_setgrp = 1;
 
-        rc = write(fd, &ioc_data, sizeof(ioc_data));
-        return (rc != sizeof(ioc_data));
+do_downcall:
+        if (testing) {
+                show_result(&ioc_data);
+                return 0;
+        } else {
+                dc_fd = open(dc_name, O_WRONLY);
+                if (dc_fd < 0) {
+                        /* capture errno first: printf() may overwrite it */
+                        rc = -errno;
+                        printf("can't open device %s\n", dc_name);
+                        return rc;
+                }
+
+                rc = write(dc_fd, &ioc_data, sizeof(ioc_data));
+                return (rc != sizeof(ioc_data));
+        }
 }