return oldbit;
}
+static inline
+int fls(int x)
+{
+ int r = 32;
+
+ if (!x)
+ return 0;
+ if (!(x & 0xffff0000u)) {
+ x <<= 16;
+ r -= 16;
+ }
+ if (!(x & 0xff000000u)) {
+ x <<= 8;
+ r -= 8;
+ }
+ if (!(x & 0xf0000000u)) {
+ x <<= 4;
+ r -= 4;
+ }
+ if (!(x & 0xc0000000u)) {
+ x <<= 2;
+ r -= 2;
+ }
+ if (!(x & 0x80000000u)) {
+ x <<= 1;
+ r -= 1;
+ }
+ return r;
+}
+
/* FIXME sys/capability will finally included linux/fs.h thus
* cause numerous trouble on x86-64. as temporary solution for
* build broken at Cray, we copy definition we need from capability.h
int (*enlarge_reqbuf)
(struct ptlrpc_sec *sec,
struct ptlrpc_request *req,
- int segment, int newsize,
- int move_data);
+ int segment, int newsize);
};
struct ptlrpc_sec_sops {
extern struct proc_dir_entry *sptlrpc_proc_root;
/*
+ * round size up to next power of 2, for slab allocation.
+ * @size must be sane (can't overflow after round up)
+ */
+static inline int size_roundup_power2(int size)
+{
+ int rc;
+
+ LASSERT(size > 0);
+ rc = 1 << (fls(size) - 1);
+ if ((rc - 1) & size)
+ rc <<= 1;
+ LASSERT(rc > 0);
+ return rc;
+}
+
+/*
+ * internal support libraries
+ */
+void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg,
+ int segment, int newsize);
+
+/*
* security type
*/
int sptlrpc_register_policy(struct ptlrpc_sec_policy *policy);
int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize);
void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req);
int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
- int segment, int newsize, int movedata);
+ int segment, int newsize);
void sptlrpc_request_out_callback(struct ptlrpc_request *req);
/*
}
}
-static int round_up(int val)
-{
- int ret = 1;
- while (val) {
- val >>= 1;
- ret <<= 1;
- }
- return ret;
-}
-
/* Save a large LOV EA into the request buffer so that it is available
* for replay. We don't do this in the initial request because the
* original request doesn't need this buffer (at most it sends just the
int rc;
rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
- body->eadatasize, 0);
+ body->eadatasize);
if (rc) {
CERROR("Can't enlarge segment %d size to %d\n",
DLM_INTENT_REC_OFF + 4, body->eadatasize);
if (do_join)
size[DLM_INTENT_REC_OFF + 5] =
sizeof(struct mdt_rec_join);
- rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
- 8 + do_join, size);
- if (rc & (rc - 1))
- size[ea_off] = min(size[ea_off] + round_up(rc) - rc,
- obddev->u.cli.cl_max_mds_easize);
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
LDLM_ENQUEUE, 8 + do_join, size, NULL);
policy->sp_cops->free_reqbuf(ctx->cc_sec, req);
}
+/*
+ * NOTE caller must guarantee the buffer size is enough for the enlargement
+ */
+void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg,
+ int segment, int newsize)
+{
+ void *src, *dst;
+ int oldsize, oldmsg_size, movesize;
+
+ LASSERT(segment < msg->lm_bufcount);
+ LASSERT(msg->lm_buflens[segment] < newsize);
+
+ /* nothing to do if we are enlarging the last segment */
+ if (segment == msg->lm_bufcount - 1) {
+ msg->lm_buflens[segment] = newsize;
+ return;
+ }
+
+ oldsize = msg->lm_buflens[segment];
+
+ src = lustre_msg_buf(msg, segment + 1, 0);
+ msg->lm_buflens[segment] = newsize;
+ dst = lustre_msg_buf(msg, segment + 1, 0);
+ msg->lm_buflens[segment] = oldsize;
+
+ /* move from segment + 1 to end segment */
+ LASSERT(msg->lm_magic == LUSTRE_MSG_MAGIC_V2);
+ oldmsg_size = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
+ movesize = oldmsg_size - ((unsigned long) src - (unsigned long) msg);
+ LASSERT(movesize >= 0);
+
+ if (movesize)
+ memmove(dst, src, movesize);
+
+ /* note we don't clear the ares where old data live, not secret */
+
+ /* finally set new segment size */
+ msg->lm_buflens[segment] = newsize;
+}
+EXPORT_SYMBOL(_sptlrpc_enlarge_msg_inplace);
+
+/*
+ * enlarge @segment of upper message req->rq_reqmsg to @newsize, all data
+ * will be preserved after enlargement. this must be called after rq_reqmsg has
+ * been intialized at least.
+ *
+ * caller's attention: upon return, rq_reqmsg and rq_reqlen might have
+ * been changed.
+ */
int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
- int segment, int newsize, int movedata)
+ int segment, int newsize)
{
struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
- struct ptlrpc_sec_policy *policy;
+ struct ptlrpc_sec_cops *cops;
struct lustre_msg *msg = req->rq_reqmsg;
LASSERT(ctx);
if (msg->lm_buflens[segment] == newsize)
return 0;
- policy = ctx->cc_sec->ps_policy;
- LASSERT(policy->sp_cops->enlarge_reqbuf);
- return policy->sp_cops->enlarge_reqbuf(ctx->cc_sec, req,
- segment, newsize, movedata);
+ cops = ctx->cc_sec->ps_policy->sp_cops;
+ LASSERT(cops->enlarge_reqbuf);
+ return cops->enlarge_reqbuf(ctx->cc_sec, req, segment, newsize);
}
EXPORT_SYMBOL(sptlrpc_cli_enlarge_reqbuf);
switch (to_part) {
case LUSTRE_MDT:
- conf->sfc_rpc_flavor = SPTLRPC_FLVR_NULL;//XXX SPTLRPC_FLVR_PLAIN;
+ conf->sfc_rpc_flavor = SPTLRPC_FLVR_PLAIN;
return 0;
case LUSTRE_OST:
conf->sfc_rpc_flavor = SPTLRPC_FLVR_NULL;
int msgsize)
{
if (!req->rq_reqbuf) {
+ int alloc_size = size_roundup_power2(msgsize);
+
LASSERT(!req->rq_pool);
- OBD_ALLOC(req->rq_reqbuf, msgsize);
+ OBD_ALLOC(req->rq_reqbuf, alloc_size);
if (!req->rq_reqbuf)
return -ENOMEM;
- req->rq_reqbuf_len = msgsize;
+ req->rq_reqbuf_len = alloc_size;
} else {
LASSERT(req->rq_pool);
LASSERT(req->rq_reqbuf_len >= msgsize);
{
if (!req->rq_pool) {
LASSERTF(req->rq_reqmsg == req->rq_reqbuf,
- "reqmsg %p is not reqbuf %p in NULL sec\n",
+ "reqmsg %p is not reqbuf %p in null sec\n",
req->rq_reqmsg, req->rq_reqbuf);
+ LASSERTF(req->rq_reqbuf_len >= req->rq_reqlen,
+ "reqlen %d should smaller than buflen %d\n",
+ req->rq_reqlen, req->rq_reqbuf_len);
OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
req->rq_reqmsg = req->rq_reqbuf = NULL;
struct ptlrpc_request *req,
int msgsize)
{
+ msgsize = size_roundup_power2(msgsize);
+
OBD_ALLOC(req->rq_repbuf, msgsize);
if (!req->rq_repbuf)
return -ENOMEM;
static
int null_enlarge_reqbuf(struct ptlrpc_sec *sec,
struct ptlrpc_request *req,
- int segment, int newsize, int move_data)
+ int segment, int newsize)
{
- struct lustre_msg *oldmsg = req->rq_reqbuf, *newmsg;
- int oldsize, new_msgsize;
+ struct lustre_msg *newbuf;
+ int oldsize, newmsg_size, alloc_size;
LASSERT(req->rq_reqbuf);
LASSERT(req->rq_reqbuf == req->rq_reqmsg);
- LASSERT(!move_data); // XXX
-
- oldsize = oldmsg->lm_buflens[segment];
- oldmsg->lm_buflens[segment] = newsize;
-
- new_msgsize = lustre_msg_size(oldmsg->lm_magic,
- oldmsg->lm_bufcount, oldmsg->lm_buflens);
-
- /* FIXME need move data!!! */
- if (req->rq_pool) {
- req->rq_reqlen = new_msgsize;
- } else {
- OBD_ALLOC(newmsg, new_msgsize);
- if (newmsg == NULL) {
- oldmsg->lm_buflens[segment] = oldsize;
+ LASSERT(req->rq_reqbuf_len >= req->rq_reqlen);
+ LASSERT(req->rq_reqlen == lustre_msg_size(req->rq_reqmsg->lm_magic,
+ req->rq_reqmsg->lm_bufcount,
+ req->rq_reqmsg->lm_buflens));
+
+ /* compute new message size */
+ oldsize = req->rq_reqbuf->lm_buflens[segment];
+ req->rq_reqbuf->lm_buflens[segment] = newsize;
+ newmsg_size = lustre_msg_size(req->rq_reqbuf->lm_magic,
+ req->rq_reqbuf->lm_bufcount,
+ req->rq_reqbuf->lm_buflens);
+ req->rq_reqbuf->lm_buflens[segment] = oldsize;
+
+ /* request from pool should always have enough buffer */
+ LASSERT(!req->rq_pool || req->rq_reqbuf_len >= newmsg_size);
+
+ if (req->rq_reqbuf_len < newmsg_size) {
+ alloc_size = size_roundup_power2(newmsg_size);
+
+ OBD_ALLOC(newbuf, alloc_size);
+ if (newbuf == NULL)
return -ENOMEM;
- }
- memcpy(newmsg, oldmsg, req->rq_reqlen);
+
+ memcpy(newbuf, req->rq_reqbuf, req->rq_reqlen);
OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
- req->rq_reqbuf = req->rq_reqmsg = newmsg;
- req->rq_reqbuf_len = req->rq_reqlen = new_msgsize;
+ req->rq_reqbuf = req->rq_reqmsg = newbuf;
+ req->rq_reqbuf_len = alloc_size;
}
+ _sptlrpc_enlarge_msg_inplace(req->rq_reqmsg, segment, newsize);
+ req->rq_reqlen = newmsg_size;
+
return 0;
}
alloc_len = lustre_msg_size_v2(bufcnt, buflens);
-
if (!req->rq_reqbuf) {
LASSERT(!req->rq_pool);
+
+ alloc_len = size_roundup_power2(alloc_len);
OBD_ALLOC(req->rq_reqbuf, alloc_len);
if (!req->rq_reqbuf)
RETURN(-ENOMEM);
}
alloc_len = lustre_msg_size_v2(bufcnt, buflens);
+ alloc_len = size_roundup_power2(alloc_len);
OBD_ALLOC(req->rq_repbuf, alloc_len);
if (!req->rq_repbuf)
static
int plain_enlarge_reqbuf(struct ptlrpc_sec *sec,
struct ptlrpc_request *req,
- int segment, int newsize, int move_data)
+ int segment, int newsize)
{
- LBUG();
- return 0;
+ struct lustre_msg *newbuf;
+ int oldsize;
+ int newmsg_size, newbuf_size;
+ ENTRY;
+
+ /* embedded msg always at seg 0 */
+ LASSERT(req->rq_reqbuf);
+ LASSERT(req->rq_reqbuf_len >= req->rq_reqlen);
+ LASSERT(lustre_msg_buf(req->rq_reqbuf, 0, 0) == req->rq_reqmsg);
+
+ /* compute new embedded msg size. */
+ oldsize = req->rq_reqmsg->lm_buflens[segment];
+ req->rq_reqmsg->lm_buflens[segment] = newsize;
+ newmsg_size = lustre_msg_size_v2(req->rq_reqmsg->lm_bufcount,
+ req->rq_reqmsg->lm_buflens);
+ req->rq_reqmsg->lm_buflens[segment] = oldsize;
+
+ /* compute new wrapper msg size. */
+ oldsize = req->rq_reqbuf->lm_buflens[0];
+ req->rq_reqbuf->lm_buflens[0] = newmsg_size;
+ newbuf_size = lustre_msg_size_v2(req->rq_reqbuf->lm_bufcount,
+ req->rq_reqbuf->lm_buflens);
+ req->rq_reqbuf->lm_buflens[0] = oldsize;
+
+ /* request from pool should always have enough buffer */
+ LASSERT(!req->rq_pool || req->rq_reqbuf_len >= newbuf_size);
+
+ if (req->rq_reqbuf_len < newbuf_size) {
+ newbuf_size = size_roundup_power2(newbuf_size);
+
+ OBD_ALLOC(newbuf, newbuf_size);
+ if (newbuf == NULL)
+ RETURN(-ENOMEM);
+
+ memcpy(newbuf, req->rq_reqbuf, req->rq_reqbuf_len);
+
+ OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
+ req->rq_reqbuf = newbuf;
+ req->rq_reqbuf_len = newbuf_size;
+ req->rq_reqmsg = lustre_msg_buf(req->rq_reqbuf, 0, 0);
+ }
+
+ _sptlrpc_enlarge_msg_inplace(req->rq_reqbuf, 0, newmsg_size);
+ _sptlrpc_enlarge_msg_inplace(req->rq_reqmsg, segment, newsize);
+
+ req->rq_reqlen = newmsg_size;
+ RETURN(0);
}
static