Whamcloud - gitweb
land b_groups onto HEAD:
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index 7c29799..3994916 100644 (file)
 
 static int mdc_cleanup(struct obd_device *obd, int flags);
 
+int mdc_get_secdesc_size(void)
+{
+        int ngroups = current_ngroups;
+
+        if (ngroups > LUSTRE_MAX_GROUPS)
+                ngroups = LUSTRE_MAX_GROUPS;
+        return sizeof(struct mds_req_sec_desc) +
+               sizeof(__u32) * ngroups;
+}
+
+/*
+ * because group info might have changed since last time
+ * we call get_secdesc_size(), so here we did more sanity check
+ * to prevent garbage gids
+ */
+void mdc_pack_secdesc(struct ptlrpc_request *req, int size)
+{
+        struct mds_req_sec_desc *rsd;
+        struct group_info *ginfo;
+
+        rsd = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_SECDESC_OFF, size);
+        rsd->rsd_uid = current->uid;
+        rsd->rsd_gid = current->gid;
+        rsd->rsd_fsuid = current->fsuid;
+        rsd->rsd_fsgid = current->fsgid;
+        rsd->rsd_cap = current->cap_effective;
+        rsd->rsd_ngroups = (size - sizeof(*rsd)) / sizeof(__u32);
+        LASSERT(rsd->rsd_ngroups <= LUSTRE_MAX_GROUPS);
+
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,4)
+        task_lock(current);
+        get_group_info(current->group_info);
+        ginfo = current->group_info;
+        task_unlock(current);
+        if (rsd->rsd_ngroups > ginfo->ngropus)
+                rsd->rsd_ngroups = ginfo->ngroups;
+        memcpy(rsd->rsd_groups, ginfo->blocks[0],
+               rsd->rsd_ngroups * sizeof(__u32));
+#else
+        LASSERT(rsd->rsd_ngroups <= NGROUPS);
+        if (rsd->rsd_ngroups > current->ngroups)
+                rsd->rsd_ngroups = current->ngroups;
+        memcpy(rsd->rsd_groups, current->groups,
+               rsd->rsd_ngroups * sizeof(__u32));
+#endif
+}
+
 extern int mds_queue_req(struct ptlrpc_request *);
 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
@@ -51,18 +98,22 @@ static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid,
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
-        int rc, size = sizeof(*body);
+        int rc, size[2] = {0, sizeof(*body)};
         ENTRY;
 
-        req = ptlrpc_prep_req(imp, MDS_GETSTATUS, 1, &size, NULL);
+        //size[0] = mdc_get_secdesc_size();
+
+        req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS,
+                              2, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+        //mdc_pack_secdesc(req, size[0]);
+
+        body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
         req->rq_send_state = level;
-        req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_replen = lustre_msg_size(1, &size[1]);
 
-        mdc_pack_req_body(req);
         req->rq_reqmsg->flags |= msg_flags;
         rc = ptlrpc_queue_wait(req);
 
@@ -101,18 +152,18 @@ int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
         struct mds_body *body;
         void            *eadata;
         int              rc;
-        int              size[2] = {sizeof(*body), 0};
+        int              repsize[2] = {sizeof(*body), 0};
         int              bufcount = 1;
         ENTRY;
 
         /* request message already built */
 
         if (ea_size != 0) {
-                size[bufcount++] = ea_size;
+                repsize[bufcount++] = ea_size;
                 CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
                        ea_size);
         }
-        req->rq_replen = lustre_msg_size(bufcount, size);
+        req->rq_replen = lustre_msg_size(bufcount, repsize);
 
         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
         rc = ptlrpc_queue_wait(req);
@@ -148,23 +199,26 @@ int mdc_getattr(struct obd_export *exp, struct ll_fid *fid,
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
-        int size = sizeof(*body);
+        int size[2] = {0, sizeof(*body)};
         int rc;
         ENTRY;
 
         /* XXX do we need to make another request here?  We just did a getattr
          *     to do the lookup in the first place.
          */
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_GETATTR, 1, &size,
-                              NULL);
+        size[0] = mdc_get_secdesc_size();
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_GETATTR, 2, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+        mdc_pack_secdesc(req, size[0]);
+
+        body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
         memcpy(&body->fid1, fid, sizeof(*fid));
         body->valid = valid;
         body->eadatasize = ea_size;
-        mdc_pack_req_body(req);
 
         rc = mdc_getattr_common(exp, ea_size, req);
         if (rc != 0) {
@@ -182,23 +236,26 @@ int mdc_getattr_name(struct obd_export *exp, struct ll_fid *fid,
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
-        int rc, size[2] = {sizeof(*body), namelen};
+        int rc, size[3] = {0, sizeof(*body), namelen};
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_GETATTR_NAME, 2,
-                              size, NULL);
+        size[0] = mdc_get_secdesc_size();
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_GETATTR_NAME, 3, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+        mdc_pack_secdesc(req, size[0]);
+
+        body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
         memcpy(&body->fid1, fid, sizeof(*fid));
         body->valid = valid;
         body->eadatasize = ea_size;
-        mdc_pack_req_body(req);
 
         if (filename != NULL) {
                 LASSERT (strnlen (filename, namelen) == namelen - 1);
-                memcpy(lustre_msg_buf(req->rq_reqmsg, 1, namelen),
+                memcpy(lustre_msg_buf(req->rq_reqmsg, 2, namelen),
                        filename, namelen);
         } else {
                 LASSERT(namelen == 1);
@@ -216,8 +273,8 @@ int mdc_getattr_name(struct obd_export *exp, struct ll_fid *fid,
 
 /* This should be called with both the request and the reply still packed. */
 int mdc_store_inode_generation(struct obd_export *exp,
-                                struct ptlrpc_request *req, int reqoff,
-                                int repoff)
+                               struct ptlrpc_request *req,
+                               int reqoff, int repoff)
 {
         struct mds_rec_create *rec =
                 lustre_msg_buf(req->rq_reqmsg, reqoff, sizeof(*rec));
@@ -344,7 +401,8 @@ static void mdc_replay_open(struct ptlrpc_request *req)
         if (close_req != NULL) {
                 struct mds_body *close_body;
                 LASSERT(close_req->rq_reqmsg->opc == MDS_CLOSE);
-                close_body = lustre_msg_buf(close_req->rq_reqmsg, 0,
+                close_body = lustre_msg_buf(close_req->rq_reqmsg,
+                                            MDS_REQ_REC_OFF,
                                             sizeof(*close_body));
                 if (och != NULL)
                         LASSERT(!memcmp(&old, &close_body->handle, sizeof old));
@@ -361,10 +419,12 @@ int  mdc_set_open_replay_data(struct obd_export *exp,
                               struct ptlrpc_request *open_req)
 {
         struct mdc_open_data *mod;
-        struct mds_rec_create *rec =
-                lustre_msg_buf(open_req->rq_reqmsg, 2, sizeof(*rec));
-        struct mds_body *body =
-                lustre_msg_buf(open_req->rq_repmsg, 1, sizeof(*body));
+        struct mds_rec_create *rec;
+        struct mds_body *body;
+
+        rec = lustre_msg_buf(open_req->rq_reqmsg, MDS_REQ_INTENT_REC_OFF,
+                             sizeof(*rec));
+        body = lustre_msg_buf(open_req->rq_repmsg, 1, sizeof(*body));
 
         LASSERT(rec != NULL);
         /* outgoing messages always in my byte order */
@@ -486,7 +546,7 @@ int mdc_close(struct obd_export *exp, struct obdo *oa,
               struct obd_client_handle *och, struct ptlrpc_request **request)
 {
         struct obd_device *obd = class_exp2obd(exp);
-        int reqsize[2] = {sizeof(struct mds_body),
+        int reqsize[3] = {0, sizeof(struct mds_body),
                           obd->u.cli.cl_max_mds_cookiesize};
         int rc, repsize[3] = {sizeof(struct mds_body),
                               obd->u.cli.cl_max_mds_easize,
@@ -496,11 +556,15 @@ int mdc_close(struct obd_export *exp, struct obdo *oa,
         struct l_wait_info lwi;
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 2, reqsize,
-                              NULL);
+        //reqsize[0] = mdc_get_secdesc_size();
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_CLOSE, 3, reqsize, NULL);
         if (req == NULL)
                 GOTO(out, rc = -ENOMEM);
 
+        //mdc_pack_secdesc(req, reqsize[0]);
+
         /* Ensure that this close's handle is fixed up during replay. */
         LASSERT(och != NULL);
         mod = och->och_mod;
@@ -513,7 +577,7 @@ int mdc_close(struct obd_export *exp, struct obdo *oa,
                 CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
         }
 
-        mdc_close_pack(req, 0, oa, oa->o_valid, och);
+        mdc_close_pack(req, 1, oa, oa->o_valid, och);
 
         req->rq_replen = lustre_msg_size(3, repsize);
         req->rq_commit_cb = mdc_commit_close;
@@ -570,15 +634,19 @@ int mdc_done_writing(struct obd_export *exp, struct obdo *obdo)
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
-        int rc, size = sizeof(*body);
+        int rc, size[2] = {0, sizeof(*body)};
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_DONE_WRITING, 1,
-                              &size, NULL);
+        size[0] = mdc_get_secdesc_size();
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_DONE_WRITING, 2, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        mdc_pack_secdesc(req, size[0]);
+
+        body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof(*body));
         mdc_pack_fid(&body->fid1, obdo->o_id, 0, obdo->o_mode);
         body->size = obdo->o_size;
         body->blocks = obdo->o_blocks;
@@ -586,7 +654,7 @@ int mdc_done_writing(struct obd_export *exp, struct obdo *obdo)
         body->valid = obdo->o_valid;
 //        memcpy(&body->handle, &och->och_fh, sizeof(body->handle));
 
-        req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_replen = lustre_msg_size(1, &size[1]);
 
         rc = ptlrpc_queue_wait(req);
         ptlrpc_req_finished(req);
@@ -601,17 +669,22 @@ int mdc_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
         struct ptlrpc_request *req = NULL;
         struct ptlrpc_bulk_desc *desc = NULL;
         struct mds_body *body;
-        int rc, size = sizeof(*body);
+        int rc, size[2] = {0, sizeof(*body)};
         ENTRY;
 
         CDEBUG(D_INODE, "inode: %ld\n", (long)mdc_fid->id);
 
-        req = ptlrpc_prep_req(imp, MDS_READPAGE, 1, &size, NULL);
+        size[0] = mdc_get_secdesc_size();
+
+        req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_READPAGE,
+                              2, size, NULL);
         if (req == NULL)
                 GOTO(out, rc = -ENOMEM);
         /* XXX FIXME bug 249 */
         req->rq_request_portal = MDS_READPAGE_PORTAL;
 
+        mdc_pack_secdesc(req, size[0]);
+
         desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL);
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
@@ -619,9 +692,9 @@ int mdc_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
 
         ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
 
-        mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE, mdc_fid);
+        mdc_readdir_pack(req, 1, offset, PAGE_CACHE_SIZE, mdc_fid);
 
-        req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_replen = lustre_msg_size(1, &size[1]);
         rc = ptlrpc_queue_wait(req);
 
         if (rc == 0) {
@@ -723,8 +796,8 @@ int mdc_set_info(struct obd_export *exp, obd_count keylen,
                 int rc, size[2] = {keylen, vallen};
                 char *bufs[2] = {key, val};
 
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SET_INFO, 2,
-                                      size, bufs);
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+                                      OST_SET_INFO, 2, size, bufs);
                 if (req == NULL)
                         RETURN(-ENOMEM);
 
@@ -755,7 +828,8 @@ static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
          * during mount that would help a bit).  Having relative timestamps
          * is not so great if request processing is slow, while absolute
          * timestamps are not ideal because they need time synchronization. */
-        req = ptlrpc_prep_req(obd->u.cli.cl_import, MDS_STATFS, 0, NULL, NULL);
+        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_MDS_VERSION,
+                              MDS_STATFS, 0, NULL, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -787,18 +861,23 @@ static int mdc_pin(struct obd_export *exp, obd_id ino, __u32 gen, int type,
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
-        int rc, size = sizeof(*body);
+        int rc, size[2] = {0, sizeof(*body)};
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_PIN, 1, &size, NULL);
+        //size[0] = mdc_get_secdesc_size();
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_PIN, 2, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+        //mdc_pack_secdesc(req, size[0]);
+
+        body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof(*body));
         mdc_pack_fid(&body->fid1, ino, gen, type);
         body->flags = flag;
 
-        req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_replen = lustre_msg_size(1, &size[1]);
 
         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
         rc = ptlrpc_queue_wait(req);
@@ -833,17 +912,22 @@ static int mdc_unpin(struct obd_export *exp,
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
-        int rc, size = sizeof(*body);
+        int rc, size[2] = {0, sizeof(*body)};
         ENTRY;
 
         if (handle->och_magic != OBD_CLIENT_HANDLE_MAGIC)
                 RETURN(0);
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &size, NULL);
+        //size[0] = mdc_get_secdesc_size();
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_CLOSE, 2, size, NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        //mdc_pack_secdesc(req, size[0]);
+
+        body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof(*body));
         memcpy(&body->handle, &handle->och_fh, sizeof(body->handle));
         body->flags = flag;
 
@@ -866,21 +950,26 @@ int mdc_sync(struct obd_export *exp, struct ll_fid *fid,
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
-        int size = sizeof(*body);
+        int size[2] = {0, sizeof(*body)};
         int rc;
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_SYNC, 1,&size,NULL);
+        //size[0] = mdc_get_secdesc_size();
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              MDS_SYNC, 2, size, NULL);
         if (!req)
                 RETURN(rc = -ENOMEM);
 
+        //mdc_pack_secdesc(req, size[0]);
+
         if (fid) {
-                body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+                body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
+                                      sizeof (*body));
                 memcpy(&body->fid1, fid, sizeof(*fid));
-                mdc_pack_req_body(req);
         }
 
-        req->rq_replen = lustre_msg_size(1, &size);
+        req->rq_replen = lustre_msg_size(1, &size[1]);
 
         rc = ptlrpc_queue_wait(req);
         if (rc || request == NULL)
@@ -1060,8 +1149,9 @@ static int mdc_get_info(struct obd_export *exp, obd_count keylen,
                 __u32 *reply;
                 char *bufs[1] = {key};
                 int rc;
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GET_INFO, 1,
-                                      &keylen, bufs);
+                req = ptlrpc_prep_req(class_exp2cliimp(exp),
+                                      LUSTRE_OBD_VERSION, OST_GET_INFO,
+                                      1, &keylen, bufs);
                 if (req == NULL)
                         RETURN(-ENOMEM);
 
@@ -1094,8 +1184,8 @@ int mdc_obj_create(struct obd_export *exp, struct obdo *oa,
 
         LASSERT(oa);
 
-        request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_CREATE,
-                                  1, &size, NULL);
+        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+                                  OST_CREATE, 1, &size, NULL);
         if (!request)
                 GOTO(out_req, rc = -ENOMEM);
 
@@ -1146,7 +1236,8 @@ int mdc_brw(int rw, struct obd_export *exp, struct obdo *oa,
         size[1] = sizeof(*ioobj);
         size[2] = oa_bufs * sizeof(*niobuf);
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), opc, 3, size, NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, opc,
+                              3, size, NULL);
         LASSERT(req != NULL);
 
         if (opc == OST_WRITE)