Whamcloud - gitweb
branch: b_new_cmd
authorericm <ericm>
Thu, 12 Oct 2006 17:59:27 +0000 (17:59 +0000)
committerericm <ericm>
Thu, 12 Oct 2006 17:59:27 +0000 (17:59 +0000)
- first part of b11012: mdc_realloc_openmsg() should not manipulate
  request buffer directly, should be done in sptlrpc layer. implement
  enlarge_msg() in null and plain policy.
- restore default to plain for rpcs to MDTs

lustre/include/liblustre.h
lustre/include/lustre_sec.h
lustre/mdc/mdc_locks.c
lustre/ptlrpc/sec.c
lustre/ptlrpc/sec_null.c
lustre/ptlrpc/sec_plain.c

index 17dbceb..290accd 100644 (file)
@@ -760,6 +760,36 @@ int test_and_clear_bit(int nr, unsigned long *addr)
         return oldbit;
 }
 
+static inline
+int fls(int x)
+{
+        int r = 32;
+
+        if (!x)
+                return 0;
+        if (!(x & 0xffff0000u)) {
+                x <<= 16;
+                r -= 16;
+        }
+        if (!(x & 0xff000000u)) {
+                x <<= 8;
+                r -= 8;
+        }
+        if (!(x & 0xf0000000u)) {
+                x <<= 4;
+                r -= 4;
+        }
+        if (!(x & 0xc0000000u)) {
+                x <<= 2;
+                r -= 2;
+        }
+        if (!(x & 0x80000000u)) {
+                x <<= 1;
+                r -= 1;
+        }
+        return r;
+}
+
 /* FIXME sys/capability will finally included linux/fs.h thus
  * cause numerous trouble on x86-64. as temporary solution for
  * build broken at Cray, we copy definition we need from capability.h
index 3246b24..ced9bea 100644 (file)
@@ -305,8 +305,7 @@ struct ptlrpc_sec_cops {
         int                     (*enlarge_reqbuf)
                                                (struct ptlrpc_sec *sec,
                                                 struct ptlrpc_request *req,
-                                                int segment, int newsize,
-                                                int move_data);
+                                                int segment, int newsize);
 };
 
 struct ptlrpc_sec_sops {
@@ -412,6 +411,28 @@ struct proc_dir_entry;
 extern struct proc_dir_entry *sptlrpc_proc_root;
 
 /*
+ * round size up to next power of 2, for slab allocation.
+ * @size must be sane (can't overflow after round up)
+ */
+static inline int size_roundup_power2(int size)
+{
+        int rc;
+
+        LASSERT(size > 0);
+        rc = 1 << (fls(size) - 1);
+        if ((rc - 1) & size)
+                rc <<= 1;
+        LASSERT(rc > 0);
+        return rc;
+}
+
+/*
+ * internal support libraries
+ */
+void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg,
+                                  int segment, int newsize);
+
+/*
  * security type
  */
 int sptlrpc_register_policy(struct ptlrpc_sec_policy *policy);
@@ -453,7 +474,7 @@ void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req);
 int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize);
 void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req);
 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
-                               int segment, int newsize, int movedata);
+                               int segment, int newsize);
 void sptlrpc_request_out_callback(struct ptlrpc_request *req);
 
 /*
index b12266d..9914f53 100644 (file)
@@ -211,16 +211,6 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
         }
 }
 
-static int round_up(int val)
-{
-        int ret = 1;
-        while (val) {
-                val >>= 1;
-                ret <<= 1;
-        }
-        return ret;
-}
-
 /* Save a large LOV EA into the request buffer so that it is available
  * for replay.  We don't do this in the initial request because the
  * original request doesn't need this buffer (at most it sends just the
@@ -238,7 +228,7 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
         int     rc;
 
         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
-                                        body->eadatasize, 0);
+                                        body->eadatasize);
         if (rc) {
                 CERROR("Can't enlarge segment %d size to %d\n",
                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
@@ -312,11 +302,6 @@ int mdc_enqueue(struct obd_export *exp,
                 if (do_join)
                         size[DLM_INTENT_REC_OFF + 5] =
                                                 sizeof(struct mdt_rec_join);
-                rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
-                                     8 + do_join, size);
-                if (rc & (rc - 1))
-                        size[ea_off] = min(size[ea_off] + round_up(rc) - rc,
-                                           obddev->u.cli.cl_max_mds_easize);
 
                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
                                       LDLM_ENQUEUE, 8 + do_join, size, NULL);
index 1f1d615..94765d3 100644 (file)
@@ -1322,11 +1322,60 @@ void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req)
         policy->sp_cops->free_reqbuf(ctx->cc_sec, req);
 }
 
+/*
+ * NOTE caller must guarantee the buffer size is enough for the enlargement
+ */
+void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg,
+                                  int segment, int newsize)
+{
+        void   *src, *dst;
+        int     oldsize, oldmsg_size, movesize;
+
+        LASSERT(segment < msg->lm_bufcount);
+        LASSERT(msg->lm_buflens[segment] < newsize);
+
+        /* nothing to do if we are enlarging the last segment */
+        if (segment == msg->lm_bufcount - 1) {
+                msg->lm_buflens[segment] = newsize;
+                return;
+        }
+
+        oldsize = msg->lm_buflens[segment];
+
+        src = lustre_msg_buf(msg, segment + 1, 0);
+        msg->lm_buflens[segment] = newsize;
+        dst = lustre_msg_buf(msg, segment + 1, 0);
+        msg->lm_buflens[segment] = oldsize;
+
+        /* move from segment + 1 to end segment */
+        LASSERT(msg->lm_magic == LUSTRE_MSG_MAGIC_V2);
+        oldmsg_size = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
+        movesize = oldmsg_size - ((unsigned long) src - (unsigned long) msg);
+        LASSERT(movesize >= 0);
+
+        if (movesize)
+                memmove(dst, src, movesize);
+
+        /* note we don't clear the ares where old data live, not secret */
+
+        /* finally set new segment size */
+        msg->lm_buflens[segment] = newsize;
+}
+EXPORT_SYMBOL(_sptlrpc_enlarge_msg_inplace);
+
+/*
+ * enlarge @segment of upper message req->rq_reqmsg to @newsize, all data
+ * will be preserved after enlargement. this must be called after rq_reqmsg has
+ * been intialized at least.
+ *
+ * caller's attention: upon return, rq_reqmsg and rq_reqlen might have
+ * been changed.
+ */
 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
-                               int segment, int newsize, int movedata)
+                               int segment, int newsize)
 {
         struct ptlrpc_cli_ctx    *ctx = req->rq_cli_ctx;
-        struct ptlrpc_sec_policy *policy;
+        struct ptlrpc_sec_cops   *cops;
         struct lustre_msg        *msg = req->rq_reqmsg;
 
         LASSERT(ctx);
@@ -1337,10 +1386,9 @@ int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
         if (msg->lm_buflens[segment] == newsize)
                 return 0;
 
-        policy = ctx->cc_sec->ps_policy;
-        LASSERT(policy->sp_cops->enlarge_reqbuf);
-        return policy->sp_cops->enlarge_reqbuf(ctx->cc_sec, req,
-                                               segment, newsize, movedata);
+        cops = ctx->cc_sec->ps_policy->sp_cops;
+        LASSERT(cops->enlarge_reqbuf);
+        return cops->enlarge_reqbuf(ctx->cc_sec, req, segment, newsize);
 }
 EXPORT_SYMBOL(sptlrpc_cli_enlarge_reqbuf);
 
@@ -1853,7 +1901,7 @@ int get_default_flavor(enum lustre_part to_part, struct sec_flavor_config *conf)
 
         switch (to_part) {
         case LUSTRE_MDT:
-                conf->sfc_rpc_flavor = SPTLRPC_FLVR_NULL;//XXX SPTLRPC_FLVR_PLAIN;
+                conf->sfc_rpc_flavor = SPTLRPC_FLVR_PLAIN;
                 return 0;
         case LUSTRE_OST:
                 conf->sfc_rpc_flavor = SPTLRPC_FLVR_NULL;
index 38bca6f..2b06e00 100644 (file)
@@ -104,12 +104,14 @@ int null_alloc_reqbuf(struct ptlrpc_sec *sec,
                       int msgsize)
 {
         if (!req->rq_reqbuf) {
+                int alloc_size = size_roundup_power2(msgsize);
+
                 LASSERT(!req->rq_pool);
-                OBD_ALLOC(req->rq_reqbuf, msgsize);
+                OBD_ALLOC(req->rq_reqbuf, alloc_size);
                 if (!req->rq_reqbuf)
                         return -ENOMEM;
 
-                req->rq_reqbuf_len = msgsize;
+                req->rq_reqbuf_len = alloc_size;
         } else {
                 LASSERT(req->rq_pool);
                 LASSERT(req->rq_reqbuf_len >= msgsize);
@@ -126,8 +128,11 @@ void null_free_reqbuf(struct ptlrpc_sec *sec,
 {
         if (!req->rq_pool) {
                 LASSERTF(req->rq_reqmsg == req->rq_reqbuf,
-                         "reqmsg %p is not reqbuf %p in NULL sec\n",
+                         "reqmsg %p is not reqbuf %p in null sec\n",
                          req->rq_reqmsg, req->rq_reqbuf);
+                LASSERTF(req->rq_reqbuf_len >= req->rq_reqlen,
+                         "reqlen %d should smaller than buflen %d\n",
+                         req->rq_reqlen, req->rq_reqbuf_len);
 
                 OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
                 req->rq_reqmsg = req->rq_reqbuf = NULL;
@@ -140,6 +145,8 @@ int null_alloc_repbuf(struct ptlrpc_sec *sec,
                       struct ptlrpc_request *req,
                       int msgsize)
 {
+        msgsize = size_roundup_power2(msgsize);
+
         OBD_ALLOC(req->rq_repbuf, msgsize);
         if (!req->rq_repbuf)
                 return -ENOMEM;
@@ -160,37 +167,46 @@ void null_free_repbuf(struct ptlrpc_sec *sec,
 static
 int null_enlarge_reqbuf(struct ptlrpc_sec *sec,
                         struct ptlrpc_request *req,
-                        int segment, int newsize, int move_data)
+                        int segment, int newsize)
 {
-        struct lustre_msg      *oldmsg = req->rq_reqbuf, *newmsg;
-        int                     oldsize, new_msgsize;
+        struct lustre_msg      *newbuf;
+        int                     oldsize, newmsg_size, alloc_size;
 
         LASSERT(req->rq_reqbuf);
         LASSERT(req->rq_reqbuf == req->rq_reqmsg);
-        LASSERT(!move_data); // XXX
-
-        oldsize = oldmsg->lm_buflens[segment];
-        oldmsg->lm_buflens[segment] = newsize;
-
-        new_msgsize = lustre_msg_size(oldmsg->lm_magic,
-                                      oldmsg->lm_bufcount, oldmsg->lm_buflens);
-
-        /* FIXME need move data!!! */
-        if (req->rq_pool) {
-                req->rq_reqlen = new_msgsize;
-        } else {
-                OBD_ALLOC(newmsg, new_msgsize);
-                if (newmsg == NULL) {
-                        oldmsg->lm_buflens[segment] = oldsize;
+        LASSERT(req->rq_reqbuf_len >= req->rq_reqlen);
+        LASSERT(req->rq_reqlen == lustre_msg_size(req->rq_reqmsg->lm_magic,
+                                                  req->rq_reqmsg->lm_bufcount,
+                                                  req->rq_reqmsg->lm_buflens));
+
+        /* compute new message size */
+        oldsize = req->rq_reqbuf->lm_buflens[segment];
+        req->rq_reqbuf->lm_buflens[segment] = newsize;
+        newmsg_size = lustre_msg_size(req->rq_reqbuf->lm_magic,
+                                      req->rq_reqbuf->lm_bufcount,
+                                      req->rq_reqbuf->lm_buflens);
+        req->rq_reqbuf->lm_buflens[segment] = oldsize;
+
+        /* request from pool should always have enough buffer */
+        LASSERT(!req->rq_pool || req->rq_reqbuf_len >= newmsg_size);
+
+        if (req->rq_reqbuf_len < newmsg_size) {
+                alloc_size = size_roundup_power2(newmsg_size);
+
+                OBD_ALLOC(newbuf, alloc_size);
+                if (newbuf == NULL)
                         return -ENOMEM;
-                }
-                memcpy(newmsg, oldmsg, req->rq_reqlen);
+
+                memcpy(newbuf, req->rq_reqbuf, req->rq_reqlen);
 
                 OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
-                req->rq_reqbuf = req->rq_reqmsg = newmsg;
-                req->rq_reqbuf_len = req->rq_reqlen = new_msgsize;
+                req->rq_reqbuf = req->rq_reqmsg = newbuf;
+                req->rq_reqbuf_len = alloc_size;
         }
 
+        _sptlrpc_enlarge_msg_inplace(req->rq_reqmsg, segment, newsize);
+        req->rq_reqlen = newmsg_size;
+
         return 0;
 }
 
index 53abfba..794600d 100644 (file)
@@ -181,9 +181,10 @@ int plain_alloc_reqbuf(struct ptlrpc_sec *sec,
 
         alloc_len = lustre_msg_size_v2(bufcnt, buflens);
 
-
         if (!req->rq_reqbuf) {
                 LASSERT(!req->rq_pool);
+
+                alloc_len = size_roundup_power2(alloc_len);
                 OBD_ALLOC(req->rq_reqbuf, alloc_len);
                 if (!req->rq_reqbuf)
                         RETURN(-ENOMEM);
@@ -237,6 +238,7 @@ int plain_alloc_repbuf(struct ptlrpc_sec *sec,
         }
 
         alloc_len = lustre_msg_size_v2(bufcnt, buflens);
+        alloc_len = size_roundup_power2(alloc_len);
 
         OBD_ALLOC(req->rq_repbuf, alloc_len);
         if (!req->rq_repbuf)
@@ -260,10 +262,55 @@ void plain_free_repbuf(struct ptlrpc_sec *sec,
 static
 int plain_enlarge_reqbuf(struct ptlrpc_sec *sec,
                          struct ptlrpc_request *req,
-                         int segment, int newsize, int move_data)
+                         int segment, int newsize)
 {
-        LBUG();
-        return 0;
+        struct lustre_msg      *newbuf;
+        int                     oldsize;
+        int                     newmsg_size, newbuf_size;
+        ENTRY;
+
+        /* embedded msg always at seg 0 */
+        LASSERT(req->rq_reqbuf);
+        LASSERT(req->rq_reqbuf_len >= req->rq_reqlen);
+        LASSERT(lustre_msg_buf(req->rq_reqbuf, 0, 0) == req->rq_reqmsg);
+
+        /* compute new embedded msg size.  */
+        oldsize = req->rq_reqmsg->lm_buflens[segment];
+        req->rq_reqmsg->lm_buflens[segment] = newsize;
+        newmsg_size = lustre_msg_size_v2(req->rq_reqmsg->lm_bufcount,
+                                         req->rq_reqmsg->lm_buflens);
+        req->rq_reqmsg->lm_buflens[segment] = oldsize;
+
+        /* compute new wrapper msg size.  */
+        oldsize = req->rq_reqbuf->lm_buflens[0];
+        req->rq_reqbuf->lm_buflens[0] = newmsg_size;
+        newbuf_size = lustre_msg_size_v2(req->rq_reqbuf->lm_bufcount,
+                                         req->rq_reqbuf->lm_buflens);
+        req->rq_reqbuf->lm_buflens[0] = oldsize;
+
+        /* request from pool should always have enough buffer */
+        LASSERT(!req->rq_pool || req->rq_reqbuf_len >= newbuf_size);
+
+        if (req->rq_reqbuf_len < newbuf_size) {
+                newbuf_size = size_roundup_power2(newbuf_size);
+
+                OBD_ALLOC(newbuf, newbuf_size);
+                if (newbuf == NULL)
+                        RETURN(-ENOMEM);
+
+                memcpy(newbuf, req->rq_reqbuf, req->rq_reqbuf_len);
+
+                OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
+                req->rq_reqbuf = newbuf;
+                req->rq_reqbuf_len = newbuf_size;
+                req->rq_reqmsg = lustre_msg_buf(req->rq_reqbuf, 0, 0);
+        }
+
+        _sptlrpc_enlarge_msg_inplace(req->rq_reqbuf, 0, newmsg_size);
+        _sptlrpc_enlarge_msg_inplace(req->rq_reqmsg, segment, newsize);
+
+        req->rq_reqlen = newmsg_size;
+        RETURN(0);
 }
 
 static