Whamcloud - gitweb
merge b_devel into HEAD (20030626 merge tag) for 0.7.1
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 94a068d..c79329c 100644 (file)
@@ -113,11 +113,11 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
         struct ptlrpc_bulk_desc *desc;
 
         LASSERT (type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
-        
+
         desc = new_bulk();
         if (desc == NULL)
                 RETURN(NULL);
-        
+
         /* Is this sampled at the right place?  Do we want to get the import
          * generation just before we send?  Should it match the generation of
          * the request? */
@@ -143,7 +143,7 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
         struct ptlrpc_bulk_desc *desc;
 
         LASSERT (type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
-        
+
         desc = new_bulk();
         if (desc == NULL)
                 RETURN(NULL);
@@ -191,7 +191,7 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
         LASSERT (desc != NULL);
         LASSERT (desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
         LASSERT (!desc->bd_network_rw);         /* network hands off or */
-        
+
         list_for_each_safe(tmp, next, &desc->bd_page_list) {
                 struct ptlrpc_bulk_page *bulk;
                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
@@ -213,7 +213,7 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
 {
         LASSERT (bulk != NULL);
-        
+
         list_del(&bulk->bp_link);
         bulk->bp_desc->bd_page_count--;
         OBD_FREE(bulk, sizeof(*bulk));
@@ -247,7 +247,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
         request->rq_type = PTL_RPC_MSG_REQUEST;
         request->rq_import = class_import_get(imp);
         request->rq_phase = RQ_PHASE_NEW;
-        
+
         /* XXX FIXME bug 249 */
         request->rq_request_portal = imp->imp_client->cli_request_portal;
         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
@@ -290,7 +290,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
         ENTRY;
 
         /* Requests on the set should either all be completed, or all be new */
-        expected_phase = (set->set_remaining == 0) ? 
+        expected_phase = (set->set_remaining == 0) ?
                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
         list_for_each (tmp, &set->set_requests) {
                 struct ptlrpc_request *req =
@@ -299,9 +299,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                 LASSERT (req->rq_phase == expected_phase);
                 n++;
         }
-        
+
         LASSERT (set->set_remaining == 0 || set->set_remaining == n);
-        
+
         list_for_each_safe(tmp, next, &set->set_requests) {
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
@@ -310,12 +310,13 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                 LASSERT (req->rq_phase == expected_phase);
 
                 if (req->rq_phase == RQ_PHASE_NEW) {
-                        
+
                         if (req->rq_interpret_reply != NULL) {
-                                int (*interpreter)(struct ptlrpc_request *, void *, int) =
+                                int (*interpreter)(struct ptlrpc_request *,
+                                                   void *, int) =
                                         req->rq_interpret_reply;
-                                
-                                /* higher level (i.e. LOV) failed; 
+
+                                /* higher level (i.e. LOV) failed;
                                  * let the sub reqs clean up */
                                 req->rq_status = -EBADR;
                                 interpreter(req, &req->rq_async_args, req->rq_status);
@@ -401,8 +402,7 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
 }
 
 #warning this needs to change after robert fixes eviction handling
-static int 
-after_reply(struct ptlrpc_request *req, int *restartp)
+static int after_reply(struct ptlrpc_request *req, int *restartp)
 {
         unsigned long flags;
         struct obd_import *imp = req->rq_import;
@@ -414,7 +414,7 @@ after_reply(struct ptlrpc_request *req, int *restartp)
 
         if (restartp != NULL)
                 *restartp = 0;
-        
+
         /* NB Until this point, the whole of the incoming message,
          * including buflens, status etc is in the sender's byte order. */
 
@@ -441,8 +441,8 @@ after_reply(struct ptlrpc_request *req, int *restartp)
         rc = ptlrpc_check_status(req);
 
         /* Either we've been evicted, or the server has failed for
-         * some reason. Try to reconnect, and if that fails, punt to
-         * upcall */
+         * some reason. Try to reconnect, and if that fails, punt to the
+         * upcall. */
         if (rc == -ENOTCONN) {
                 if (req->rq_level < LUSTRE_CONN_FULL || req->rq_no_recov ||
                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
@@ -451,7 +451,7 @@ after_reply(struct ptlrpc_request *req, int *restartp)
 
                 rc = ptlrpc_request_handle_eviction(req);
                 if (rc)
-                        CERROR("can't reconnect to %s@%s: %d\n", 
+                        CERROR("can't reconnect to %s@%s: %d\n",
                                imp->imp_target_uuid.uuid,
                                imp->imp_connection->c_remote_uuid.uuid, rc);
                 else
@@ -486,7 +486,7 @@ after_reply(struct ptlrpc_request *req, int *restartp)
 
                 /* Replay-enabled imports return commit-status information. */
                 if (req->rq_repmsg->last_committed) {
-                        if (req->rq_repmsg->last_committed < 
+                        if (req->rq_repmsg->last_committed <
                             imp->imp_peer_committed_transno) {
                                 CERROR("%s went back in time (transno "LPD64
                                        " was committed, server claims "LPD64
@@ -501,7 +501,7 @@ after_reply(struct ptlrpc_request *req, int *restartp)
                 ptlrpc_free_committed(imp);
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
         }
-        
+
         RETURN(rc);
 }
 
@@ -520,50 +520,57 @@ static int check_set(struct ptlrpc_request_set *set)
                 struct obd_import *imp = req->rq_import;
                 int rc = 0;
 
-                LASSERT (req->rq_phase == RQ_PHASE_RPC ||
-                         req->rq_phase == RQ_PHASE_BULK ||
-                         req->rq_phase == RQ_PHASE_COMPLETE);
+                if (!(req->rq_phase == RQ_PHASE_RPC ||
+                      req->rq_phase == RQ_PHASE_BULK ||
+                      req->rq_phase == RQ_PHASE_INTERPRET ||
+                      req->rq_phase == RQ_PHASE_COMPLETE)) {
+                        DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
+                        LBUG();
+                }
 
                 if (req->rq_phase == RQ_PHASE_COMPLETE)
                         continue;
 
+                if (req->rq_phase == RQ_PHASE_INTERPRET)
+                        GOTO (interpret, req->rq_status);
+
                 if (req->rq_err) {
                         ptlrpc_unregister_reply(req);
                         if (req->rq_status == 0)
                                 req->rq_status = -EIO;
                         req->rq_phase = RQ_PHASE_INTERPRET;
-                
+
                         spin_lock_irqsave(&imp->imp_lock, flags);
                         list_del_init(&req->rq_list);
                         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
                         GOTO (interpret, req->rq_status);
-                } 
-                
+                }
+
                 if (req->rq_intr) {
                         /* NB could be on delayed list */
                         ptlrpc_unregister_reply(req);
                         req->rq_status = -EINTR;
                         req->rq_phase = RQ_PHASE_INTERPRET;
-                
+
                         spin_lock_irqsave(&imp->imp_lock, flags);
                         list_del_init(&req->rq_list);
                         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
                         GOTO (interpret, req->rq_status);
                 }
-                
+
                 if (req->rq_phase == RQ_PHASE_RPC) {
                         int do_restart = 0;
                         if (req->rq_waiting || req->rq_resend) {
                                 spin_lock_irqsave(&imp->imp_lock, flags);
-                                
+
                                 if (req->rq_level > imp->imp_level) {
                                         spin_unlock_irqrestore(&imp->imp_lock,
                                                                flags);
                                         continue;
                                 }
-                                
+
                                 list_del(&req->rq_list);
                                 list_add_tail(&req->rq_list,
                                               &imp->imp_sending_list);
@@ -578,19 +585,19 @@ static int check_set(struct ptlrpc_request_set *set)
                                         spin_unlock_irqrestore(&req->rq_lock,
                                                                flags);
                                         ptlrpc_unregister_reply(req);
-                                        if (req->rq_bulk) 
+                                        if (req->rq_bulk)
                                                 ptlrpc_unregister_bulk(req);
                                }
-                                
+
                                 rc = ptl_send_rpc(req);
                                 if (rc) {
                                         req->rq_status = rc;
                                         req->rq_phase = RQ_PHASE_INTERPRET;
                                         GOTO (interpret, req->rq_status);
                                 }
-                                
+
                         }
-                
+
                         /* Ensure the network callback returned */
                         spin_lock_irqsave (&req->rq_lock, flags);
                         if (!req->rq_replied) {
@@ -598,18 +605,25 @@ static int check_set(struct ptlrpc_request_set *set)
                                 continue;
                         }
                         spin_unlock_irqrestore (&req->rq_lock, flags);
-                
+
                         spin_lock_irqsave(&imp->imp_lock, flags);
                         list_del_init(&req->rq_list);
                         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
                         req->rq_status = after_reply(req, &do_restart);
                         if (do_restart) {
+                                spin_lock_irqsave (&req->rq_lock, flags);
                                 req->rq_resend = 1; /* ugh */
+                                spin_unlock_irqrestore (&req->rq_lock, flags);
                                 continue;
                         }
-                        
-                        if (req->rq_bulk == NULL) {
+
+                        /* If there is no bulk associated with this request,
+                         * then we're done and should let the interpreter
+                         * process the reply.  Similarly if the RPC returned
+                         * an error, and therefore the bulk will never arrive.
+                         */
+                        if (req->rq_bulk == NULL || req->rq_status != 0) {
                                 req->rq_phase = RQ_PHASE_INTERPRET;
                                 GOTO (interpret, req->rq_status);
                         }
@@ -620,20 +634,20 @@ static int check_set(struct ptlrpc_request_set *set)
                 LASSERT (req->rq_phase == RQ_PHASE_BULK);
                 if (!ptlrpc_bulk_complete (req->rq_bulk))
                         continue;
-                
+
                 req->rq_phase = RQ_PHASE_INTERPRET;
-                
+
         interpret:
                 LASSERT (req->rq_phase == RQ_PHASE_INTERPRET);
                 LASSERT (!req->rq_receiving_reply);
 
                 if (req->rq_bulk != NULL)
                         ptlrpc_unregister_bulk (req);
-                
+
                 if (req->rq_interpret_reply != NULL) {
-                        int (*interpreter)(struct ptlrpc_request *, void *, int) =
+                        int (*interpreter)(struct ptlrpc_request *,void *,int) =
                                 req->rq_interpret_reply;
-                        req->rq_status = interpreter(req, &req->rq_async_args, 
+                        req->rq_status = interpreter(req, &req->rq_async_args,
                                                      req->rq_status);
                 }
 
@@ -694,7 +708,6 @@ static int expired_set(void *data)
         ENTRY;
 
         LASSERT (set != NULL);
-        CERROR("EXPIRED SET %p\n", set);
 
         /* A timeout expired; see which reqs it applies to... */
         list_for_each (tmp, &set->set_requests) {
@@ -705,7 +718,7 @@ static int expired_set(void *data)
                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
                       (req->rq_phase == RQ_PHASE_BULK)))
                         continue;
-                
+
                 if (req->rq_timedout ||           /* already dealt with */
                     req->rq_sent + req->rq_timeout > now) /* not expired */
                         continue;
@@ -736,7 +749,7 @@ static void interrupted_set(void *data)
 
                 if (req->rq_phase != RQ_PHASE_RPC)
                         continue;
-                
+
                 spin_lock_irqsave (&req->rq_lock, flags);
                 req->rq_intr = 1;
                 spin_unlock_irqrestore (&req->rq_lock, flags);
@@ -756,13 +769,14 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
         int                    timeout;
         ENTRY;
 
+        LASSERT(!list_empty(&set->set_requests));
         list_for_each(tmp, &set->set_requests) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
                 LASSERT (req->rq_level == LUSTRE_CONN_FULL);
                 LASSERT (req->rq_phase == RQ_PHASE_NEW);
                 req->rq_phase = RQ_PHASE_RPC;
-                
+
                 imp = req->rq_import;
                 spin_lock_irqsave(&imp->imp_lock, flags);
 
@@ -827,7 +841,7 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
 
                         if (req->rq_timedout)   /* already timed out */
                                 continue;
-                        
+
                         deadline = req->rq_sent + req->rq_timeout;
                         if (deadline <= now)    /* actually expired already */
                                 timeout = 1;    /* ASAP */
@@ -839,10 +853,10 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                  * req times out */
                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
                        set, timeout);
-                lwi = LWI_TIMEOUT_INTR(timeout * HZ, 
+                lwi = LWI_TIMEOUT_INTR(timeout ? timeout * HZ : 1,
                                        expired_set, interrupted_set, set);
                 rc = l_wait_event(set->set_waitq, check_set(set), &lwi);
-                
+
                 LASSERT (rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
 
                 /* -EINTR => all requests have been flagged rq_intr so next
@@ -864,13 +878,13 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                 if (req->rq_status != 0)
                         rc = req->rq_status;
         }
-        
+
         if (set->set_interpret != NULL) {
-                int (*interpreter)(struct ptlrpc_request_set *set, void *, int) =
+                int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
                         set->set_interpret;
                 rc = interpreter (set, &set->set_args, rc);
         }
-        
+
         RETURN(rc);
 }
 
@@ -883,7 +897,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         }
 
         LASSERT (!request->rq_receiving_reply);
-        
+
         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
         if (request->rq_import != NULL) {
@@ -967,7 +981,7 @@ static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
 }
 
 /* Disengage the client's reply buffer from the network
- * NB does _NOT_ unregister any client-side bulk. 
+ * NB does _NOT_ unregister any client-side bulk.
  * IDEMPOTENT, but _not_ safe against concurrent callers.
  * The request owner (i.e. the thread doing the I/O) must call...
  */
@@ -989,7 +1003,7 @@ void ptlrpc_unregister_reply (struct ptlrpc_request *request)
 
         LASSERT (!request->rq_replied);         /* callback hasn't completed */
         spin_unlock_irqrestore (&request->rq_lock, flags);
-        
+
         rc = PtlMDUnlink (request->rq_reply_md_h);
         switch (rc) {
         default:
@@ -1005,24 +1019,24 @@ void ptlrpc_unregister_reply (struct ptlrpc_request *request)
                 request->rq_repmsg = NULL;
                 EXIT;
                 return;
-                
+
         case PTL_MD_INUSE:                      /* callback in progress */
                 for (;;) {
                         /* Network access will complete in finite time but
                          * the timeout lets us CERROR for visibility */
-                        struct l_wait_info lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
-                        
+                        struct l_wait_info lwi = LWI_TIMEOUT(10*HZ, NULL, NULL);
+
                         rc = l_wait_event (request->rq_wait_for_rep,
                                            request->rq_replied, &lwi);
                         LASSERT (rc == 0 || rc == -ETIMEDOUT);
                         if (rc == 0) {
                                 spin_lock_irqsave (&request->rq_lock, flags);
-                                /* Ensure the callback has completed scheduling me 
-                                 * and taken its hands off the request */
-                                spin_unlock_irqrestore (&request->rq_lock, flags);
+                                /* Ensure the callback has completed scheduling
+                                 * me and taken its hands off the request */
+                                spin_unlock_irqrestore(&request->rq_lock,flags);
                                 break;
                         }
-                        
+
                         CERROR ("Unexpectedly long timeout: req %p\n", request);
                 }
                 /* fall through */
@@ -1091,7 +1105,7 @@ void ptlrpc_cleanup_client(struct obd_import *imp)
 void ptlrpc_resend_req(struct ptlrpc_request *req)
 {
         unsigned long flags;
-        
+
         DEBUG_REQ(D_HA, req, "resending");
         req->rq_reqmsg->handle.cookie = 0;
         ptlrpc_put_connection(req->rq_connection);
@@ -1138,7 +1152,7 @@ static int expired_request(void *data)
 static void interrupted_request(void *data)
 {
         unsigned long flags;
-        
+
         struct ptlrpc_request *req = data;
         DEBUG_REQ(D_HA, req, "request interrupted");
         spin_lock_irqsave (&req->rq_lock, flags);
@@ -1206,7 +1220,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 
         LASSERT (req->rq_set == NULL);
         LASSERT (!req->rq_receiving_reply);
-        
+
         /* for distributed debugging */
         req->rq_reqmsg->status = current->pid;
         LASSERT(imp->imp_obd != NULL);
@@ -1219,7 +1233,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 
         /* Mark phase here for a little debug help */
         req->rq_phase = RQ_PHASE_RPC;
-        
+
 restart:
         /*
          * If the import has been invalidated (such as by an OST failure), the
@@ -1265,7 +1279,7 @@ restart:
                         spin_unlock_irqrestore(&imp->imp_lock, flags);
                         GOTO (out, rc);
                 }
-                
+
                 CERROR("process %d resumed\n", current->pid);
         }
 
@@ -1287,7 +1301,7 @@ restart:
                 }
 
                 DEBUG_REQ(D_ERROR, req, "send failed (%d); recovering", rc);
-                
+
                 ptlrpc_fail_import(imp, req->rq_import_generation);
 
                 /* If we've been told to not wait, we're done. */
@@ -1340,7 +1354,7 @@ restart:
          * (ensuring the reply callback has returned), sees that
          * req->rq_receiving_reply is clear and returns. */
         ptlrpc_unregister_reply (req);
-        
+
         if (req->rq_err)
                 GOTO(out, rc = -EIO);
 
@@ -1360,7 +1374,7 @@ restart:
 
                 if (req->rq_bulk != NULL)
                         ptlrpc_unregister_bulk (req);
-        
+
                 DEBUG_REQ(D_HA, req, "resending: ");
                 goto restart;
         }
@@ -1376,7 +1390,7 @@ restart:
         if (req->rq_timedout) {                 /* non-recoverable timeout */
                 GOTO(out, rc = -ETIMEDOUT);
         }
-        
+
         if (!req->rq_replied) {
                 /* How can this be? -eeb */
                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
@@ -1396,9 +1410,10 @@ restart:
  out:
         if (req->rq_bulk != NULL) {
                 if (rc >= 0) {                  /* success so far */
-                        lwi = LWI_TIMEOUT (timeout, NULL, NULL);
-                        brc = l_wait_event (req->rq_wait_for_rep, 
-                                            ptlrpc_bulk_complete (req->rq_bulk), &lwi);
+                        lwi = LWI_TIMEOUT(timeout, NULL, NULL);
+                        brc = l_wait_event(req->rq_wait_for_rep,
+                                           ptlrpc_bulk_complete(req->rq_bulk),
+                                           &lwi);
                         if (brc != 0) {
                                 LASSERT (brc == -ETIMEDOUT);
                                 CERROR ("Timed out waiting for bulk\n");
@@ -1412,7 +1427,7 @@ restart:
                         ptlrpc_unregister_bulk (req);
                 }
         }
-        
+
         LASSERT (!req->rq_receiving_reply);
         req->rq_phase = RQ_PHASE_INTERPRET;
         RETURN (rc);
@@ -1427,10 +1442,10 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
 
         /* I don't touch rq_phase here, so the debug log can show what
          * state it was left in */
-        
+
         /* Not handling automatic bulk replay yet (or ever?) */
         LASSERT (req->rq_bulk == NULL);
-        
+
         DEBUG_REQ(D_NET, req, "about to replay");
 
         /* Update request's state, since we might have a new connection. */