land b_eq on HEAD

diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index fdc1b37..84c781d 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -82,40 +82,42 @@ void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
         return;
 }
 
-static inline struct ptlrpc_bulk_desc *new_bulk(void)
+static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
 {
         struct ptlrpc_bulk_desc *desc;
 
-        OBD_ALLOC(desc, sizeof(*desc));
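+        /* bd_iov[] sits at the tail of the descriptor, so the allocation is
+         * sized for npages fragments via offsetof() */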
+        OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]));
         if (!desc)
                 return NULL;
 
         spin_lock_init(&desc->bd_lock);
         init_waitqueue_head(&desc->bd_waitq);
-        INIT_LIST_HEAD(&desc->bd_page_list);
+        desc->bd_max_pages = npages;
+        desc->bd_page_count = 0;
         desc->bd_md_h = PTL_HANDLE_NONE;
-        desc->bd_me_h = PTL_HANDLE_NONE;
-
+        desc->bd_portal = portal;
+        desc->bd_type = type;
+
         return desc;
 }
 
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
-                                               int type, int portal)
+                                               int npages, int type, int portal)
 {
         struct obd_import *imp = req->rq_import;
         struct ptlrpc_bulk_desc *desc;
 
         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
-
-        desc = new_bulk();
+        desc = new_bulk(npages, type, portal);
         if (desc == NULL)
                 RETURN(NULL);
 
         desc->bd_import_generation = req->rq_import_generation;
         desc->bd_import = class_import_get(imp);
         desc->bd_req = req;
-        desc->bd_type = type;
-        desc->bd_portal = portal;
+
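+        /* bulk events for this descriptor are dispatched to
+         * client_bulk_callback(), with desc itself as the callback argument */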
+        desc->bd_cbid.cbid_fn  = client_bulk_callback;
+        desc->bd_cbid.cbid_arg = desc;
 
         /* This makes req own desc, and free it when she frees herself */
         req->rq_bulk = desc;
@@ -124,21 +126,22 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
 }
 
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
-                                               int type, int portal)
+                                               int npages, int type, int portal)
 {
         struct obd_export *exp = req->rq_export;
         struct ptlrpc_bulk_desc *desc;
 
         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
 
-        desc = new_bulk();
+        desc = new_bulk(npages, type, portal);
         if (desc == NULL)
                 RETURN(NULL);
 
         desc->bd_export = class_export_get(exp);
         desc->bd_req = req;
-        desc->bd_type = type;
-        desc->bd_portal = portal;
+
+        desc->bd_cbid.cbid_fn  = server_bulk_callback;
+        desc->bd_cbid.cbid_arg = desc;
 
         /* NB we don't assign rq_bulk here; server-side requests are
          * re-used, and the handler frees the bulk desc explicitly. */
@@ -146,66 +149,50 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
         return desc;
 }
 
-int ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
-                          struct page *page, int pageoffset, int len)
+void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
+                           struct page *page, int pageoffset, int len)
 {
-        struct ptlrpc_bulk_page *bulk;
-
-        OBD_ALLOC(bulk, sizeof(*bulk));
-        if (bulk == NULL)
-                return -ENOMEM;
-
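+        /* a kernel build describes the fragment with a page-based kiov;
+         * a userspace build uses a plain iovec into mapped memory */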
+#ifdef __KERNEL__
+        ptl_kiov_t *kiov = &desc->bd_iov[desc->bd_page_count];
+#else
+        struct iovec *iov = &desc->bd_iov[desc->bd_page_count];
+#endif
+        LASSERT(desc->bd_page_count < desc->bd_max_pages);
         LASSERT(page != NULL);
         LASSERT(pageoffset >= 0);
         LASSERT(len > 0);
         LASSERT(pageoffset + len <= PAGE_SIZE);
 
-        bulk->bp_page = page;
-        bulk->bp_pageoffset = pageoffset;
-        bulk->bp_buflen = len;
-
-        bulk->bp_desc = desc;
-        list_add_tail(&bulk->bp_link, &desc->bd_page_list);
+#ifdef __KERNEL__
+        kiov->kiov_page   = page;
+        kiov->kiov_offset = pageoffset;
+        kiov->kiov_len    = len;
+#else
+        iov->iov_base = page->addr + pageoffset;
+        iov->iov_len  = len;
+#endif
         desc->bd_page_count++;
-        return 0;
+        desc->bd_nob += len;
 }
 
 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 {
-        struct list_head *tmp, *next;
         ENTRY;
 
         LASSERT(desc != NULL);
         LASSERT(desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
         LASSERT(!desc->bd_network_rw);         /* network hands off or */
-
-        list_for_each_safe(tmp, next, &desc->bd_page_list) {
-                struct ptlrpc_bulk_page *bulk;
-                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
-                ptlrpc_free_bulk_page(bulk);
-        }
-
-        LASSERT(desc->bd_page_count == 0);
         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
-
         if (desc->bd_export)
                 class_export_put(desc->bd_export);
         else
                 class_import_put(desc->bd_import);
 
-        OBD_FREE(desc, sizeof(*desc));
+        OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
+                                bd_iov[desc->bd_max_pages]));
         EXIT;
 }
 
-void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
-{
-        LASSERT(bulk != NULL);
-
-        list_del(&bulk->bp_link);
-        bulk->bp_desc->bd_page_count--;
-        OBD_FREE(bulk, sizeof(*bulk));
-}
-
 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
                                        int count, int *lengths, char **bufs)
 {
@@ -235,6 +222,13 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
         request->rq_send_state = LUSTRE_IMP_FULL;
         request->rq_type = PTL_RPC_MSG_REQUEST;
         request->rq_import = class_import_get(imp);
+
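+        /* the callback ids route request-out and reply-in events back to
+         * this request */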
+        request->rq_req_cbid.cbid_fn  = request_out_callback;
+        request->rq_req_cbid.cbid_arg = request;
+
+        request->rq_reply_cbid.cbid_fn  = reply_in_callback;
+        request->rq_reply_cbid.cbid_arg = request;
+
         request->rq_phase = RQ_PHASE_NEW;
 
         /* XXX FIXME bug 249 */
@@ -462,7 +456,6 @@ static int after_reply(struct ptlrpc_request *req)
         ENTRY;
 
         LASSERT(!req->rq_receiving_reply);
-        LASSERT(req->rq_replied);
 
         /* NB Until this point, the whole of the incoming message,
          * including buflens, status etc is in the sender's byte order. */
@@ -471,7 +464,8 @@ static int after_reply(struct ptlrpc_request *req)
         /* Clear reply swab mask; this is a new reply in sender's byte order */
         req->rq_rep_swab_mask = 0;
 #endif
-        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
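+        /* unpack only the bytes that actually arrived; rq_replen is just
+         * the size of the reply buffer */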
+        LASSERT (req->rq_nob_received <= req->rq_replen);
+        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
         if (rc) {
                 CERROR("unpack_rep failed: %d\n", rc);
                 RETURN(-EPROTO);
@@ -658,6 +652,11 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 if (req->rq_phase == RQ_PHASE_RPC) {
                         if (req->rq_waiting || req->rq_resend) {
                                 int status;
+
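+                                /* a waiting/resending request must have no
+                                 * network activity outstanding: no reply
+                                 * buffer posted and no active bulk */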
+                                LASSERT (!ptlrpc_client_receiving_reply(req));
+                                LASSERT (req->rq_bulk == NULL ||
+                                         !ptlrpc_bulk_active(req->rq_bulk));
+
                                 spin_lock_irqsave(&imp->imp_lock, flags);
 
                                 if (ptlrpc_import_delay_req(imp, req, &status)) {
@@ -686,7 +685,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                                         ptlrpc_unregister_reply(req);
                                         if (req->rq_bulk) {
                                                 __u64 old_xid = req->rq_xid;
-                                                ptlrpc_unregister_bulk(req);
+
                                                 /* ensure previous bulk fails */
                                                 req->rq_xid = ptlrpc_next_xid();
                                                 CDEBUG(D_HA, "resend bulk "
@@ -707,13 +706,13 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                                 force_timer_recalc = 1;
                         }
 
-                        /* Ensure the network callback returned */
-                        spin_lock_irqsave (&req->rq_lock, flags);
-                        if (!req->rq_replied) {
-                                spin_unlock_irqrestore (&req->rq_lock, flags);
+                        /* Still waiting for a reply? */
+                        if (ptlrpc_client_receiving_reply(req))
+                                continue;
+
+                        /* Did we actually receive a reply? */
+                        if (!ptlrpc_client_replied(req))
                                 continue;
-                        }
-                        spin_unlock_irqrestore (&req->rq_lock, flags);
 
                         spin_lock_irqsave(&imp->imp_lock, flags);
                         list_del_init(&req->rq_list);
@@ -745,9 +744,18 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 }
 
                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
-                if (!ptlrpc_bulk_complete (req->rq_bulk))
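+                /* bulk still moving on the network; poll this request again
+                 * later */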
+                if (ptlrpc_bulk_active(req->rq_bulk))
                         continue;
 
+                if (!req->rq_bulk->bd_success) {
+                        /* The RPC reply arrived OK, but the bulk screwed
+                         * up!  Dead weird since the server told us the RPC
+                         * was good after getting the REPLY for her GET or
+                         * the ACK for her PUT. */
+                        DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
+                        LBUG();
+                }
+
                 req->rq_phase = RQ_PHASE_INTERPRET;
 
         interpret:
@@ -796,6 +804,9 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req)
 
         ptlrpc_unregister_reply (req);
 
+        if (req->rq_bulk != NULL)
+                ptlrpc_unregister_bulk (req);
+
         if (imp == NULL) {
                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
                 RETURN(1);
@@ -926,7 +937,8 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
         LASSERT(!list_empty(&set->set_requests));
         list_for_each(tmp, &set->set_requests) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
-                (void)ptlrpc_send_new_req(req);
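+                /* only requests still in the NEW phase get sent here;
+                 * anything already in flight is left alone */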
+                if (req->rq_phase == RQ_PHASE_NEW)
+                        (void)ptlrpc_send_new_req(req);
         }
 
         do {
@@ -981,6 +993,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         }
 
         LASSERT(!request->rq_receiving_reply);
+        LASSERT(request->rq_rqbd == NULL);    /* client-side */
 
         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
@@ -1073,67 +1086,39 @@ void ptlrpc_req_finished(struct ptlrpc_request *request)
  */
 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
 {
-        unsigned long flags;
-        int           rc;
-        ENTRY;
+        int                rc;
+        wait_queue_head_t *wq;
+        struct l_wait_info lwi;
 
         LASSERT(!in_interrupt ());             /* might sleep */
 
-        spin_lock_irqsave (&request->rq_lock, flags);
-        if (!request->rq_receiving_reply) {     /* not waiting for a reply */
-                spin_unlock_irqrestore (&request->rq_lock, flags);
-                EXIT;
-                /* NB reply buffer not freed here */
+        if (!ptlrpc_client_receiving_reply(request))
                 return;
-        }
-
-        LASSERT(!request->rq_replied);         /* callback hasn't completed */
-        spin_unlock_irqrestore (&request->rq_lock, flags);
 
         rc = PtlMDUnlink (request->rq_reply_md_h);
-        switch (rc) {
-        default:
-                LBUG ();
-
-        case PTL_OK:                            /* unlinked before completion */
-                LASSERT(request->rq_receiving_reply);
-                LASSERT(!request->rq_replied);
-                spin_lock_irqsave (&request->rq_lock, flags);
-                request->rq_receiving_reply = 0;
-                spin_unlock_irqrestore (&request->rq_lock, flags);
-                OBD_FREE(request->rq_repmsg, request->rq_replen);
-                request->rq_repmsg = NULL;
-                EXIT;
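+        /* PTL_INV_MD: the reply MD has already been unlinked (the callback
+         * beat us to it), so there is nothing left to wait for */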
+        if (rc == PTL_INV_MD) {
+                LASSERT (!ptlrpc_client_receiving_reply(request));
                 return;
+        }
+
+        LASSERT (rc == PTL_OK);
 
-        case PTL_MD_INUSE:                      /* callback in progress */
-                for (;;) {
-                        /* Network access will complete in finite time but
-                         * the timeout lets us CERROR for visibility */
-                        struct l_wait_info lwi = LWI_TIMEOUT(10*HZ, NULL, NULL);
-
-                        rc = l_wait_event (request->rq_reply_waitq,
-                                           request->rq_replied, &lwi);
-                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
-                        if (rc == 0) {
-                                spin_lock_irqsave (&request->rq_lock, flags);
-                                /* Ensure the callback has completed scheduling
-                                 * me and taken its hands off the request */
-                                spin_unlock_irqrestore(&request->rq_lock,flags);
-                                break;
-                        }
-
-                        CERROR ("Unexpectedly long timeout: req %p\n", request);
-                }
-                /* fall through */
-
-        case PTL_INV_MD:                        /* callback completed */
-                LASSERT(!request->rq_receiving_reply);
-                LASSERT(request->rq_replied);
-                EXIT;
-                return;
+        if (request->rq_set != NULL)
+                wq = &request->rq_set->set_waitq;
+        else
+                wq = &request->rq_reply_waitq;
+
+        for (;;) {
+                /* Network access will complete in finite time but the HUGE
+                 * timeout lets us CWARN for visibility of sluggish NALs */
+                lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
+                rc = l_wait_event (*wq,
+                                   !ptlrpc_client_receiving_reply(request),
+                                   &lwi);
+                if (rc == 0)
+                        return;
+
+                LASSERT (rc == -ETIMEDOUT);
+                DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
         }
-        /* Not Reached */
 }
 
 /* caller must hold imp->imp_lock */
@@ -1207,11 +1192,17 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
         spin_lock_irqsave (&req->rq_lock, flags);
         req->rq_resend = 1;
         req->rq_timedout = 0;
-        if (req->rq_set != NULL)
-                wake_up (&req->rq_set->set_waitq);
-        else
-                wake_up(&req->rq_reply_waitq);
+        if (req->rq_bulk) {
+                __u64 old_xid = req->rq_xid;
+
+                /* ensure previous bulk fails */
+                req->rq_xid = ptlrpc_next_xid();
+                CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
+                       old_xid, req->rq_xid);
+        }
+        ptlrpc_wake_client_req(req);
         spin_unlock_irqrestore (&req->rq_lock, flags);
 }
 
 /* XXX: this function and rq_status are currently unused */
@@ -1225,10 +1216,7 @@ void ptlrpc_restart_req(struct ptlrpc_request *req)
         spin_lock_irqsave (&req->rq_lock, flags);
         req->rq_restart = 1;
         req->rq_timedout = 0;
-        if (req->rq_set != NULL)
-                wake_up (&req->rq_set->set_waitq);
-        else
-                wake_up(&req->rq_reply_waitq);
+        ptlrpc_wake_client_req(req);
         spin_unlock_irqrestore (&req->rq_lock, flags);
 }
 
@@ -1456,15 +1444,24 @@ restart:
 
  out:
         if (req->rq_bulk != NULL) {
-                if (rc >= 0) {                  /* success so far */
+                if (rc >= 0) {
+                        /* success so far.  Note that anything going wrong
+                         * with bulk now is EXTREMELY strange, since the
+                         * server must have believed that the bulk
+                         * transferred OK before she replied with success to
+                         * me. */
                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
                         brc = l_wait_event(req->rq_reply_waitq,
-                                           ptlrpc_bulk_complete(req->rq_bulk),
+                                           !ptlrpc_bulk_active(req->rq_bulk),
                                            &lwi);
+                        LASSERT(brc == 0 || brc == -ETIMEDOUT);
                         if (brc != 0) {
                                 LASSERT(brc == -ETIMEDOUT);
-                                CERROR ("Timed out waiting for bulk\n");
+                                DEBUG_REQ(D_ERROR, req, "bulk timed out");
                                 rc = brc;
+                        } else if (!req->rq_bulk->bd_success) {
+                                DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
+                                rc = -EIO;
                         }
                 }
                 if (rc < 0)
@@ -1499,7 +1496,8 @@ static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
         /* Clear reply swab mask; this is a new reply in sender's byte order */
         req->rq_rep_swab_mask = 0;
 #endif
-        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
+        LASSERT (req->rq_nob_received <= req->rq_replen);
+        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
         if (rc) {
                 CERROR("unpack_rep failed: %d\n", rc);
                 GOTO(out, rc = -EPROTO);
@@ -1607,10 +1605,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 spin_lock (&req->rq_lock);
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
-                        if (req->rq_set != NULL)
-                                wake_up(&req->rq_set->set_waitq);
-                        else
-                                wake_up(&req->rq_reply_waitq);
+                        ptlrpc_wake_client_req(req);
                 }
                 spin_unlock (&req->rq_lock);
         }
@@ -1624,10 +1619,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 spin_lock (&req->rq_lock);
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
-                        if (req->rq_set != NULL)
-                                wake_up(&req->rq_set->set_waitq);
-                        else
-                                wake_up(&req->rq_reply_waitq);
+                        ptlrpc_wake_client_req(req);
                 }
                 spin_unlock (&req->rq_lock);
         }