- fix a race in handling early replies in sptlrpc.
- port AT API changes from b1_8 (b16972)
b=16999
r=rread
r=wangdi
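
Overview of the new early-reply flow (a condensed sketch, not part of the patch itself; it mirrors the updated ptlrpc_at_recv_early_reply with locking and error paths trimmed). The early reply is now unwrapped into a private ptlrpc_request instead of being staged in the original request, and the AT helpers take the server-reported values as explicit arguments:

        struct ptlrpc_request *early_req;
        int rc;

        /* copy the early reply into a separate request, so the still-posted
         * reply buffer cannot change underneath us */
        rc = sptlrpc_cli_unwrap_early_reply(req, &early_req);
        if (rc)
                return rc;

        if (unpack_reply(early_req) == 0) {
                /* AT values are read from the private copy and passed in
                 * explicitly, no longer taken from req->rq_repmsg */
                ptlrpc_at_adj_service(req,
                        lustre_msg_get_timeout(early_req->rq_repmsg));
                ptlrpc_at_adj_net_latency(req,
                        lustre_msg_get_service_time(early_req->rq_repmsg));
        }

        /* frees the duplicated request, its reply buffer and its ctx ref */
        sptlrpc_cli_finish_early_reply(early_req);

Because the duplicate owns its own reply buffer, a second early reply (or the real reply) landing in the still-posted receive buffer can no longer race with the client parsing it.
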
void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req);
int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
int segment, int newsize);
-int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req);
-int sptlrpc_cli_finish_early_reply(struct ptlrpc_request *req);
+int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
+ struct ptlrpc_request **req_ret);
+void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req);
void sptlrpc_request_out_callback(struct ptlrpc_request *req);
}
/* Adjust max service estimate based on server value */
-static void ptlrpc_at_adj_service(struct ptlrpc_request *req)
+static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
+ unsigned int serv_est)
{
int idx;
- unsigned int serv_est, oldse;
- struct imp_at *at = &req->rq_import->imp_at;
+ unsigned int oldse;
+ struct imp_at *at;
LASSERT(req->rq_import);
-
- /* service estimate is returned in the repmsg timeout field,
- may be 0 on err */
- serv_est = lustre_msg_get_timeout(req->rq_repmsg);
+ at = &req->rq_import->imp_at;
idx = import_at_get_index(req->rq_import, req->rq_request_portal);
/* max service estimates are tracked on the server side,
}
/* Adjust expected network latency */
-static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req)
+static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
+ unsigned int service_time)
{
- unsigned int st, nl, oldnl;
- struct imp_at *at = &req->rq_import->imp_at;
+ unsigned int nl, oldnl;
+ struct imp_at *at;
time_t now = cfs_time_current_sec();
LASSERT(req->rq_import);
-
- st = lustre_msg_get_service_time(req->rq_repmsg);
+ at = &req->rq_import->imp_at;
/* Network latency is total time less server processing time */
- nl = max_t(int, now - req->rq_sent - st, 0) + 1/*st rounding*/;
- if (st > now - req->rq_sent + 3 /* bz16408 */)
+ nl = max_t(int, now - req->rq_sent - service_time, 0) +1/*st rounding*/;
+ if (service_time > now - req->rq_sent + 3 /* bz16408 */)
CWARN("Reported service time %u > total measured time "
- CFS_DURATION_T"\n", st, cfs_time_sub(now, req->rq_sent));
+ CFS_DURATION_T"\n", service_time,
+ cfs_time_sub(now, req->rq_sent));
oldnl = at_add(&at->iat_net_latency, nl);
if (oldnl != 0)
* Handle an early reply message, called with the rq_lock held.
* If anything goes wrong just ignore it - same as if it never happened
*/
-static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) {
- time_t olddl;
- int rc;
+static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
+{
+ struct ptlrpc_request *early_req;
+ time_t olddl;
+ int rc;
ENTRY;
req->rq_early = 0;
spin_unlock(&req->rq_lock);
- rc = sptlrpc_cli_unwrap_early_reply(req);
- if (rc)
- GOTO(out, rc);
-
- rc = unpack_reply(req);
- if (rc)
- GOTO(out_cleanup, rc);
+ rc = sptlrpc_cli_unwrap_early_reply(req, &early_req);
+ if (rc) {
+ spin_lock(&req->rq_lock);
+ RETURN(rc);
+ }
- /* Expecting to increase the service time estimate here */
- ptlrpc_at_adj_service(req);
- ptlrpc_at_adj_net_latency(req);
+ rc = unpack_reply(early_req);
+ if (rc == 0) {
+ /* Expecting to increase the service time estimate here */
+ ptlrpc_at_adj_service(req,
+ lustre_msg_get_timeout(early_req->rq_repmsg));
+ ptlrpc_at_adj_net_latency(req,
+ lustre_msg_get_service_time(early_req->rq_repmsg));
+ }
- /* Adjust the local timeout for this req */
- ptlrpc_at_set_req_timeout(req);
+ sptlrpc_cli_finish_early_reply(early_req);
- olddl = req->rq_deadline;
- /* server assumes it now has rq_timeout from when it sent the
- early reply, so client should give it at least that long. */
- req->rq_deadline = cfs_time_current_sec() + req->rq_timeout +
- ptlrpc_at_get_net_latency(req);
-
- DEBUG_REQ(D_ADAPTTO, req,
- "Early reply #%d, new deadline in "CFS_DURATION_T"s ("
- CFS_DURATION_T"s)", req->rq_early_count,
- cfs_time_sub(req->rq_deadline, cfs_time_current_sec()),
- cfs_time_sub(req->rq_deadline, olddl));
-
-out_cleanup:
- sptlrpc_cli_finish_early_reply(req);
-out:
spin_lock(&req->rq_lock);
+
+ if (rc == 0) {
+ /* Adjust the local timeout for this req */
+ ptlrpc_at_set_req_timeout(req);
+
+ olddl = req->rq_deadline;
+ /* server assumes it now has rq_timeout from when it sent the
+ early reply, so client should give it at least that long. */
+ req->rq_deadline = cfs_time_current_sec() + req->rq_timeout +
+ ptlrpc_at_get_net_latency(req);
+
+ DEBUG_REQ(D_ADAPTTO, req,
+ "Early reply #%d, new deadline in "CFS_DURATION_T"s "
+ "("CFS_DURATION_T"s)", req->rq_early_count,
+ cfs_time_sub(req->rq_deadline,
+ cfs_time_current_sec()),
+ cfs_time_sub(req->rq_deadline, olddl));
+ }
+
RETURN(rc);
}
}
OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, obd_fail_val);
- ptlrpc_at_adj_service(req);
- ptlrpc_at_adj_net_latency(req);
+ ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
+ ptlrpc_at_adj_net_latency(req,
+ lustre_msg_get_service_time(req->rq_repmsg));
rc = ptlrpc_check_status(req);
imp->imp_connect_error = rc;
struct gss_header *ghdr, *reqhdr;
struct lustre_msg *msg = req->rq_repdata;
__u32 major;
- int pack_bulk, early = 0, rc = 0;
+ int pack_bulk, rc = 0;
ENTRY;
LASSERT(req->rq_cli_ctx == ctx);
gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
- if ((char *) msg < req->rq_repbuf ||
- (char *) msg >= req->rq_repbuf + req->rq_repbuf_len)
- early = 1;
-
/* special case for context negotiation, rq_repmsg/rq_replen actually
* are not used currently. but early reply always be treated normally */
- if (req->rq_ctx_init && !early) {
+ if (req->rq_ctx_init && !req->rq_early) {
req->rq_repmsg = lustre_msg_buf(msg, 1, 0);
req->rq_replen = msg->lm_buflens[1];
RETURN(0);
case PTLRPC_GSS_PROC_DATA:
pack_bulk = ghdr->gh_flags & LUSTRE_GSS_PACK_BULK;
- if (!early && !equi(req->rq_pack_bulk == 1, pack_bulk)) {
+ if (!req->rq_early && !equi(req->rq_pack_bulk == 1, pack_bulk)){
CERROR("%s bulk flag in reply\n",
req->rq_pack_bulk ? "missing" : "unexpected");
RETURN(-EPROTO);
if (major != GSS_S_COMPLETE)
RETURN(-EPERM);
- if (early && reqhdr->gh_svc == SPTLRPC_SVC_NULL) {
+ if (req->rq_early && reqhdr->gh_svc == SPTLRPC_SVC_NULL) {
__u32 cksum;
cksum = crc32_le(!(__u32) 0,
req->rq_replen = msg->lm_buflens[1];
break;
case PTLRPC_GSS_PROC_ERR:
- if (early) {
+ if (req->rq_early) {
CERROR("server return error with early reply\n");
rc = -EPROTO;
} else {
struct gss_cli_ctx *gctx;
struct gss_header *ghdr;
struct lustre_msg *msg = req->rq_repdata;
- int msglen, pack_bulk, early = 0, rc;
+ int msglen, pack_bulk, rc;
__u32 major;
ENTRY;
gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
- if ((char *) msg < req->rq_repbuf ||
- (char *) msg >= req->rq_repbuf + req->rq_repbuf_len)
- early = 1;
-
ghdr = gss_swab_header(msg, 0);
if (ghdr == NULL) {
CERROR("can't decode gss header\n");
case PTLRPC_GSS_PROC_DATA:
pack_bulk = ghdr->gh_flags & LUSTRE_GSS_PACK_BULK;
- if (!early && !equi(req->rq_pack_bulk == 1, pack_bulk)) {
+ if (!req->rq_early && !equi(req->rq_pack_bulk == 1, pack_bulk)){
CERROR("%s bulk flag in reply\n",
req->rq_pack_bulk ? "missing" : "unexpected");
RETURN(-EPROTO);
rc = 0;
break;
case PTLRPC_GSS_PROC_ERR:
- rc = gss_cli_ctx_handle_err_notify(ctx, req, ghdr);
+ if (req->rq_early) {
+ CERROR("server return error with early reply\n");
+ rc = -EPROTO;
+ } else {
+ rc = gss_cli_ctx_handle_err_notify(ctx, req, ghdr);
+ }
break;
default:
CERROR("unexpected proc %d\n", ghdr->gh_proc);
return do_cli_unwrap_reply(req);
}
-/*
+/**
* Upon called, the receive buffer might be still posted, so the reply data
* might be changed at any time, no matter we're holding rq_lock or not. we
* expect the rq_reply_off be 0, rq_nob_received is the early reply size.
*
- * we allocate a separate buffer to hold early reply data, pointed by
- * rq_repdata, rq_repdata_len is the early reply size, and round up to power2
- * is the actual buffer size.
- *
- * caller _must_ call sptlrpc_cli_finish_early_reply() after this, before
- * process another early reply or real reply, to restore ptlrpc_request
- * to normal status.
+ * we allocate a separate ptlrpc_request and reply buffer for early reply
+ * processing; on success 0 is returned and @req_ret points to the duplicated
+ * ptlrpc_request. the caller must call sptlrpc_cli_finish_early_reply() on
+ * the returned request to release it. on error @req_ret is left untouched.
*/
-int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req)
+int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
+ struct ptlrpc_request **req_ret)
{
- struct lustre_msg *early_buf;
+ struct ptlrpc_request *early_req;
+ char *early_buf;
int early_bufsz, early_size;
int rc;
ENTRY;
- LASSERT(req->rq_repbuf);
- LASSERT(req->rq_repdata == NULL);
- LASSERT(req->rq_repmsg == NULL);
+ OBD_ALLOC_PTR(early_req);
+ if (early_req == NULL)
+ RETURN(-ENOMEM);
early_size = req->rq_nob_received;
- if (early_size < sizeof(struct lustre_msg)) {
- CERROR("early reply length %d too small\n", early_size);
- RETURN(-EPROTO);
- }
-
early_bufsz = size_roundup_power2(early_size);
OBD_ALLOC(early_buf, early_bufsz);
if (early_buf == NULL)
- RETURN(-ENOMEM);
+ GOTO(err_req, rc = -ENOMEM);
- /* copy data out, do it inside spinlock */
+ /* sanity checks and the data copy, all done inside the spinlock */
spin_lock(&req->rq_lock);
if (req->rq_replied) {
spin_unlock(&req->rq_lock);
- GOTO(err_free, rc = -EALREADY);
+ GOTO(err_buf, rc = -EALREADY);
}
+ LASSERT(req->rq_repbuf);
+ LASSERT(req->rq_repdata == NULL);
+ LASSERT(req->rq_repmsg == NULL);
+
if (req->rq_reply_off != 0) {
CERROR("early reply with offset %u\n", req->rq_reply_off);
- GOTO(err_free, rc = -EPROTO);
+ spin_unlock(&req->rq_lock);
+ GOTO(err_buf, rc = -EPROTO);
}
if (req->rq_nob_received != early_size) {
/* even another early arrived the size should be the same */
- CWARN("data size has changed from %u to %u\n",
- early_size, req->rq_nob_received);
+ CERROR("data size has changed from %u to %u\n",
+ early_size, req->rq_nob_received);
spin_unlock(&req->rq_lock);
- GOTO(err_free, rc = -EINVAL);
+ GOTO(err_buf, rc = -EINVAL);
}
if (req->rq_nob_received < sizeof(struct lustre_msg)) {
CERROR("early reply length %d too small\n",
req->rq_nob_received);
spin_unlock(&req->rq_lock);
- GOTO(err_free, rc = -EALREADY);
+ GOTO(err_buf, rc = -EALREADY);
}
memcpy(early_buf, req->rq_repbuf, early_size);
spin_unlock(&req->rq_lock);
- req->rq_repdata = early_buf;
- req->rq_repdata_len = early_size;
-
- rc = do_cli_unwrap_reply(req);
-
- /* treate resend as an error case. in fact server should never ask
- * resend via early reply. */
- if (req->rq_resend) {
- req->rq_resend = 0;
- rc = -EPROTO;
- }
+ early_req->rq_cli_ctx = sptlrpc_cli_ctx_get(req->rq_cli_ctx);
+ early_req->rq_flvr = req->rq_flvr;
+ early_req->rq_repbuf = early_buf;
+ early_req->rq_repbuf_len = early_bufsz;
+ early_req->rq_repdata = (struct lustre_msg *) early_buf;
+ early_req->rq_repdata_len = early_size;
+ early_req->rq_early = 1;
+ rc = do_cli_unwrap_reply(early_req);
if (rc) {
- LASSERT(req->rq_repmsg == NULL);
- req->rq_repdata = NULL;
- req->rq_repdata_len = 0;
- GOTO(err_free, rc);
+ DEBUG_REQ(D_ADAPTTO, early_req,
+ "error %d unwrapping early reply", rc);
+ GOTO(err_ctx, rc);
}
- LASSERT(req->rq_repmsg);
+ LASSERT(early_req->rq_repmsg);
+ *req_ret = early_req;
RETURN(0);
-err_free:
+err_ctx:
+ sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
+err_buf:
OBD_FREE(early_buf, early_bufsz);
+err_req:
+ OBD_FREE_PTR(early_req);
RETURN(rc);
}
-int sptlrpc_cli_finish_early_reply(struct ptlrpc_request *req)
+void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req)
{
- int early_bufsz;
-
- LASSERT(req->rq_repdata);
- LASSERT(req->rq_repdata_len);
- LASSERT(req->rq_repmsg);
+ LASSERT(early_req->rq_repbuf);
+ LASSERT(early_req->rq_repdata);
+ LASSERT(early_req->rq_repmsg);
- early_bufsz = size_roundup_power2(req->rq_repdata_len);
- OBD_FREE(req->rq_repdata, early_bufsz);
-
- req->rq_repdata = NULL;
- req->rq_repdata_len = 0;
- req->rq_repmsg = NULL;
- return 0;
+ sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
+ OBD_FREE(early_req->rq_repbuf, early_req->rq_repbuf_len);
+ OBD_FREE_PTR(early_req);
}
/**************************************************
LASSERT(req->rq_repdata);
- /* real reply rq_repdata point inside of rq_reqbuf; early reply
- * rq_repdata point to a separate allocated space */
- if ((char *) req->rq_repdata < req->rq_repbuf ||
- (char *) req->rq_repdata >= req->rq_repbuf + req->rq_repbuf_len) {
+ if (req->rq_early) {
cksums = req->rq_repdata->lm_cksum;
req->rq_repdata->lm_cksum = 0;
int plain_ctx_verify(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
{
struct lustre_msg *msg = req->rq_repdata;
- int early = 0;
__u32 cksum;
ENTRY;
RETURN(-EPROTO);
}
- /* find out if it's an early reply */
- if ((char *) msg < req->rq_repbuf ||
- (char *) msg >= req->rq_repbuf + req->rq_repbuf_len)
- early = 1;
-
/* expect no user desc in reply */
if (PLAIN_WFLVR_HAS_USER(msg->lm_secflvr)) {
CERROR("Unexpected udesc flag in reply\n");
RETURN(-EPROTO);
}
- if (unlikely(early)) {
+ if (unlikely(req->rq_early)) {
cksum = crc32_le(!(__u32) 0,
lustre_msg_buf(msg, PLAIN_PACK_MSG_OFF, 0),
lustre_msg_buflen(msg, PLAIN_PACK_MSG_OFF));
} else {
/* whether we sent with bulk or not, we expect the same
* in reply, except for early reply */
- if (!early &&
+ if (!req->rq_early &&
!equi(req->rq_pack_bulk == 1,
PLAIN_WFLVR_HAS_BULK(msg->lm_secflvr))) {
CERROR("%s bulk checksum in reply\n",