From 52825eea91d35705945b02ed2f5334a160712ace Mon Sep 17 00:00:00 2001 From: ericm Date: Thu, 12 Oct 2006 17:59:27 +0000 Subject: [PATCH] branch: b_new_cmd - first part of b11012: mdc_realloc_openmsg() should not manipulate request buffer directly, should be done in sptlrpc layer. implement enlarge_msg() in null and plain policy. - restore default to plain for rpcs to MDTs --- lustre/include/liblustre.h | 30 +++++++++++++++++++++ lustre/include/lustre_sec.h | 27 ++++++++++++++++--- lustre/mdc/mdc_locks.c | 17 +----------- lustre/ptlrpc/sec.c | 62 +++++++++++++++++++++++++++++++++++++----- lustre/ptlrpc/sec_null.c | 66 ++++++++++++++++++++++++++++----------------- lustre/ptlrpc/sec_plain.c | 55 ++++++++++++++++++++++++++++++++++--- 6 files changed, 202 insertions(+), 55 deletions(-) diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h index 17dbceb..290accd 100644 --- a/lustre/include/liblustre.h +++ b/lustre/include/liblustre.h @@ -760,6 +760,36 @@ int test_and_clear_bit(int nr, unsigned long *addr) return oldbit; } +static inline +int fls(int x) +{ + int r = 32; + + if (!x) + return 0; + if (!(x & 0xffff0000u)) { + x <<= 16; + r -= 16; + } + if (!(x & 0xff000000u)) { + x <<= 8; + r -= 8; + } + if (!(x & 0xf0000000u)) { + x <<= 4; + r -= 4; + } + if (!(x & 0xc0000000u)) { + x <<= 2; + r -= 2; + } + if (!(x & 0x80000000u)) { + x <<= 1; + r -= 1; + } + return r; +} + /* FIXME sys/capability will finally included linux/fs.h thus * cause numerous trouble on x86-64. 
as temporary solution for * build broken at Cray, we copy definition we need from capability.h diff --git a/lustre/include/lustre_sec.h b/lustre/include/lustre_sec.h index 3246b24..ced9bea 100644 --- a/lustre/include/lustre_sec.h +++ b/lustre/include/lustre_sec.h @@ -305,8 +305,7 @@ struct ptlrpc_sec_cops { int (*enlarge_reqbuf) (struct ptlrpc_sec *sec, struct ptlrpc_request *req, - int segment, int newsize, - int move_data); + int segment, int newsize); }; struct ptlrpc_sec_sops { @@ -412,6 +411,28 @@ struct proc_dir_entry; extern struct proc_dir_entry *sptlrpc_proc_root; /* + * round size up to next power of 2, for slab allocation. + * @size must be sane (can't overflow after round up) + */ +static inline int size_roundup_power2(int size) +{ + int rc; + + LASSERT(size > 0); + rc = 1 << (fls(size) - 1); + if ((rc - 1) & size) + rc <<= 1; + LASSERT(rc > 0); + return rc; +} + +/* + * internal support libraries + */ +void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg, + int segment, int newsize); + +/* * security type */ int sptlrpc_register_policy(struct ptlrpc_sec_policy *policy); @@ -453,7 +474,7 @@ void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req); int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize); void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req); int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req, - int segment, int newsize, int movedata); + int segment, int newsize); void sptlrpc_request_out_callback(struct ptlrpc_request *req); /* diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index b12266d..9914f53 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -211,16 +211,6 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) } } -static int round_up(int val) -{ - int ret = 1; - while (val) { - val >>= 1; - ret <<= 1; - } - return ret; -} - /* Save a large LOV EA into the request buffer so that it is available * for replay. 
We don't do this in the initial request because the * original request doesn't need this buffer (at most it sends just the @@ -238,7 +228,7 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req, int rc; rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4, - body->eadatasize, 0); + body->eadatasize); if (rc) { CERROR("Can't enlarge segment %d size to %d\n", DLM_INTENT_REC_OFF + 4, body->eadatasize); @@ -312,11 +302,6 @@ int mdc_enqueue(struct obd_export *exp, if (do_join) size[DLM_INTENT_REC_OFF + 5] = sizeof(struct mdt_rec_join); - rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, - 8 + do_join, size); - if (rc & (rc - 1)) - size[ea_off] = min(size[ea_off] + round_up(rc) - rc, - obddev->u.cli.cl_max_mds_easize); req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 8 + do_join, size, NULL); diff --git a/lustre/ptlrpc/sec.c b/lustre/ptlrpc/sec.c index 1f1d615..94765d3 100644 --- a/lustre/ptlrpc/sec.c +++ b/lustre/ptlrpc/sec.c @@ -1322,11 +1322,60 @@ void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req) policy->sp_cops->free_reqbuf(ctx->cc_sec, req); } +/* + * NOTE caller must guarantee the buffer size is enough for the enlargement + */ +void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg, + int segment, int newsize) +{ + void *src, *dst; + int oldsize, oldmsg_size, movesize; + + LASSERT(segment < msg->lm_bufcount); + LASSERT(msg->lm_buflens[segment] < newsize); + + /* nothing to do if we are enlarging the last segment */ + if (segment == msg->lm_bufcount - 1) { + msg->lm_buflens[segment] = newsize; + return; + } + + oldsize = msg->lm_buflens[segment]; + + src = lustre_msg_buf(msg, segment + 1, 0); + msg->lm_buflens[segment] = newsize; + dst = lustre_msg_buf(msg, segment + 1, 0); + msg->lm_buflens[segment] = oldsize; + + /* move from segment + 1 to end segment */ + LASSERT(msg->lm_magic == LUSTRE_MSG_MAGIC_V2); + oldmsg_size = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens); + movesize = oldmsg_size - 
((unsigned long) src - (unsigned long) msg); + LASSERT(movesize >= 0); + + if (movesize) + memmove(dst, src, movesize); + + /* note we don't clear the areas where old data live, not secret */ + + /* finally set new segment size */ + msg->lm_buflens[segment] = newsize; +} +EXPORT_SYMBOL(_sptlrpc_enlarge_msg_inplace); + +/* + * enlarge @segment of upper message req->rq_reqmsg to @newsize, all data + * will be preserved after enlargement. this must be called after rq_reqmsg has + * been initialized at least. + * + * caller's attention: upon return, rq_reqmsg and rq_reqlen might have + * been changed. + */ int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req, - int segment, int newsize, int movedata) + int segment, int newsize) { struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx; - struct ptlrpc_sec_policy *policy; + struct ptlrpc_sec_cops *cops; struct lustre_msg *msg = req->rq_reqmsg; LASSERT(ctx); @@ -1337,10 +1386,9 @@ int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req, if (msg->lm_buflens[segment] == newsize) return 0; - policy = ctx->cc_sec->ps_policy; - LASSERT(policy->sp_cops->enlarge_reqbuf); - return policy->sp_cops->enlarge_reqbuf(ctx->cc_sec, req, - segment, newsize, movedata); + cops = ctx->cc_sec->ps_policy->sp_cops; + LASSERT(cops->enlarge_reqbuf); + return cops->enlarge_reqbuf(ctx->cc_sec, req, segment, newsize); } EXPORT_SYMBOL(sptlrpc_cli_enlarge_reqbuf); @@ -1853,7 +1901,7 @@ int get_default_flavor(enum lustre_part to_part, struct sec_flavor_config *conf) switch (to_part) { case LUSTRE_MDT: - conf->sfc_rpc_flavor = SPTLRPC_FLVR_NULL;//XXX SPTLRPC_FLVR_PLAIN; + conf->sfc_rpc_flavor = SPTLRPC_FLVR_PLAIN; return 0; case LUSTRE_OST: conf->sfc_rpc_flavor = SPTLRPC_FLVR_NULL; diff --git a/lustre/ptlrpc/sec_null.c b/lustre/ptlrpc/sec_null.c index 38bca6f..2b06e00 100644 --- a/lustre/ptlrpc/sec_null.c +++ b/lustre/ptlrpc/sec_null.c @@ -104,12 +104,14 @@ int null_alloc_reqbuf(struct ptlrpc_sec *sec, int msgsize) { if (!req->rq_reqbuf) { + int alloc_size
= size_roundup_power2(msgsize); + LASSERT(!req->rq_pool); - OBD_ALLOC(req->rq_reqbuf, msgsize); + OBD_ALLOC(req->rq_reqbuf, alloc_size); if (!req->rq_reqbuf) return -ENOMEM; - req->rq_reqbuf_len = msgsize; + req->rq_reqbuf_len = alloc_size; } else { LASSERT(req->rq_pool); LASSERT(req->rq_reqbuf_len >= msgsize); @@ -126,8 +128,11 @@ void null_free_reqbuf(struct ptlrpc_sec *sec, { if (!req->rq_pool) { LASSERTF(req->rq_reqmsg == req->rq_reqbuf, - "reqmsg %p is not reqbuf %p in NULL sec\n", + "reqmsg %p is not reqbuf %p in null sec\n", req->rq_reqmsg, req->rq_reqbuf); + LASSERTF(req->rq_reqbuf_len >= req->rq_reqlen, + "reqlen %d should smaller than buflen %d\n", + req->rq_reqlen, req->rq_reqbuf_len); OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len); req->rq_reqmsg = req->rq_reqbuf = NULL; @@ -140,6 +145,8 @@ int null_alloc_repbuf(struct ptlrpc_sec *sec, struct ptlrpc_request *req, int msgsize) { + msgsize = size_roundup_power2(msgsize); + OBD_ALLOC(req->rq_repbuf, msgsize); if (!req->rq_repbuf) return -ENOMEM; @@ -160,37 +167,46 @@ void null_free_repbuf(struct ptlrpc_sec *sec, static int null_enlarge_reqbuf(struct ptlrpc_sec *sec, struct ptlrpc_request *req, - int segment, int newsize, int move_data) + int segment, int newsize) { - struct lustre_msg *oldmsg = req->rq_reqbuf, *newmsg; - int oldsize, new_msgsize; + struct lustre_msg *newbuf; + int oldsize, newmsg_size, alloc_size; LASSERT(req->rq_reqbuf); LASSERT(req->rq_reqbuf == req->rq_reqmsg); - LASSERT(!move_data); // XXX - - oldsize = oldmsg->lm_buflens[segment]; - oldmsg->lm_buflens[segment] = newsize; - - new_msgsize = lustre_msg_size(oldmsg->lm_magic, - oldmsg->lm_bufcount, oldmsg->lm_buflens); - - /* FIXME need move data!!! 
*/ - if (req->rq_pool) { - req->rq_reqlen = new_msgsize; - } else { - OBD_ALLOC(newmsg, new_msgsize); - if (newmsg == NULL) { - oldmsg->lm_buflens[segment] = oldsize; + LASSERT(req->rq_reqbuf_len >= req->rq_reqlen); + LASSERT(req->rq_reqlen == lustre_msg_size(req->rq_reqmsg->lm_magic, + req->rq_reqmsg->lm_bufcount, + req->rq_reqmsg->lm_buflens)); + + /* compute new message size */ + oldsize = req->rq_reqbuf->lm_buflens[segment]; + req->rq_reqbuf->lm_buflens[segment] = newsize; + newmsg_size = lustre_msg_size(req->rq_reqbuf->lm_magic, + req->rq_reqbuf->lm_bufcount, + req->rq_reqbuf->lm_buflens); + req->rq_reqbuf->lm_buflens[segment] = oldsize; + + /* request from pool should always have enough buffer */ + LASSERT(!req->rq_pool || req->rq_reqbuf_len >= newmsg_size); + + if (req->rq_reqbuf_len < newmsg_size) { + alloc_size = size_roundup_power2(newmsg_size); + + OBD_ALLOC(newbuf, alloc_size); + if (newbuf == NULL) return -ENOMEM; - } - memcpy(newmsg, oldmsg, req->rq_reqlen); + + memcpy(newbuf, req->rq_reqbuf, req->rq_reqlen); OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len); - req->rq_reqbuf = req->rq_reqmsg = newmsg; - req->rq_reqbuf_len = req->rq_reqlen = new_msgsize; + req->rq_reqbuf = req->rq_reqmsg = newbuf; + req->rq_reqbuf_len = alloc_size; } + _sptlrpc_enlarge_msg_inplace(req->rq_reqmsg, segment, newsize); + req->rq_reqlen = newmsg_size; + return 0; } diff --git a/lustre/ptlrpc/sec_plain.c b/lustre/ptlrpc/sec_plain.c index 53abfba..794600d 100644 --- a/lustre/ptlrpc/sec_plain.c +++ b/lustre/ptlrpc/sec_plain.c @@ -181,9 +181,10 @@ int plain_alloc_reqbuf(struct ptlrpc_sec *sec, alloc_len = lustre_msg_size_v2(bufcnt, buflens); - if (!req->rq_reqbuf) { LASSERT(!req->rq_pool); + + alloc_len = size_roundup_power2(alloc_len); OBD_ALLOC(req->rq_reqbuf, alloc_len); if (!req->rq_reqbuf) RETURN(-ENOMEM); @@ -237,6 +238,7 @@ int plain_alloc_repbuf(struct ptlrpc_sec *sec, } alloc_len = lustre_msg_size_v2(bufcnt, buflens); + alloc_len = size_roundup_power2(alloc_len); 
OBD_ALLOC(req->rq_repbuf, alloc_len); if (!req->rq_repbuf) @@ -260,10 +262,55 @@ void plain_free_repbuf(struct ptlrpc_sec *sec, static int plain_enlarge_reqbuf(struct ptlrpc_sec *sec, struct ptlrpc_request *req, - int segment, int newsize, int move_data) + int segment, int newsize) { - LBUG(); - return 0; + struct lustre_msg *newbuf; + int oldsize; + int newmsg_size, newbuf_size; + ENTRY; + + /* embedded msg always at seg 0 */ + LASSERT(req->rq_reqbuf); + LASSERT(req->rq_reqbuf_len >= req->rq_reqlen); + LASSERT(lustre_msg_buf(req->rq_reqbuf, 0, 0) == req->rq_reqmsg); + + /* compute new embedded msg size. */ + oldsize = req->rq_reqmsg->lm_buflens[segment]; + req->rq_reqmsg->lm_buflens[segment] = newsize; + newmsg_size = lustre_msg_size_v2(req->rq_reqmsg->lm_bufcount, + req->rq_reqmsg->lm_buflens); + req->rq_reqmsg->lm_buflens[segment] = oldsize; + + /* compute new wrapper msg size. */ + oldsize = req->rq_reqbuf->lm_buflens[0]; + req->rq_reqbuf->lm_buflens[0] = newmsg_size; + newbuf_size = lustre_msg_size_v2(req->rq_reqbuf->lm_bufcount, + req->rq_reqbuf->lm_buflens); + req->rq_reqbuf->lm_buflens[0] = oldsize; + + /* request from pool should always have enough buffer */ + LASSERT(!req->rq_pool || req->rq_reqbuf_len >= newbuf_size); + + if (req->rq_reqbuf_len < newbuf_size) { + newbuf_size = size_roundup_power2(newbuf_size); + + OBD_ALLOC(newbuf, newbuf_size); + if (newbuf == NULL) + RETURN(-ENOMEM); + + memcpy(newbuf, req->rq_reqbuf, req->rq_reqbuf_len); + + OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len); + req->rq_reqbuf = newbuf; + req->rq_reqbuf_len = newbuf_size; + req->rq_reqmsg = lustre_msg_buf(req->rq_reqbuf, 0, 0); + } + + _sptlrpc_enlarge_msg_inplace(req->rq_reqbuf, 0, newmsg_size); + _sptlrpc_enlarge_msg_inplace(req->rq_reqmsg, segment, newsize); + + req->rq_reqlen = newmsg_size; + RETURN(0); } static -- 1.8.3.1