- struct lustre_msg *newbuf;
- struct lustre_msg *oldbuf = req->rq_reqmsg;
- int oldsize, newmsg_size, alloc_size;
-
- LASSERT(req->rq_reqbuf);
- LASSERT(req->rq_reqbuf == req->rq_reqmsg);
- LASSERT(req->rq_reqbuf_len >= req->rq_reqlen);
- LASSERT(req->rq_reqlen == lustre_packed_msg_size(oldbuf));
-
- /* compute new message size */
- oldsize = req->rq_reqbuf->lm_buflens[segment];
- req->rq_reqbuf->lm_buflens[segment] = newsize;
- newmsg_size = lustre_packed_msg_size(oldbuf);
- req->rq_reqbuf->lm_buflens[segment] = oldsize;
-
- /* request from pool should always have enough buffer */
- LASSERT(!req->rq_pool || req->rq_reqbuf_len >= newmsg_size);
-
- if (req->rq_reqbuf_len < newmsg_size) {
- alloc_size = size_roundup_power2(newmsg_size);
-
- OBD_ALLOC_LARGE(newbuf, alloc_size);
- if (newbuf == NULL)
- return -ENOMEM;
-
- memcpy(newbuf, req->rq_reqbuf, req->rq_reqlen);
-
- OBD_FREE_LARGE(req->rq_reqbuf, req->rq_reqbuf_len);
- req->rq_reqbuf = req->rq_reqmsg = newbuf;
- req->rq_reqbuf_len = alloc_size;
- }
+ struct lustre_msg *newbuf;
+ struct lustre_msg *oldbuf = req->rq_reqmsg;
+ int oldsize, newmsg_size, alloc_size;
+
+ LASSERT(req->rq_reqbuf);
+ LASSERT(req->rq_reqbuf == req->rq_reqmsg);
+ LASSERT(req->rq_reqbuf_len >= req->rq_reqlen);
+ LASSERT(req->rq_reqlen == lustre_packed_msg_size(oldbuf));
+
+ /* compute new message size */
+ oldsize = req->rq_reqbuf->lm_buflens[segment];
+ req->rq_reqbuf->lm_buflens[segment] = newsize;
+ newmsg_size = lustre_packed_msg_size(oldbuf);
+ req->rq_reqbuf->lm_buflens[segment] = oldsize;
+
+ /* request from pool should always have enough buffer */
+ LASSERT(!req->rq_pool || req->rq_reqbuf_len >= newmsg_size);
+
+ if (req->rq_reqbuf_len < newmsg_size) {
+ alloc_size = size_roundup_power2(newmsg_size);
+
+ OBD_ALLOC_LARGE(newbuf, alloc_size);
+ if (newbuf == NULL)
+ return -ENOMEM;
+
+ /*
+ * Must lock this, so that otherwise unprotected change of
+ * rq_reqmsg is not racing with parallel processing of
+ * imp_replay_list traversing threads. See LU-3333
+ * This is a bandaid at best, we really need to deal with this
+ * in request enlarging code before unpacking that's already
+ * there
+ */
+ if (req->rq_import)
+ spin_lock(&req->rq_import->imp_lock);
+ memcpy(newbuf, req->rq_reqbuf, req->rq_reqlen);
+
+ OBD_FREE_LARGE(req->rq_reqbuf, req->rq_reqbuf_len);
+ req->rq_reqbuf = req->rq_reqmsg = newbuf;
+ req->rq_reqbuf_len = alloc_size;
+
+ if (req->rq_import)
+ spin_unlock(&req->rq_import->imp_lock);
+ }