+/*
+ * upon this be called, the reply buffer should have been un-posted,
+ * so nothing is going to change.
+ */
+int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
+{
+ LASSERT(req->rq_repbuf);
+ LASSERT(req->rq_repdata == NULL);
+ LASSERT(req->rq_repmsg == NULL);
+ LASSERT(req->rq_reply_off + req->rq_nob_received <= req->rq_repbuf_len);
+
+ if (req->rq_reply_off == 0) {
+ CERROR("real reply with offset 0\n");
+ return -EPROTO;
+ }
+
+ if (req->rq_reply_off % 8 != 0) {
+ CERROR("reply at odd offset %u\n", req->rq_reply_off);
+ return -EPROTO;
+ }
+
+ req->rq_repdata = (struct lustre_msg *)
+ (req->rq_repbuf + req->rq_reply_off);
+ req->rq_repdata_len = req->rq_nob_received;
+
+ return do_cli_unwrap_reply(req);
+}
+
+/*
+ * Upon called, the receive buffer might be still posted, so the reply data
+ * might be changed at any time, no matter we're holding rq_lock or not. we
+ * expect the rq_reply_off be 0, rq_nob_received is the early reply size.
+ *
+ * we allocate a separate buffer to hold early reply data, pointed by
+ * rq_repdata, rq_repdata_len is the early reply size, and round up to power2
+ * is the actual buffer size.
+ *
+ * caller _must_ call sptlrpc_cli_finish_early_reply() after this, before
+ * process another early reply or real reply, to restore ptlrpc_request
+ * to normal status.
+ */
+int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req)
+{
+ struct lustre_msg *early_buf;
+ int early_bufsz, early_size;
+ int rc;
+ ENTRY;
+
+ LASSERT(req->rq_repbuf);
+ LASSERT(req->rq_repdata == NULL);
+ LASSERT(req->rq_repmsg == NULL);
+
+ early_size = req->rq_nob_received;
+ if (early_size < sizeof(struct lustre_msg)) {
+ CERROR("early reply length %d too small\n", early_size);
+ RETURN(-EPROTO);
+ }
+
+ early_bufsz = size_roundup_power2(early_size);
+ OBD_ALLOC(early_buf, early_bufsz);
+ if (early_buf == NULL)
+ RETURN(-ENOMEM);
+
+ /* copy data out, do it inside spinlock */
+ spin_lock(&req->rq_lock);
+
+ if (req->rq_replied) {
+ spin_unlock(&req->rq_lock);
+ GOTO(err_free, rc = -EALREADY);
+ }
+
+ if (req->rq_reply_off != 0) {
+ CERROR("early reply with offset %u\n", req->rq_reply_off);
+ GOTO(err_free, rc = -EPROTO);
+ }
+
+ if (req->rq_nob_received != early_size) {
+ /* even another early arrived the size should be the same */
+ CWARN("data size has changed from %u to %u\n",
+ early_size, req->rq_nob_received);
+ spin_unlock(&req->rq_lock);
+ GOTO(err_free, rc = -EINVAL);
+ }
+
+ if (req->rq_nob_received < sizeof(struct lustre_msg)) {
+ CERROR("early reply length %d too small\n",
+ req->rq_nob_received);
+ spin_unlock(&req->rq_lock);
+ GOTO(err_free, rc = -EALREADY);
+ }
+
+ memcpy(early_buf, req->rq_repbuf, early_size);
+ spin_unlock(&req->rq_lock);
+
+ req->rq_repdata = early_buf;
+ req->rq_repdata_len = early_size;
+
+ rc = do_cli_unwrap_reply(req);
+
+ /* treate resend as an error case. in fact server should never ask
+ * resend via early reply. */
+ if (req->rq_resend) {
+ req->rq_resend = 0;
+ rc = -EPROTO;
+ }
+
+ if (rc) {
+ LASSERT(req->rq_repmsg == NULL);
+ req->rq_repdata = NULL;
+ req->rq_repdata_len = 0;
+ GOTO(err_free, rc);
+ }
+
+ LASSERT(req->rq_repmsg);
+ RETURN(0);
+
+err_free:
+ OBD_FREE(early_buf, early_bufsz);
+ RETURN(rc);
+}
+
+int sptlrpc_cli_finish_early_reply(struct ptlrpc_request *req)
+{
+ int early_bufsz;
+
+ LASSERT(req->rq_repdata);
+ LASSERT(req->rq_repdata_len);
+ LASSERT(req->rq_repmsg);
+
+ early_bufsz = size_roundup_power2(req->rq_repdata_len);
+ OBD_FREE(req->rq_repdata, early_bufsz);
+
+ req->rq_repdata = NULL;
+ req->rq_repdata_len = 0;
+ req->rq_repmsg = NULL;
+ return 0;
+}
+