}
if (imp->imp_replayable) {
+ /* Advertise that this thread is waiting on imp_lock, so a thread
+ * running ptlrpc_free_committed() can yield and let the waiters
+ * continue the work of freeing RPCs. That reduces lock hold times
+ * and spreads the work more fairly across threads. We can't use
+ * spin_is_contended() instead, since imp_lock is taken in many
+ * other places.
+ */
+ atomic_inc(&imp->imp_waiting);
spin_lock(&imp->imp_lock);
+ atomic_dec(&imp->imp_waiting);
/*
* No point in adding already-committed requests to the replay
* list, we will just remove them immediately. b=9829
*/
spin_unlock(&imp->imp_lock);
req->rq_commit_cb(req);
+ atomic_inc(&imp->imp_waiting);
spin_lock(&imp->imp_lock);
+ atomic_dec(&imp->imp_waiting);
}
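+ /* The inc/lock/dec sequence above could be factored into a helper,
+ * e.g. (hypothetical name; a sketch, not part of this patch):
+ *
+ *	static inline void import_lock_waiting(struct obd_import *imp)
+ *	{
+ *		atomic_inc(&imp->imp_waiting);
+ *		spin_lock(&imp->imp_lock);
+ *		atomic_dec(&imp->imp_waiting);
+ *	}
+ *
+ * so every acquisition of imp_lock advertises the waiter the same way.
+ */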
/*
* Iterates through replay_list on import and prunes
- * all requests have transno smaller than last_committed for the
+ * all requests that have a transno smaller than last_committed for the
* import and don't have rq_replay set.
- * Since requests are sorted in transno order, stops when meetign first
+ * Since requests are sorted in transno order, stops at the first
* transno bigger than last_committed.
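+ * For example, with last_committed = 42 and replay list transnos
+ * [38, 40, 41, 43, 45], entries 38..41 are freed (unless they have
+ * rq_replay set) and the scan stops at 43.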
* caller must hold imp->imp_lock
*/
struct ptlrpc_request *req, *saved;
struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
bool skip_committed_list = true;
+ unsigned int replay_scanned = 0, replay_freed = 0;
+ unsigned int commit_scanned = 0, commit_freed = 0;
+ unsigned int debug_level = D_INFO;
+ __u64 peer_committed_transno;
+ int imp_generation;
+ time64_t start, now;
ENTRY;
LASSERT(imp != NULL);
assert_spin_locked(&imp->imp_lock);
- if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
- imp->imp_generation == imp->imp_last_generation_checked) {
+ start = ktime_get_seconds();
+ /* save these here, we can potentially drop imp_lock after checking */
+ peer_committed_transno = imp->imp_peer_committed_transno;
+ imp_generation = imp->imp_generation;
+
+ if (peer_committed_transno == imp->imp_last_transno_checked &&
+ imp_generation == imp->imp_last_generation_checked) {
CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n",
- imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
+ imp->imp_obd->obd_name, peer_committed_transno);
RETURN_EXIT;
}
CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n",
- imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
- imp->imp_generation);
+ imp->imp_obd->obd_name, peer_committed_transno, imp_generation);
- if (imp->imp_generation != imp->imp_last_generation_checked ||
+ if (imp_generation != imp->imp_last_generation_checked ||
imp->imp_last_transno_checked == 0)
skip_committed_list = false;
-
- imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
- imp->imp_last_generation_checked = imp->imp_generation;
+ /* imp_lock could be dropped here if a separate lock protected the lists */
list_for_each_entry_safe(req, saved, &imp->imp_replay_list,
rq_replay_list) {
LASSERT(req != last_req);
last_req = req;

if (req->rq_transno == 0) {
	DEBUG_REQ(D_EMERG, req, "zero transno during replay");
LBUG();
}
- if (req->rq_import_generation < imp->imp_generation) {
+
+ /* If other threads are waiting on imp_lock, stop processing
+ * in this thread; another thread can finish the remaining work.
+ * This can happen when huge numbers of open files are closed or
+ * evicted at once, or when the server commit interval is very
+ * large relative to the RPC rate. Check only every 2048 requests
+ * so the clock and atomic reads add negligible overhead.
+ */
+ if (++replay_scanned % 2048 == 0) {
+ now = ktime_get_seconds();
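+ /* flag scans running longer than 5 seconds in the summary below */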
+ if (now > start + 5)
+ debug_level = D_WARNING;
+
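+ /* Yield to waiters only after freeing a sizable batch (>128)
+ * over more than 3 seconds, so each pass makes real progress.
+ */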
+ if ((replay_freed > 128 && now > start + 3) &&
+ atomic_read(&imp->imp_waiting)) {
+ if (debug_level == D_INFO)
+ debug_level = D_RPCTRACE;
+ break;
+ }
+ }
+
+ if (req->rq_import_generation < imp_generation) {
DEBUG_REQ(D_RPCTRACE, req, "free request with old gen");
GOTO(free_req, 0);
}
/* not yet committed */
- if (req->rq_transno > imp->imp_peer_committed_transno) {
+ if (req->rq_transno > peer_committed_transno) {
DEBUG_REQ(D_RPCTRACE, req, "stopping search");
break;
}
}
DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)",
- imp->imp_peer_committed_transno);
+ peer_committed_transno);
free_req:
+ replay_freed++;
ptlrpc_free_request(req);
}
list_for_each_entry_safe(req, saved, &imp->imp_committed_list,
rq_replay_list) {
LASSERT(req->rq_transno != 0);
- if (req->rq_import_generation < imp->imp_generation ||
+
+ /* If other threads are waiting on imp_lock, stop processing
+ * in this thread. Another thread can finish remaining work.
+ */
+ if (++commit_scanned % 2048 == 0) {
+ now = ktime_get_seconds();
+ if (now > start + 6)
+ debug_level = D_WARNING;
+
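+ /* Same yield logic as the replay list scan; deadlines are one
+ * second later since "start" is shared and this loop runs second.
+ */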
+ if ((commit_freed > 128 && now > start + 4) &&
+ atomic_read(&imp->imp_waiting)) {
+ if (debug_level == D_INFO)
+ debug_level = D_RPCTRACE;
+ break;
+ }
+ }
+
+ if (req->rq_import_generation < imp_generation ||
!req->rq_replay) {
DEBUG_REQ(D_RPCTRACE, req, "free %s open request",
req->rq_import_generation <
- imp->imp_generation ? "stale" : "closed");
+ imp_generation ? "stale" : "closed");
if (imp->imp_replay_cursor == &req->rq_replay_list)
imp->imp_replay_cursor =
req->rq_replay_list.next;
+ commit_freed++;
ptlrpc_free_request(req);
}
}
out:
+ /* Remember the scan as complete only if it finished quickly and
+ * without yielding; otherwise leave the memo stale so the next
+ * caller rescans and finishes the remaining work.
+ */
+ if (debug_level == D_INFO) {
+ imp->imp_last_transno_checked = peer_committed_transno;
+ imp->imp_last_generation_checked = imp_generation;
+ }
+
+ CDEBUG_LIMIT(debug_level,
+ "%s: %s: skip=%u replay=%u/%u committed=%u/%u\n",
+ imp->imp_obd->obd_name,
+ debug_level == D_INFO ? "normal" : "overloaded",
+ skip_committed_list, replay_freed, replay_scanned,
+ commit_freed, commit_scanned);
EXIT;
}
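+ /* The overall handoff, inferred from the changes above: a thread
+ * about to take imp_lock first bumps imp_waiting;
+ * ptlrpc_free_committed() polls imp_waiting every 2048 requests and,
+ * once it has freed a reasonable batch, breaks out so a waiter can
+ * take the lock. Because imp_last_transno_checked is only updated
+ * after a fast, complete pass, a later call resumes the scan.
+ */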