EXIT;
}
-void
-ptlrpc_commit_replies (struct obd_device *obd)
+void ptlrpc_commit_replies_alt(struct obd_export *exp)
{
- struct list_head *tmp;
- struct list_head *nxt;
+ struct ptlrpc_reply_state *rs, *nxt;
+ struct list_head committed_list;
+ DECLARE_RS_BATCH(batch);
+ ENTRY;
+
+ CFS_INIT_LIST_HEAD(&committed_list);
+ spin_lock(&exp->exp_uncommitted_replies_lock);
+ list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies,
+ rs_obd_list) {
+ LASSERT (rs->rs_difficult);
+ LASSERT(rs->rs_export);
+ if (likely(rs->rs_transno <= exp->exp_last_committed))
+ list_move(&rs->rs_obd_list, &committed_list);
+ else
+ break;
+ }
+ spin_unlock(&exp->exp_uncommitted_replies_lock);
+
+ /* XXX: do we need this in context of commit callback? maybe separate thread
+ * should work this out */
+ rs_batch_init(&batch);
+ /* get replies that have been committed and get their service
+ * to attend to complete them. */
+ list_for_each_entry_safe(rs, nxt, &committed_list, rs_obd_list) {
+ list_del_init(&rs->rs_obd_list);
+ rs_batch_add(&batch, rs);
+ }
+ rs_batch_fini(&batch);
+ EXIT;
+}
+void ptlrpc_commit_replies(struct obd_export *exp)
+{
+ struct ptlrpc_reply_state *rs, *nxt;
DECLARE_RS_BATCH(batch);
ENTRY;
* to attend to complete them. */
/* CAVEAT EMPTOR: spinlock ordering!!! */
- spin_lock(&obd->obd_uncommitted_replies_lock);
- list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
- struct ptlrpc_reply_state *rs =
- list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
-
+ spin_lock(&exp->exp_uncommitted_replies_lock);
+ list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies,
+ rs_obd_list) {
LASSERT (rs->rs_difficult);
-
- if (rs->rs_transno <= obd->obd_last_committed) {
+ /* VBR: per-export last_committed */
+ LASSERT(rs->rs_export);
+ if (rs->rs_transno <= exp->exp_last_committed) {
list_del_init(&rs->rs_obd_list);
rs_batch_add(&batch, rs);
}
}
- spin_unlock(&obd->obd_uncommitted_replies_lock);
+ spin_unlock(&exp->exp_uncommitted_replies_lock);
rs_batch_fini(&batch);
EXIT;
}
array->paa_count = 0;
array->paa_deadline = -1;
- /* allocate memory for srv_at_array (ptlrpc_at_array) */
+ /* allocate memory for srv_at_array (ptlrpc_at_array) */
OBD_ALLOC(array->paa_reqs_array, sizeof(struct list_head) * size);
if (array->paa_reqs_array == NULL)
GOTO(failed, NULL);
for (index = 0; index < size; index++)
CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]);
-
+
OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size);
if (array->paa_reqs_count == NULL)
GOTO(failed, NULL);
if (req->rq_at_linked) {
struct ptlrpc_at_array *array = &svc->srv_at_array;
__u32 index = req->rq_at_index;
-
- req->rq_at_linked = 0;
+
+ req->rq_at_linked = 0;
array->paa_reqs_count[index]--;
array->paa_count--;
}
rq->rq_at_linked = 0;
continue;
}
-
+
/* update the earliest deadline */
if (deadline == -1 || rq->rq_deadline < deadline)
deadline = rq->rq_deadline;
list_del_init (&rs->rs_exp_list);
spin_unlock (&exp->exp_lock);
- /* Avoid obd_uncommitted_replies_lock contention if we 100% sure that
+ /* Avoid exp_uncommitted_replies_lock contention if we 100% sure that
* rs has been removed from the list already */
if (!list_empty_careful(&rs->rs_obd_list)) {
- spin_lock(&obd->obd_uncommitted_replies_lock);
+ spin_lock(&exp->exp_uncommitted_replies_lock);
list_del_init(&rs->rs_obd_list);
- spin_unlock(&obd->obd_uncommitted_replies_lock);
+ spin_unlock(&exp->exp_uncommitted_replies_lock);
}
spin_lock(&rs->rs_lock);
cfs_timer_disarm(&service->srv_at_timer);
if (array->paa_reqs_array != NULL) {
- OBD_FREE(array->paa_reqs_array,
+ OBD_FREE(array->paa_reqs_array,
sizeof(struct list_head) * array->paa_size);
array->paa_reqs_array = NULL;
}
-
+
if (array->paa_reqs_count != NULL) {
- OBD_FREE(array->paa_reqs_count,
+ OBD_FREE(array->paa_reqs_count,
sizeof(__u32) * array->paa_size);
array->paa_reqs_count= NULL;
}
-
+
OBD_FREE_PTR(service);
RETURN(0);
}