return;
}
-static inline struct ptlrpc_bulk_desc *new_bulk(void)
+static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
{
struct ptlrpc_bulk_desc *desc;
- OBD_ALLOC(desc, sizeof(*desc));
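+ /* one allocation covers the descriptor and its inline bd_iov[] array */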
+ OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]));
if (!desc)
return NULL;
spin_lock_init(&desc->bd_lock);
init_waitqueue_head(&desc->bd_waitq);
- INIT_LIST_HEAD(&desc->bd_page_list);
+ desc->bd_max_pages = npages;
+ desc->bd_page_count = 0;
desc->bd_md_h = PTL_HANDLE_NONE;
- desc->bd_me_h = PTL_HANDLE_NONE;
-
+ desc->bd_portal = portal;
+ desc->bd_type = type;
+
return desc;
}
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
- int type, int portal)
+ int npages, int type, int portal)
{
struct obd_import *imp = req->rq_import;
struct ptlrpc_bulk_desc *desc;
LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
-
- desc = new_bulk();
+ desc = new_bulk(npages, type, portal);
if (desc == NULL)
RETURN(NULL);
desc->bd_import_generation = req->rq_import_generation;
desc->bd_import = class_import_get(imp);
desc->bd_req = req;
- desc->bd_type = type;
- desc->bd_portal = portal;
+
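+ /* the callback id routes bulk events back to this descriptor */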
+ desc->bd_cbid.cbid_fn = client_bulk_callback;
+ desc->bd_cbid.cbid_arg = desc;
/* This makes req own desc, and free it when she frees herself */
req->rq_bulk = desc;
}
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
- int type, int portal)
+ int npages, int type, int portal)
{
struct obd_export *exp = req->rq_export;
struct ptlrpc_bulk_desc *desc;
LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
- desc = new_bulk();
+ desc = new_bulk(npages, type, portal);
if (desc == NULL)
RETURN(NULL);
desc->bd_export = class_export_get(exp);
desc->bd_req = req;
- desc->bd_type = type;
- desc->bd_portal = portal;
+
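+ /* as on the client side, bulk events are routed via the callback id */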
+ desc->bd_cbid.cbid_fn = server_bulk_callback;
+ desc->bd_cbid.cbid_arg = desc;
/* NB we don't assign rq_bulk here; server-side requests are
* re-used, and the handler frees the bulk desc explicitly. */
return desc;
}
-int ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
- struct page *page, int pageoffset, int len)
+void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
+ struct page *page, int pageoffset, int len)
{
- struct ptlrpc_bulk_page *bulk;
-
- OBD_ALLOC(bulk, sizeof(*bulk));
- if (bulk == NULL)
- return -ENOMEM;
-
+#ifdef __KERNEL__
+ ptl_kiov_t *kiov = &desc->bd_iov[desc->bd_page_count];
+#else
+ struct iovec *iov = &desc->bd_iov[desc->bd_page_count];
+#endif
+ LASSERT(desc->bd_page_count < desc->bd_max_pages);
LASSERT(page != NULL);
LASSERT(pageoffset >= 0);
LASSERT(len > 0);
LASSERT(pageoffset + len <= PAGE_SIZE);
- bulk->bp_page = page;
- bulk->bp_pageoffset = pageoffset;
- bulk->bp_buflen = len;
-
- bulk->bp_desc = desc;
- list_add_tail(&bulk->bp_link, &desc->bd_page_list);
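+ /* in the kernel the page is referenced directly as a kiov; userspace
+  * flattens it to an address/length iovec */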
+#ifdef __KERNEL__
+ kiov->kiov_page = page;
+ kiov->kiov_offset = pageoffset;
+ kiov->kiov_len = len;
+#else
+ iov->iov_base = page->addr + pageoffset;
+ iov->iov_len = len;
+#endif
desc->bd_page_count++;
- return 0;
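+ /* bd_nob accumulates the total byte count of the transfer */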
+ desc->bd_nob += len;
}
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
- struct list_head *tmp, *next;
ENTRY;
LASSERT(desc != NULL);
LASSERT(desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
LASSERT(!desc->bd_network_rw); /* network hands off or */
-
- list_for_each_safe(tmp, next, &desc->bd_page_list) {
- struct ptlrpc_bulk_page *bulk;
- bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
- ptlrpc_free_bulk_page(bulk);
- }
-
- LASSERT(desc->bd_page_count == 0);
LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
-
if (desc->bd_export)
class_export_put(desc->bd_export);
else
class_import_put(desc->bd_import);
- OBD_FREE(desc, sizeof(*desc));
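+ /* free with the same offsetof() sizing used in new_bulk() */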
+ OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
+ bd_iov[desc->bd_max_pages]));
EXIT;
}
-void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
-{
- LASSERT(bulk != NULL);
-
- list_del(&bulk->bp_link);
- bulk->bp_desc->bd_page_count--;
- OBD_FREE(bulk, sizeof(*bulk));
-}
-
struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
int count, int *lengths, char **bufs)
{
request->rq_send_state = LUSTRE_IMP_FULL;
request->rq_type = PTL_RPC_MSG_REQUEST;
request->rq_import = class_import_get(imp);
+
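+ /* completion callbacks for the outgoing request and incoming reply */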
+ request->rq_req_cbid.cbid_fn = request_out_callback;
+ request->rq_req_cbid.cbid_arg = request;
+
+ request->rq_reply_cbid.cbid_fn = reply_in_callback;
+ request->rq_reply_cbid.cbid_arg = request;
+
request->rq_phase = RQ_PHASE_NEW;
/* XXX FIXME bug 249 */
ENTRY;
LASSERT(!req->rq_receiving_reply);
- LASSERT(req->rq_replied);
/* NB Until this point, the whole of the incoming message,
* including buflens, status etc is in the sender's byte order. */
/* Clear reply swab mask; this is a new reply in sender's byte order */
req->rq_rep_swab_mask = 0;
#endif
- rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
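+ /* the reply may be shorter than the buffer; unpack only the bytes
+  * actually received */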
+ LASSERT (req->rq_nob_received <= req->rq_replen);
+ rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
if (rc) {
CERROR("unpack_rep failed: %d\n", rc);
RETURN(-EPROTO);
if (req->rq_phase == RQ_PHASE_RPC) {
if (req->rq_waiting || req->rq_resend) {
int status;
+
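+ /* a request awaiting (re)send must have no network activity in flight */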
+ LASSERT (!ptlrpc_client_receiving_reply(req));
+ LASSERT (req->rq_bulk == NULL ||
+ !ptlrpc_bulk_active(req->rq_bulk));
+
spin_lock_irqsave(&imp->imp_lock, flags);
if (ptlrpc_import_delay_req(imp, req, &status)) {
ptlrpc_unregister_reply(req);
if (req->rq_bulk) {
__u64 old_xid = req->rq_xid;
- ptlrpc_unregister_bulk(req);
+
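+ /* bulk match bits are taken from rq_xid, so a fresh xid guarantees
+  * the previous bulk can never match again */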
/* ensure previous bulk fails */
req->rq_xid = ptlrpc_next_xid();
CDEBUG(D_HA, "resend bulk "
force_timer_recalc = 1;
}
- /* Ensure the network callback returned */
- spin_lock_irqsave (&req->rq_lock, flags);
- if (!req->rq_replied) {
- spin_unlock_irqrestore (&req->rq_lock, flags);
+ /* Still waiting for a reply? */
+ if (ptlrpc_client_receiving_reply(req))
+ continue;
+
+ /* Did we actually receive a reply? */
+ if (!ptlrpc_client_replied(req))
continue;
- }
- spin_unlock_irqrestore (&req->rq_lock, flags);
spin_lock_irqsave(&imp->imp_lock, flags);
list_del_init(&req->rq_list);
}
LASSERT(req->rq_phase == RQ_PHASE_BULK);
- if (!ptlrpc_bulk_complete (req->rq_bulk))
+ if (ptlrpc_bulk_active(req->rq_bulk))
continue;
+ if (!req->rq_bulk->bd_success) {
+ /* The RPC reply arrived OK, but the bulk screwed
+ * up! Dead weird since the server told us the RPC
+ * was good after getting the REPLY for her GET or
+ * the ACK for her PUT. */
+ DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
+ LBUG();
+ }
+
req->rq_phase = RQ_PHASE_INTERPRET;
interpret:
ptlrpc_unregister_reply (req);
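+ /* release the bulk MD as well before interpreting the reply */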
+ if (req->rq_bulk != NULL)
+ ptlrpc_unregister_bulk (req);
+
if (imp == NULL) {
DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
RETURN(1);
LASSERT(!list_empty(&set->set_requests));
list_for_each(tmp, &set->set_requests) {
req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
- (void)ptlrpc_send_new_req(req);
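+ /* requests may already be in flight; only kick off the new ones */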
+ if (req->rq_phase == RQ_PHASE_NEW)
+ (void)ptlrpc_send_new_req(req);
}
do {
}
LASSERT(!request->rq_receiving_reply);
+ LASSERT(request->rq_rqbd == NULL); /* client-side */
/* We must take it off the imp_replay_list first. Otherwise, we'll set
* request->rq_reqmsg to NULL while osc_close is dereferencing it. */
*/
void ptlrpc_unregister_reply (struct ptlrpc_request *request)
{
- unsigned long flags;
- int rc;
- ENTRY;
+ int rc;
+ wait_queue_head_t *wq;
+ struct l_wait_info lwi;
LASSERT(!in_interrupt ()); /* might sleep */
- spin_lock_irqsave (&request->rq_lock, flags);
- if (!request->rq_receiving_reply) { /* not waiting for a reply */
- spin_unlock_irqrestore (&request->rq_lock, flags);
- EXIT;
- /* NB reply buffer not freed here */
+ if (!ptlrpc_client_receiving_reply(request))
return;
- }
-
- LASSERT(!request->rq_replied); /* callback hasn't completed */
- spin_unlock_irqrestore (&request->rq_lock, flags);
rc = PtlMDUnlink (request->rq_reply_md_h);
- switch (rc) {
- default:
- LBUG ();
-
- case PTL_OK: /* unlinked before completion */
- LASSERT(request->rq_receiving_reply);
- LASSERT(!request->rq_replied);
- spin_lock_irqsave (&request->rq_lock, flags);
- request->rq_receiving_reply = 0;
- spin_unlock_irqrestore (&request->rq_lock, flags);
- OBD_FREE(request->rq_repmsg, request->rq_replen);
- request->rq_repmsg = NULL;
- EXIT;
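+ /* PTL_INV_MD: the reply event already fired and auto-unlinked the MD */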
+ if (rc == PTL_INV_MD) {
+ LASSERT (!ptlrpc_client_receiving_reply(request));
return;
+ }
+
+ LASSERT (rc == PTL_OK);
- case PTL_MD_INUSE: /* callback in progress */
- for (;;) {
- /* Network access will complete in finite time but
- * the timeout lets us CERROR for visibility */
- struct l_wait_info lwi = LWI_TIMEOUT(10*HZ, NULL, NULL);
-
- rc = l_wait_event (request->rq_reply_waitq,
- request->rq_replied, &lwi);
- LASSERT(rc == 0 || rc == -ETIMEDOUT);
- if (rc == 0) {
- spin_lock_irqsave (&request->rq_lock, flags);
- /* Ensure the callback has completed scheduling
- * me and taken its hands off the request */
- spin_unlock_irqrestore(&request->rq_lock,flags);
- break;
- }
-
- CERROR ("Unexpectedly long timeout: req %p\n", request);
- }
- /* fall through */
-
- case PTL_INV_MD: /* callback completed */
- LASSERT(!request->rq_receiving_reply);
- LASSERT(request->rq_replied);
- EXIT;
- return;
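+ /* wait on whichever waitq the reply callback will signal */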
+ if (request->rq_set != NULL)
+ wq = &request->rq_set->set_waitq;
+ else
+ wq = &request->rq_reply_waitq;
+
+ for (;;) {
+ /* Network access will complete in finite time but the HUGE
+ * timeout lets us CWARN for visibility of sluggish NALs */
+ lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
+ rc = l_wait_event(*wq, !ptlrpc_client_receiving_reply(request),
+                   &lwi);
+ if (rc == 0)
+ return;
+
+ LASSERT (rc == -ETIMEDOUT);
+ DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
}
- /* Not Reached */
}
/* caller must hold imp->imp_lock */
spin_lock_irqsave (&req->rq_lock, flags);
req->rq_resend = 1;
req->rq_timedout = 0;
- if (req->rq_set != NULL)
- wake_up (&req->rq_set->set_waitq);
- else
- wake_up(&req->rq_reply_waitq);
+ if (req->rq_bulk) {
+ __u64 old_xid = req->rq_xid;
+
+ /* ensure previous bulk fails */
+ req->rq_xid = ptlrpc_next_xid();
+ CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
+ old_xid, req->rq_xid);
+ }
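+ /* wake whoever is waiting: the set's waitq or the request's own */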
+ ptlrpc_wake_client_req(req);
spin_unlock_irqrestore (&req->rq_lock, flags);
}
/* XXX: this function and rq_status are currently unused */
spin_lock_irqsave (&req->rq_lock, flags);
req->rq_restart = 1;
req->rq_timedout = 0;
- if (req->rq_set != NULL)
- wake_up (&req->rq_set->set_waitq);
- else
- wake_up(&req->rq_reply_waitq);
+ ptlrpc_wake_client_req(req);
spin_unlock_irqrestore (&req->rq_lock, flags);
}
out:
if (req->rq_bulk != NULL) {
- if (rc >= 0) { /* success so far */
+ if (rc >= 0) {
+ /* success so far. Note that anything going wrong
+ * with bulk now is EXTREMELY strange, since the
+ * server must have believed that the bulk
+ * transferred OK before she replied with success to
+ * me. */
lwi = LWI_TIMEOUT(timeout, NULL, NULL);
brc = l_wait_event(req->rq_reply_waitq,
- ptlrpc_bulk_complete(req->rq_bulk),
+ !ptlrpc_bulk_active(req->rq_bulk),
&lwi);
+ LASSERT(brc == 0 || brc == -ETIMEDOUT);
if (brc != 0) {
LASSERT(brc == -ETIMEDOUT);
- CERROR ("Timed out waiting for bulk\n");
+ DEBUG_REQ(D_ERROR, req, "bulk timed out");
rc = brc;
+ } else if (!req->rq_bulk->bd_success) {
+ DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
+ rc = -EIO;
}
}
if (rc < 0)
/* Clear reply swab mask; this is a new reply in sender's byte order */
req->rq_rep_swab_mask = 0;
#endif
- rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
+ LASSERT (req->rq_nob_received <= req->rq_replen);
+ rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
if (rc) {
CERROR("unpack_rep failed: %d\n", rc);
GOTO(out, rc = -EPROTO);
spin_lock (&req->rq_lock);
if (req->rq_import_generation < imp->imp_generation) {
req->rq_err = 1;
- if (req->rq_set != NULL)
- wake_up(&req->rq_set->set_waitq);
- else
- wake_up(&req->rq_reply_waitq);
+ ptlrpc_wake_client_req(req);
}
spin_unlock (&req->rq_lock);
}
spin_lock (&req->rq_lock);
if (req->rq_import_generation < imp->imp_generation) {
req->rq_err = 1;
- if (req->rq_set != NULL)
- wake_up(&req->rq_set->set_waitq);
- else
- wake_up(&req->rq_reply_waitq);
+ ptlrpc_wake_client_req(req);
}
spin_unlock (&req->rq_lock);
}