so that servers' early reply updates to the deadline aren't
kept in per-cpu cache */
time_t rq_reply_deadline; /* when req reply unlink must finish. */
+ time_t rq_bulk_deadline; /* when req bulk unlink must finish. */
int rq_timeout; /* service time estimate (secs) */
/* Multi-rpc bits */
int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc);
void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc);
int ptlrpc_register_bulk(struct ptlrpc_request *req);
-void ptlrpc_unregister_bulk (struct ptlrpc_request *req);
+int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async);
-static inline int ptlrpc_bulk_active (struct ptlrpc_bulk_desc *desc)
+static inline int ptlrpc_server_bulk_active(struct ptlrpc_bulk_desc *desc)
{
- int rc;
+ int rc;
+
+ LASSERT(desc != NULL);
+
+ spin_lock(&desc->bd_lock);
+ rc = desc->bd_network_rw;
+ spin_unlock(&desc->bd_lock);
+ return rc;
+}
+
+static inline int ptlrpc_client_bulk_active(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc;
+ int rc;
+
+ LASSERT(req != NULL);
+ desc = req->rq_bulk;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
+ req->rq_bulk_deadline > cfs_time_current_sec())
+ return 1;
+
+ if (!desc)
+ return 0;
spin_lock(&desc->bd_lock);
rc = desc->bd_network_rw;
spin_unlock(&desc->bd_lock);
- return (rc);
+ return rc;
}
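With the rename, the descriptor-based check stays on the server side while client code now asks about the request itself, so the injected rq_bulk_deadline is honoured even once rq_bulk is gone. A minimal caller sketch, assuming a request with registered bulk and a hypothetical timeout in seconds (it mirrors the wait-loop hunks further down; illustrative only, not part of the patch):

    struct l_wait_info lwi;
    int rc;

    /* re-poll the predicate once a second, for up to `timeout` seconds */
    lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(timeout),
                               cfs_time_seconds(1), NULL, NULL);
    rc = l_wait_event(req->rq_reply_waitq,
                      !ptlrpc_client_bulk_active(req), &lwi);
    /* rc == 0: bulk MD unlinked; rc == -ETIMEDOUT: still on the wire */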
#define PTLRPC_REPLY_MAYBE_DIFFICULT 0x01
static inline int
ptlrpc_client_early(struct ptlrpc_request *req)
{
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
req->rq_reply_deadline > cfs_time_current_sec())
return 0;
return req->rq_early;
static inline int
ptlrpc_client_replied(struct ptlrpc_request *req)
{
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
req->rq_reply_deadline > cfs_time_current_sec())
return 0;
return req->rq_replied;
static inline int
ptlrpc_client_recv(struct ptlrpc_request *req)
{
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
req->rq_reply_deadline > cfs_time_current_sec())
return 1;
return req->rq_receiving_reply;
int rc;
spin_lock(&req->rq_lock);
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
req->rq_reply_deadline > cfs_time_current_sec()) {
spin_unlock(&req->rq_lock);
return 1;
#define OBD_FAIL_PTLRPC_PAUSE_REP 0x50c
#define OBD_FAIL_PTLRPC_DUMP_LOG 0x50e
-#define OBD_FAIL_PTLRPC_LONG_UNLINK 0x50f
-#define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT 0x510
-#define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT 0x511
+#define OBD_FAIL_PTLRPC_LONG_REPL_UNLINK 0x50f
+#define OBD_FAIL_PTLRPC_LONG_BULK_UNLINK 0x510
+#define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT 0x511
+#define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT 0x512
#define OBD_FAIL_OBD_PING_NET 0x600
#define OBD_FAIL_OBD_LOG_CANCEL_NET 0x601
if (timeout < 0)
CERROR("Req deadline already passed %lu (now: %lu)\n",
req->rq_deadline, cfs_time_current_sec());
- *lwi = LWI_TIMEOUT(max(timeout, 1) * HZ, NULL, NULL);
- rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
+ *lwi = LWI_TIMEOUT(cfs_time_seconds(max(timeout, 1)), NULL, NULL);
+ rc = l_wait_event(desc->bd_waitq, !ptlrpc_server_bulk_active(desc), lwi);
LASSERT (rc == 0 || rc == -ETIMEDOUT);
if (rc == 0) {
ost_bulk_timeout,
desc);
rc = l_wait_event(desc->bd_waitq,
- !ptlrpc_bulk_active(desc) ||
+ !ptlrpc_server_bulk_active(desc) ||
exp->exp_failed, &lwi);
LASSERT(rc == 0 || rc == -ETIMEDOUT);
/* Wait again if we changed deadline */
if (desc->bd_export->exp_failed)
rc = -ENOTCONN;
else
- rc = ptlrpc_start_bulk_transfer (desc);
+ rc = ptlrpc_start_bulk_transfer(desc);
if (rc == 0) {
time_t start = cfs_time_current_sec();
do {
lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
ost_bulk_timeout, desc);
rc = l_wait_event(desc->bd_waitq,
- !ptlrpc_bulk_active(desc) ||
+ !ptlrpc_server_bulk_active(desc) ||
desc->bd_export->exp_failed, &lwi);
LASSERT(rc == 0 || rc == -ETIMEDOUT);
/* Wait again if we changed deadline */
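The "Wait again if we changed deadline" note refers to the enclosing do/while, which this hunk elides: a client early reply can push req->rq_deadline forward while the server is blocked, so on -ETIMEDOUT the loop re-waits as long as the deadline is still ahead. A plausible shape of that exit condition (an assumption; only the wait body above is from the patch):

    } while (rc == -ETIMEDOUT &&
             req->rq_deadline > cfs_time_current_sec());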
return desc;
}
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
- int npages, int type, int portal)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
+ int npages, int type, int portal)
{
struct obd_import *imp = req->rq_import;
struct ptlrpc_bulk_desc *desc;
* sure that all rdma transfers finished and will
* not corrupt any data.
*/
- if (ptlrpc_client_recv_or_unlink(req))
+ if (ptlrpc_client_recv_or_unlink(req) ||
+ ptlrpc_client_bulk_active(req))
continue;
/*
* Turn fail_loc off to prevent it from looping
* forever.
*/
- OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_UNLINK,
- OBD_FAIL_ONCE);
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
+ OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK,
+ OBD_FAIL_ONCE);
+ }
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
+ OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK,
+ OBD_FAIL_ONCE);
+ }
/*
* Move to next phase if reply was successfully
/*
* Check if we still need to wait for unlink.
*/
- if (ptlrpc_client_recv_or_unlink(req))
+ if (ptlrpc_client_recv_or_unlink(req) ||
+ ptlrpc_client_bulk_active(req))
continue;
}
lustre_msg_add_flags(req->rq_reqmsg,
MSG_RESENT);
if (req->rq_bulk) {
- __u64 old_xid = req->rq_xid;
+ __u64 old_xid;
- ptlrpc_unregister_bulk(req);
+ if (!ptlrpc_unregister_bulk(req, 1))
+ continue;
/* ensure previous bulk fails */
+ old_xid = req->rq_xid;
req->rq_xid = ptlrpc_next_xid();
CDEBUG(D_HA, "resend bulk "
"old x"LPU64
}
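Ordering matters in this resend path: the xid is re-keyed only after ptlrpc_unregister_bulk() reports the old MD fully unlinked (nonzero return), because the bulk buffers are matched on rq_xid — which is what the "ensure previous bulk fails" comment alludes to. Condensed sketch of the sequence the hunk builds:

    if (!ptlrpc_unregister_bulk(req, 1))
        continue;                    /* unlink still in flight; retry later */
    old_xid = req->rq_xid;           /* safe to re-key only now */
    req->rq_xid = ptlrpc_next_xid(); /* old match bits can never hit again */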
LASSERT(req->rq_phase == RQ_PHASE_BULK);
- if (ptlrpc_bulk_active(req->rq_bulk))
+ if (ptlrpc_client_bulk_active(req))
continue;
if (!req->rq_bulk->bd_success) {
if (!ptlrpc_unregister_reply(req, 1))
continue;
- if (req->rq_bulk != NULL)
- ptlrpc_unregister_bulk(req);
+ if (!ptlrpc_unregister_bulk(req, 1))
+ continue;
/* Receiving should already be finished by the time we
* call interpret. */
spin_unlock(&req->rq_lock);
ptlrpc_unregister_reply(req, async_unlink);
+ ptlrpc_unregister_bulk(req, async_unlink);
if (obd_dump_on_timeout)
libcfs_debug_dumplog();
- if (req->rq_bulk != NULL)
- ptlrpc_unregister_bulk (req);
-
if (imp == NULL) {
DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
RETURN(1);
/*
* Let's set up the deadline for reply unlink.
*/
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
async && request->rq_reply_deadline == 0)
request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
for (;;) {
/* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs */
- lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
+ lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+ cfs_time_seconds(1), NULL, NULL);
rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
&lwi);
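The switch to LWI_TIMEOUT_INTERVAL() here (and in the other unlink waits below) makes l_wait_event() re-evaluate its condition at least once per interval instead of only on explicit wakeups; the intent appears to be that a missed wakeup can no longer stall the wait for the whole LONG_UNLINK window. For contrast, an illustrative sketch of the two forms:

    /* old: one LONG_UNLINK-second wait, condition checked on wakeups only */
    lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);

    /* new: same overall window, but the condition is re-polled every second */
    lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
                               cfs_time_seconds(1), NULL, NULL);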
if (rc == 0) {
lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
if (req->rq_bulk != NULL) {
- ptlrpc_unregister_bulk (req);
+ ptlrpc_unregister_bulk(req, 0);
/* bulk requests are supposed to be
* idempotent, so we are free to bump the xid
* here. */
lwi = LWI_TIMEOUT(timeout, NULL, NULL);
brc = l_wait_event(req->rq_reply_waitq,
- !ptlrpc_bulk_active(req->rq_bulk),
+ !ptlrpc_client_bulk_active(req),
&lwi);
LASSERT(brc == 0 || brc == -ETIMEDOUT);
if (brc != 0) {
}
}
if (rc < 0)
- ptlrpc_unregister_bulk (req);
+ ptlrpc_unregister_bulk(req, 0);
}
LASSERT(!req->rq_receiving_reply);
RETURN (0);
}
-int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
+int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
{
struct ptlrpc_connection *conn = desc->bd_export->exp_connection;
int rc;
RETURN(0);
}
-void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
+/* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
+ * serialises with completion callback) */
+void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
{
- /* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
- * serialises with completion callback) */
- struct l_wait_info lwi;
- int rc;
+ struct l_wait_info lwi;
+ int rc;
- LASSERT (!in_interrupt ()); /* might sleep */
+ LASSERT(!in_interrupt()); /* might sleep */
- if (!ptlrpc_bulk_active(desc)) /* completed or */
+ if (!ptlrpc_server_bulk_active(desc)) /* completed or */
return; /* never started */
/* Do not send any meaningful data over the wire for evicted clients */
* but we must still l_wait_event() in this case, to give liblustre
* a chance to run server_bulk_callback() */
- LNetMDUnlink (desc->bd_md_h);
+ LNetMDUnlink(desc->bd_md_h);
for (;;) {
/* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs */
- lwi = LWI_TIMEOUT (cfs_time_seconds(300), NULL, NULL);
+ lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+ cfs_time_seconds(1), NULL, NULL);
rc = l_wait_event(desc->bd_waitq,
- !ptlrpc_bulk_active(desc), &lwi);
+ !ptlrpc_server_bulk_active(desc), &lwi);
if (rc == 0)
return;
}
}
-int ptlrpc_register_bulk (struct ptlrpc_request *req)
+int ptlrpc_register_bulk(struct ptlrpc_request *req)
{
struct ptlrpc_bulk_desc *desc = req->rq_bulk;
lnet_process_id_t peer;
RETURN(0);
}
-void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
+/* Disconnect a bulk desc from the network. Idempotent. Not
+ * thread-safe (i.e. only interlocks with completion callback). */
+int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async)
{
- /* Disconnect a bulk desc from the network. Idempotent. Not
- * thread-safe (i.e. only interlocks with completion callback). */
struct ptlrpc_bulk_desc *desc = req->rq_bulk;
cfs_waitq_t *wq;
struct l_wait_info lwi;
int rc;
+ ENTRY;
+
+ LASSERT(!in_interrupt()); /* might sleep */
- LASSERT (!in_interrupt ()); /* might sleep */
+ /* Let's set up the deadline for bulk unlink. */
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
+ async && req->rq_bulk_deadline == 0)
+ req->rq_bulk_deadline = cfs_time_current_sec() + LONG_UNLINK;
- if (!ptlrpc_bulk_active(desc)) /* completed or */
- return; /* never registered */
+ if (!ptlrpc_client_bulk_active(req)) /* completed or */
+ RETURN(1); /* never registered */
- LASSERT (desc->bd_req == req); /* bd_req NULL until registered */
+ LASSERT(desc->bd_req == req); /* bd_req NULL until registered */
/* the unlink ensures the callback happens ASAP and is the last
* one. If it fails, it must be because completion just happened,
* but we must still l_wait_event() in this case to give liblustre
* a chance to run client_bulk_callback() */
- LNetMDUnlink (desc->bd_md_h);
+ LNetMDUnlink(desc->bd_md_h);
+
+ if (!ptlrpc_client_bulk_active(req)) /* completed or */
+ RETURN(1); /* never registered */
+
+ /* Move to "Unregistering" phase as bulk was not unlinked yet. */
+ ptlrpc_rqphase_move(req, RQ_PHASE_UNREGISTERING);
+
+ /* Do not wait for unlink to finish. */
+ if (async)
+ RETURN(0);
if (req->rq_set != NULL)
wq = &req->rq_set->set_waitq;
for (;;) {
/* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs */
- lwi = LWI_TIMEOUT (cfs_time_seconds(300), NULL, NULL);
- rc = l_wait_event(*wq, !ptlrpc_bulk_active(desc), &lwi);
- if (rc == 0)
- return;
+ lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+ cfs_time_seconds(1), NULL, NULL);
+ rc = l_wait_event(*wq, !ptlrpc_client_bulk_active(req), &lwi);
+ if (rc == 0) {
+ ptlrpc_rqphase_move(req, req->rq_next_phase);
+ RETURN(1);
+ }
- LASSERT (rc == -ETIMEDOUT);
- DEBUG_REQ(D_WARNING,req,"Unexpectedly long timeout: desc %p",
+ LASSERT(rc == -ETIMEDOUT);
+ DEBUG_REQ(D_WARNING, req, "Unexpectedly long timeout: desc %p",
desc);
}
+ RETURN(0);
}
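The new return contract: nonzero means the bulk is fully disconnected (or was never registered) and the caller may proceed immediately; zero in the async case means the unlink is still in flight and the request has been parked in RQ_PHASE_UNREGISTERING for later re-polling. Caller sketch, condensed from the ptlrpc_check_set() and error-path hunks above:

    /* async caller (event loop): come back on a later set iteration */
    if (!ptlrpc_unregister_bulk(req, 1))
        continue;

    /* sync caller (e.g. cleanup): blocks until the bulk is idle */
    ptlrpc_unregister_bulk(req, 0);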
static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags)
}
}
-int ptlrpc_send_reply (struct ptlrpc_request *req, int flags)
+int ptlrpc_send_reply(struct ptlrpc_request *req, int flags)
{
struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
struct ptlrpc_reply_state *rs = req->rq_reply_state;
LASSERT(!request->rq_receiving_reply);
cleanup_bulk:
- if (request->rq_bulk != NULL)
- ptlrpc_unregister_bulk(request);
-
+ /* We do a sync unlink here as no real transfer has happened yet, so
+ * the chance of a long unlink on a sluggish net is smaller. */
+ ptlrpc_unregister_bulk(request, 0);
return rc;
}
-int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
+int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
{
struct ptlrpc_service *service = rqbd->rqbd_service;
static lnet_process_id_t match_id = {LNET_NID_ANY, LNET_PID_ANY};
/* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs */
- lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
+ lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+ cfs_time_seconds(1), NULL, NULL);
rc = l_wait_event(service->srv_waitq,
service->srv_nrqbd_receiving == 0,
&lwi);
done
if [ $OPER == "timeout" ] ; then
for j in `seq $OSTCOUNT`; do
- #define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT 0x510
- do_facet ost$j lctl set_param fail_loc=0x510
+ #define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT 0x511
+ do_facet ost$j lctl set_param fail_loc=0x511
done
echo lock should expire
else
for j in `seq $OSTCOUNT`; do
- #define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT 0x511
- do_facet ost$j lctl set_param fail_loc=0x511
+ #define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT 0x512
+ do_facet ost$j lctl set_param fail_loc=0x512
done
echo lock should not expire
fi