/** A spinlock to protect the reply state flags */
spinlock_t rs_lock;
/** Reply state flags */
- unsigned long rs_difficult:1; /* ACK/commit stuff */
- unsigned long rs_no_ack:1; /* no ACK, even for
+ unsigned long rs_difficult:1; /* ACK/commit stuff */
+ unsigned long rs_no_ack:1; /* no ACK, even for
difficult requests */
- unsigned long rs_scheduled:1; /* being handled? */
- unsigned long rs_scheduled_ever:1;/* any schedule attempts? */
- unsigned long rs_handled:1; /* been handled yet? */
- unsigned long rs_on_net:1; /* reply_out_callback pending? */
- unsigned long rs_prealloc:1; /* rs from prealloc list */
- unsigned long rs_committed:1;/* the transaction was committed
+ unsigned long rs_scheduled:1; /* being handled? */
+ unsigned long rs_scheduled_ever:1;/* any schedule attempts? */
+ unsigned long rs_handled:1; /* been handled yet? */
+ unsigned long rs_sent:1; /* Got LNET_EVENT_SEND? */
+ unsigned long rs_unlinked:1; /* Reply MD unlinked? */
+ unsigned long rs_prealloc:1; /* rs from prealloc list */
+ unsigned long rs_committed:1;/* the transaction was committed
and the rs was dispatched
by ptlrpc_commit_replies */
unsigned long rs_convert_lock:1; /* need to convert saved
/** Number of locks awaiting client ACK */
int rs_nlocks;
- /** Size of the state */
- int rs_size;
- /** opcode */
- __u32 rs_opc;
- /** Transaction number */
- __u64 rs_transno;
- /** xid */
- __u64 rs_xid;
- struct obd_export *rs_export;
+ /** Size of the state */
+ int rs_size;
+ /** opcode */
+ __u32 rs_opc;
+ /** Transaction number */
+ __u64 rs_transno;
+ /** xid */
+ __u64 rs_xid;
+ struct obd_export *rs_export;
struct ptlrpc_service_part *rs_svcpt;
/** Lnet metadata handle for the reply */
struct lnet_handle_md rs_md_h;
LASSERT(!rs->rs_scheduled);
LASSERT(!rs->rs_scheduled_ever);
LASSERT(!rs->rs_handled);
- LASSERT(!rs->rs_on_net);
+ LASSERT(!rs->rs_sent);
+ LASSERT(!rs->rs_unlinked);
LASSERT(rs->rs_export == NULL);
LASSERT(list_empty(&rs->rs_obd_list));
LASSERT(list_empty(&rs->rs_exp_list));
/* disable reply scheduling while I'm setting up */
rs->rs_scheduled = 1;
- rs->rs_on_net = 1;
+ rs->rs_sent = 0;
+ rs->rs_unlinked = 0;
rs->rs_xid = req->rq_xid;
rs->rs_transno = req->rq_transno;
rs->rs_export = exp;
* would have been +1 ref for the net, which
* reply_out_callback leaves alone)
*/
- rs->rs_on_net = 0;
+ rs->rs_sent = 1;
+ rs->rs_unlinked = 1;
ptlrpc_rs_addref(rs);
}
spin_lock(&rs->rs_lock);
if (rs->rs_transno <= exp->exp_last_committed ||
- (!rs->rs_on_net && !rs->rs_no_ack) ||
+ (rs->rs_unlinked && !rs->rs_no_ack) ||
list_empty(&rs->rs_exp_list) || /* completed already */
list_empty(&rs->rs_obd_list)) {
CDEBUG(D_HA, "Schedule reply immediately\n");
struct ptlrpc_cb_id *cbid = ev->md_user_ptr;
struct ptlrpc_reply_state *rs = cbid->cbid_arg;
struct ptlrpc_service_part *svcpt = rs->rs_svcpt;
- ENTRY;
-
- LASSERT (ev->type == LNET_EVENT_SEND ||
- ev->type == LNET_EVENT_ACK ||
- ev->type == LNET_EVENT_UNLINK);
-
- if (!rs->rs_difficult) {
- /* 'Easy' replies have no further processing so I drop the
- * net's ref on 'rs' */
- LASSERT (ev->unlinked);
- ptlrpc_rs_decref(rs);
- EXIT;
- return;
- }
+ bool need_schedule = false;
+
+ ENTRY;
+
+ LASSERT(ev->type == LNET_EVENT_SEND ||
+ ev->type == LNET_EVENT_ACK ||
+ ev->type == LNET_EVENT_UNLINK);
- LASSERT (rs->rs_on_net);
+ if (!rs->rs_difficult) {
+ /* 'Easy' replies have no further processing so I drop the
+ * net's ref on 'rs'
+ */
+ LASSERT(ev->unlinked);
+ ptlrpc_rs_decref(rs);
+ EXIT;
+ return;
+ }
- if (ev->unlinked) {
- /* Last network callback. The net's ref on 'rs' stays put
- * until ptlrpc_handle_rs() is done with it */
+ if (ev->type == LNET_EVENT_SEND) {
+ spin_lock(&rs->rs_lock);
+ rs->rs_sent = 1;
+ /* If transaction was committed before the SEND, and the ACK
+ * is lost, then we need to schedule so ptlrpc_hr can unlink
+ * the MD.
+ */
+ if (rs->rs_handled)
+ need_schedule = true;
+ spin_unlock(&rs->rs_lock);
+ }
+
+ if (ev->unlinked || need_schedule) {
+ LASSERT(rs->rs_sent);
+
+ /* Last network callback. The net's ref on 'rs' stays put
+ * until ptlrpc_handle_rs() is done with it
+ */
spin_lock(&svcpt->scp_rep_lock);
spin_lock(&rs->rs_lock);
- rs->rs_on_net = 0;
+ rs->rs_unlinked = ev->unlinked;
if (!rs->rs_no_ack ||
rs->rs_transno <=
rs->rs_export->exp_obd->obd_last_committed ||
LASSERT(atomic_read(&rs->rs_refcount) == 0);
LASSERT(!rs->rs_difficult || rs->rs_handled);
- LASSERT(!rs->rs_on_net);
+ LASSERT(!rs->rs_difficult || rs->rs_unlinked);
LASSERT(!rs->rs_scheduled);
LASSERT(rs->rs_export == NULL);
LASSERT(rs->rs_nlocks == 0);
libcfs_nid2str(exp->exp_connection->c_peer.nid));
}
- if ((!been_handled && rs->rs_on_net) || nlocks > 0) {
+ if ((rs->rs_sent && !rs->rs_unlinked) || nlocks > 0) {
spin_unlock(&rs->rs_lock);
- if (!been_handled && rs->rs_on_net) {
+ /* We can unlink if the LNET_EVENT_SEND has occurred.
+ * If rs_unlinked is set then MD is already unlinked and no
+ * need to do so here.
+ */
+ if ((rs->rs_sent && !rs->rs_unlinked)) {
LNetMDUnlink(rs->rs_md_h);
/* Ignore return code; we're racing with completion */
}
rs->rs_scheduled = 0;
rs->rs_convert_lock = 0;
- if (!rs->rs_on_net) {
+ if (rs->rs_unlinked) {
/* Off the net */
spin_unlock(&rs->rs_lock);