Whamcloud - gitweb
b=16509 port recovery ending tests to master
[fs/lustre-release.git] / lustre / ptlrpc / ptlrpcd.c
index e6bbf18..78c0546 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
  * lustre/ptlrpc/ptlrpcd.c
  */
 
+/** \defgroup ptlrpcd PortalRPC daemon
+ *
+ * ptlrpcd is a special thread with its own set where other user might add
+ * requests when they don't want to wait for their completion.
+ * PtlRPCD will take care of sending such requests and then processing their
+ * replies and calling completion callbacks as necessary.
+ * The callbacks are called directly from ptlrpcd context.
+ * It is important to never significantly block (esp. on RPCs!) within such
+ * completion handler or a deadlock might occur where ptlrpcd enters some
+ * callback that attempts to send another RPC and wait for it to return,
+ * during which time ptlrpcd is completely blocked, so e.g. if import
+ * fails, recovery cannot progress because connection requests are also
+ * sent by ptlrpcd.
+ *
+ * @{
+ */
+
 #define DEBUG_SUBSYSTEM S_RPC
 
 #ifdef __KERNEL__
@@ -102,7 +119,7 @@ void ptlrpcd_wake(struct ptlrpc_request *req)
         cfs_waitq_signal(&rq_set->set_waitq);
 }
 
-/*
+/**
  * Move all request from an existing request set to the ptlrpcd queue.
  * All requests from the set must be in phase RQ_PHASE_NEW.
  */
@@ -119,13 +136,13 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
                 cfs_list_del_init(&req->rq_set_chain);
                 req->rq_set = NULL;
                 ptlrpcd_add_req(req, PSCOPE_OTHER);
-                set->set_remaining--;
+                cfs_atomic_dec(&set->set_remaining);
         }
-        LASSERT(set->set_remaining == 0);
+        LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 }
 EXPORT_SYMBOL(ptlrpcd_add_rqset);
 
-/*
+/**
  * Requests that are added to the ptlrpcd queue are sent via
  * ptlrpcd_check->ptlrpc_check_set().
  */
@@ -136,6 +153,31 @@ int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
         int rc;
 
         LASSERT(scope < PSCOPE_NR);
+        
+        cfs_spin_lock(&req->rq_lock);
+        if (req->rq_invalid_rqset) {
+                cfs_duration_t timeout;
+                struct l_wait_info lwi;
+
+                req->rq_invalid_rqset = 0;
+                cfs_spin_unlock(&req->rq_lock);
+
+                timeout = cfs_time_seconds(5);
+                lwi = LWI_TIMEOUT(timeout, back_to_sleep, NULL);
+                l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
+        } else if (req->rq_set) {
+                LASSERT(req->rq_phase == RQ_PHASE_NEW);
+                LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
+
+                /* ptlrpc_check_set will decrease the count */
+                cfs_atomic_inc(&req->rq_set->set_remaining);
+                cfs_spin_unlock(&req->rq_lock);
+
+                cfs_waitq_signal(&req->rq_set->set_waitq);
+        } else {
+                cfs_spin_unlock(&req->rq_lock);
+        }
+
         pt = req->rq_send_state == LUSTRE_IMP_FULL ? PT_NORMAL : PT_RECOVERY;
         pc = &ptlrpcd_scopes[scope].pscope_thread[pt].pt_ctl;
         rc = ptlrpc_set_add_new_req(pc, req);
@@ -165,6 +207,10 @@ int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
         return rc;
 }
 
+/**
+ * Check if there is more work to do on ptlrpcd set.
+ * Returns 1 if yes.
+ */
 static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
 {
         cfs_list_t *tmp, *pos;
@@ -184,7 +230,7 @@ static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
         }
         cfs_spin_unlock(&pc->pc_set->set_new_req_lock);
 
-        if (pc->pc_set->set_remaining) {
+        if (cfs_atomic_read(&pc->pc_set->set_remaining)) {
                 rc = rc | ptlrpc_check_set(env, pc->pc_set);
 
                 /*
@@ -216,10 +262,11 @@ static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
 }
 
 #ifdef __KERNEL__
-/*
+/**
+ * Main ptlrpcd thread.
  * ptlrpc's code paths like to execute in process context, so we have this
- * thread which spins on a set which contains the io rpcs. llite specifies
- * ptlrpcd's set when it pushes pages down into the oscs.
+ * thread which spins on a set which contains the rpcs and sends them.
+ *
  */
 static int ptlrpcd(void *arg)
 {
@@ -311,6 +358,11 @@ static int ptlrpcd(void *arg)
 
 #else /* !__KERNEL__ */
 
+/**
+ * In liblustre we do not have separate threads, so this function
+ * is called from time to time all across common code to see
+ * if something needs to be processed on ptlrpcd set.
+ */
 int ptlrpcd_check_async_rpcs(void *arg)
 {
         struct ptlrpcd_ctl *pc = arg;
@@ -346,7 +398,7 @@ int ptlrpcd_idle(void *arg)
         struct ptlrpcd_ctl *pc = arg;
 
         return (cfs_list_empty(&pc->pc_set->set_new_requests) &&
-                pc->pc_set->set_remaining == 0);
+                cfs_atomic_read(&pc->pc_set->set_remaining) == 0);
 }
 
 #endif
@@ -484,3 +536,4 @@ void ptlrpcd_decref(void)
                 ptlrpcd_fini();
         cfs_mutex_up(&ptlrpcd_sem);
 }
+/** @} ptlrpcd */