Whamcloud - gitweb
- land b_hd_ver_recov
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 99f79a4..37dcbde 100644 (file)
@@ -354,11 +354,41 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
         EXIT;
 }
 
-void
-ptlrpc_commit_replies (struct obd_device *obd)
+void ptlrpc_commit_replies_alt(struct obd_export *exp)
 {
-        struct list_head   *tmp;
-        struct list_head   *nxt;
+        struct ptlrpc_reply_state *rs, *nxt;
+        struct list_head committed_list;
+        DECLARE_RS_BATCH(batch);
+        ENTRY;
+
+        CFS_INIT_LIST_HEAD(&committed_list);
+        spin_lock(&exp->exp_uncommitted_replies_lock);
+        list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies,
+                                 rs_obd_list) {
+                LASSERT (rs->rs_difficult);
+                LASSERT(rs->rs_export);
+                if (likely(rs->rs_transno <= exp->exp_last_committed))
+                        list_move(&rs->rs_obd_list, &committed_list);
+                else
+                        break;
+        }
+        spin_unlock(&exp->exp_uncommitted_replies_lock);
+
+        /* XXX: do we need this in context of commit callback? maybe separate thread
+         * should work this out */
+        rs_batch_init(&batch);
+        /* get replies that have been committed and get their service
+         * to attend to complete them. */
+        list_for_each_entry_safe(rs, nxt, &committed_list, rs_obd_list) {
+                list_del_init(&rs->rs_obd_list);
+                rs_batch_add(&batch, rs);
+        }
+        rs_batch_fini(&batch);
+        EXIT;
+}
+void ptlrpc_commit_replies(struct obd_export *exp)
+{
+        struct ptlrpc_reply_state *rs, *nxt;
         DECLARE_RS_BATCH(batch);
         ENTRY;
 
@@ -367,19 +397,18 @@ ptlrpc_commit_replies (struct obd_device *obd)
          * to attend to complete them. */
 
         /* CAVEAT EMPTOR: spinlock ordering!!! */
-        spin_lock(&obd->obd_uncommitted_replies_lock);
-        list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
-                struct ptlrpc_reply_state *rs =
-                        list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
-
+        spin_lock(&exp->exp_uncommitted_replies_lock);
+        list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies,
+                                 rs_obd_list) {
                 LASSERT (rs->rs_difficult);
-
-                if (rs->rs_transno <= obd->obd_last_committed) {
+                /* VBR: per-export last_committed */
+                LASSERT(rs->rs_export);
+                if (rs->rs_transno <= exp->exp_last_committed) {
                         list_del_init(&rs->rs_obd_list);
                         rs_batch_add(&batch, rs);
                 }
         }
-        spin_unlock(&obd->obd_uncommitted_replies_lock);
+        spin_unlock(&exp->exp_uncommitted_replies_lock);
         rs_batch_fini(&batch);
         EXIT;
 }
@@ -532,14 +561,14 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         array->paa_count = 0;
         array->paa_deadline = -1;
 
-        /* allocate memory for srv_at_array (ptlrpc_at_array) */ 
+        /* allocate memory for srv_at_array (ptlrpc_at_array) */
         OBD_ALLOC(array->paa_reqs_array, sizeof(struct list_head) * size);
         if (array->paa_reqs_array == NULL)
                 GOTO(failed, NULL);
 
         for (index = 0; index < size; index++)
                 CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]);
-        
+
         OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size);
         if (array->paa_reqs_count == NULL)
                 GOTO(failed, NULL);
@@ -706,8 +735,8 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
         if (req->rq_at_linked) {
                 struct ptlrpc_at_array *array = &svc->srv_at_array;
                 __u32 index = req->rq_at_index;
-        
-                req->rq_at_linked = 0;        
+
+                req->rq_at_linked = 0;
                 array->paa_reqs_count[index]--;
                 array->paa_count--;
         }
@@ -1096,7 +1125,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                                 rq->rq_at_linked = 0;
                                 continue;
                         }
-                        
+
                         /* update the earliest deadline */
                         if (deadline == -1 || rq->rq_deadline < deadline)
                                 deadline = rq->rq_deadline;
@@ -1674,12 +1703,12 @@ ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
         list_del_init (&rs->rs_exp_list);
         spin_unlock (&exp->exp_lock);
 
-        /* Avoid obd_uncommitted_replies_lock contention if we 100% sure that
+        /* Avoid exp_uncommitted_replies_lock contention if we 100% sure that
          * rs has been removed from the list already */
         if (!list_empty_careful(&rs->rs_obd_list)) {
-                spin_lock(&obd->obd_uncommitted_replies_lock);
+                spin_lock(&exp->exp_uncommitted_replies_lock);
                 list_del_init(&rs->rs_obd_list);
-                spin_unlock(&obd->obd_uncommitted_replies_lock);
+                spin_unlock(&exp->exp_uncommitted_replies_lock);
         }
 
         spin_lock(&rs->rs_lock);
@@ -2482,17 +2511,17 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
         cfs_timer_disarm(&service->srv_at_timer);
 
         if (array->paa_reqs_array != NULL) {
-                OBD_FREE(array->paa_reqs_array, 
+                OBD_FREE(array->paa_reqs_array,
                          sizeof(struct list_head) * array->paa_size);
                 array->paa_reqs_array = NULL;
         }
-        
+
         if (array->paa_reqs_count != NULL) {
-                OBD_FREE(array->paa_reqs_count, 
+                OBD_FREE(array->paa_reqs_count,
                          sizeof(__u32) * array->paa_size);
                 array->paa_reqs_count= NULL;
         }
-       
+
         OBD_FREE_PTR(service);
         RETURN(0);
 }