struct ptlrpc_bulk_desc *desc;
LASSERT (type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
-
+
desc = new_bulk();
if (desc == NULL)
RETURN(NULL);
-
+
/* Is this sampled at the right place? Do we want to get the import
* generation just before we send? Should it match the generation of
* the request? */
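/* Illustrative sketch, not part of this patch: if the generation should
* match the moment of sending, it could be (re)sampled right before
* ptl_send_rpc(); the helper name below is hypothetical.
*
* static void sample_import_generation(struct ptlrpc_request *req)
* {
*         req->rq_import_generation = req->rq_import->imp_generation;
* }
*/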
struct ptlrpc_bulk_desc *desc;
LASSERT (type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
-
+
desc = new_bulk();
if (desc == NULL)
RETURN(NULL);
LASSERT (desc != NULL);
LASSERT (desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
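/* (0x5a5a5a5a is the pattern OBD_FREE poisons freed memory with, so
* this assertion catches use-after-free of the descriptor.) */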
LASSERT (!desc->bd_network_rw); /* network hands off */
-
+
list_for_each_safe(tmp, next, &desc->bd_page_list) {
struct ptlrpc_bulk_page *bulk;
bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
{
LASSERT (bulk != NULL);
-
+
list_del(&bulk->bp_link);
bulk->bp_desc->bd_page_count--;
OBD_FREE(bulk, sizeof(*bulk));
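/* Illustrative sketch, not part of this patch: the allocation side this
* free routine mirrors. ptlrpc_prep_bulk_page() is its counterpart in
* this file; the page/offset/length values below are examples only.
*
* struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
* if (bulk == NULL)
*         GOTO(out, rc = -ENOMEM);
* bulk->bp_page = page;
* bulk->bp_pageoffset = 0;
* bulk->bp_buflen = PAGE_SIZE;
*/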
request->rq_type = PTL_RPC_MSG_REQUEST;
request->rq_import = class_import_get(imp);
request->rq_phase = RQ_PHASE_NEW;
-
+
/* XXX FIXME bug 249 */
request->rq_request_portal = imp->imp_client->cli_request_portal;
request->rq_reply_portal = imp->imp_client->cli_reply_portal;
ENTRY;
/* Requests on the set should either all be completed, or all be new */
- expected_phase = (set->set_remaining == 0) ?
+ expected_phase = (set->set_remaining == 0) ?
RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
list_for_each (tmp, &set->set_requests) {
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request, rq_set_chain);
LASSERT (req->rq_phase == expected_phase);
n++;
}
-
+
LASSERT (set->set_remaining == 0 || set->set_remaining == n);
-
+
list_for_each_safe(tmp, next, &set->set_requests) {
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request, rq_set_chain);
LASSERT (req->rq_phase == expected_phase);
if (req->rq_phase == RQ_PHASE_NEW) {
-
+
if (req->rq_interpret_reply != NULL) {
- int (*interpreter)(struct ptlrpc_request *, void *, int) =
+ int (*interpreter)(struct ptlrpc_request *,
+ void *, int) =
req->rq_interpret_reply;
-
- /* higher level (i.e. LOV) failed;
+
+ /* higher level (i.e. LOV) failed;
* let the sub reqs clean up */
req->rq_status = -EBADR;
interpreter(req, &req->rq_async_args, req->rq_status);
}
#warning this needs to change after robert fixes eviction handling
-static int
-after_reply(struct ptlrpc_request *req, int *restartp)
+static int after_reply(struct ptlrpc_request *req, int *restartp)
{
unsigned long flags;
struct obd_import *imp = req->rq_import;
if (restartp != NULL)
*restartp = 0;
-
+
/* NB Until this point, the whole of the incoming message,
* including buflens, status etc is in the sender's byte order. */
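/* Illustrative sketch, not part of this patch: what "sender's byte
* order" implies. Multi-byte fields must be swabbed before use when
* the peer's endianness differs; a hand-rolled swab of one 32-bit
* field, shown for illustration only:
*
* __u32 wire = req->rq_repmsg->status;
* __u32 host = ((wire & 0x000000ffU) << 24) | ((wire & 0x0000ff00U) << 8) |
*              ((wire & 0x00ff0000U) >> 8)  | ((wire & 0xff000000U) >> 24);
*/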
rc = ptlrpc_check_status(req);
/* Either we've been evicted, or the server has failed for
- * some reason. Try to reconnect, and if that fails, punt to
- * upcall */
+ * some reason. Try to reconnect, and if that fails, punt to the
+ * upcall. */
if (rc == -ENOTCONN) {
if (req->rq_level < LUSTRE_CONN_FULL || req->rq_no_recov ||
imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
rc = ptlrpc_request_handle_eviction(req);
if (rc)
- CERROR("can't reconnect to %s@%s: %d\n",
+ CERROR("can't reconnect to %s@%s: %d\n",
imp->imp_target_uuid.uuid,
imp->imp_connection->c_remote_uuid.uuid, rc);
else
/* Replay-enabled imports return commit-status information. */
if (req->rq_repmsg->last_committed) {
- if (req->rq_repmsg->last_committed <
+ if (req->rq_repmsg->last_committed <
imp->imp_peer_committed_transno) {
CERROR("%s went back in time (transno "LPD64
" was committed, server claims "LPD64
ptlrpc_free_committed(imp);
spin_unlock_irqrestore(&imp->imp_lock, flags);
}
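/* Illustrative sketch, not part of this patch: the invariant the block
* above maintains. The peer's committed transno only moves forward, and
* ptlrpc_free_committed() can then drop replay state for requests whose
* transno it covers:
*
* if (req->rq_repmsg->last_committed > imp->imp_peer_committed_transno)
*         imp->imp_peer_committed_transno =
*                 req->rq_repmsg->last_committed;
*/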
-
+
RETURN(rc);
}
struct obd_import *imp = req->rq_import;
int rc = 0;
- LASSERT (req->rq_phase == RQ_PHASE_RPC ||
- req->rq_phase == RQ_PHASE_BULK ||
- req->rq_phase == RQ_PHASE_COMPLETE);
+ if (!(req->rq_phase == RQ_PHASE_RPC ||
+ req->rq_phase == RQ_PHASE_BULK ||
+ req->rq_phase == RQ_PHASE_INTERPRET ||
+ req->rq_phase == RQ_PHASE_COMPLETE)) {
+ DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
+ LBUG();
+ }
if (req->rq_phase == RQ_PHASE_COMPLETE)
continue;
+ if (req->rq_phase == RQ_PHASE_INTERPRET)
+ GOTO (interpret, req->rq_status);
+
if (req->rq_err) {
ptlrpc_unregister_reply(req);
if (req->rq_status == 0)
req->rq_status = -EIO;
req->rq_phase = RQ_PHASE_INTERPRET;
-
+
spin_lock_irqsave(&imp->imp_lock, flags);
list_del_init(&req->rq_list);
spin_unlock_irqrestore(&imp->imp_lock, flags);
GOTO (interpret, req->rq_status);
- }
-
+ }
+
if (req->rq_intr) {
/* NB could be on delayed list */
ptlrpc_unregister_reply(req);
req->rq_status = -EINTR;
req->rq_phase = RQ_PHASE_INTERPRET;
-
+
spin_lock_irqsave(&imp->imp_lock, flags);
list_del_init(&req->rq_list);
spin_unlock_irqrestore(&imp->imp_lock, flags);
GOTO (interpret, req->rq_status);
}
-
+
if (req->rq_phase == RQ_PHASE_RPC) {
int do_restart = 0;
if (req->rq_waiting || req->rq_resend) {
spin_lock_irqsave(&imp->imp_lock, flags);
-
+
if (req->rq_level > imp->imp_level) {
spin_unlock_irqrestore(&imp->imp_lock,
flags);
continue;
}
-
+
list_del(&req->rq_list);
list_add_tail(&req->rq_list,
&imp->imp_sending_list);
spin_unlock_irqrestore(&imp->imp_lock,
flags);
ptlrpc_unregister_reply(req);
- if (req->rq_bulk)
+ if (req->rq_bulk)
ptlrpc_unregister_bulk(req);
}
-
+
rc = ptl_send_rpc(req);
if (rc) {
req->rq_status = rc;
req->rq_phase = RQ_PHASE_INTERPRET;
GOTO (interpret, req->rq_status);
}
-
+
}
-
+
/* Ensure the network callback returned */
spin_lock_irqsave (&req->rq_lock, flags);
if (!req->rq_replied) {
spin_unlock_irqrestore (&req->rq_lock, flags);
continue;
}
spin_unlock_irqrestore (&req->rq_lock, flags);
-
+
spin_lock_irqsave(&imp->imp_lock, flags);
list_del_init(&req->rq_list);
spin_unlock_irqrestore(&imp->imp_lock, flags);
req->rq_status = after_reply(req, &do_restart);
if (do_restart) {
+ spin_lock_irqsave (&req->rq_lock, flags);
req->rq_resend = 1; /* ugh */
+ spin_unlock_irqrestore (&req->rq_lock, flags);
continue;
}
-
- if (req->rq_bulk == NULL) {
+
+ /* If there is no bulk associated with this request, then we're
+ * done and should let the interpreter process the reply. Similarly,
+ * if the RPC returned an error, the bulk will never arrive, so we
+ * are done in that case too.
+ */
+ if (req->rq_bulk == NULL || req->rq_status != 0) {
req->rq_phase = RQ_PHASE_INTERPRET;
GOTO (interpret, req->rq_status);
}
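/* Illustrative sketch, not part of this patch: the phase progression
* this loop drives each request through, written as a hypothetical
* helper (next_phase() is not a real function).
*
* static int next_phase(struct ptlrpc_request *req)
* {
*         switch (req->rq_phase) {
*         case RQ_PHASE_NEW:  return RQ_PHASE_RPC;
*         case RQ_PHASE_RPC:  return (req->rq_bulk != NULL &&
*                                     req->rq_status == 0) ?
*                                    RQ_PHASE_BULK : RQ_PHASE_INTERPRET;
*         case RQ_PHASE_BULK: return RQ_PHASE_INTERPRET;
*         default:            return RQ_PHASE_COMPLETE;
*         }
* }
*/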
LASSERT (req->rq_phase == RQ_PHASE_BULK);
if (!ptlrpc_bulk_complete (req->rq_bulk))
continue;
-
+
req->rq_phase = RQ_PHASE_INTERPRET;
-
+
interpret:
LASSERT (req->rq_phase == RQ_PHASE_INTERPRET);
LASSERT (!req->rq_receiving_reply);
if (req->rq_bulk != NULL)
ptlrpc_unregister_bulk (req);
-
+
if (req->rq_interpret_reply != NULL) {
- int (*interpreter)(struct ptlrpc_request *, void *, int) =
+ int (*interpreter)(struct ptlrpc_request *,void *,int) =
req->rq_interpret_reply;
- req->rq_status = interpreter(req, &req->rq_async_args,
+ req->rq_status = interpreter(req, &req->rq_async_args,
req->rq_status);
}
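/* Illustrative sketch, not part of this patch: the shape of a caller's
* interpret callback. my_interpret and struct my_args are hypothetical
* names; the signature matches the pointer invoked above.
*
* static int my_interpret(struct ptlrpc_request *req, void *data, int rc)
* {
*         struct my_args *args = data;  // same storage as rq_async_args
*         if (rc != 0)
*                 return rc;            // RPC (or bulk) failed
*         // ... examine req->rq_repmsg and fill in caller state ...
*         return 0;
* }
*
* req->rq_interpret_reply = my_interpret;
*/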
ENTRY;
LASSERT (set != NULL);
- CERROR("EXPIRED SET %p\n", set);
/* A timeout expired; see which reqs it applies to... */
list_for_each (tmp, &set->set_requests) {
if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
(req->rq_phase == RQ_PHASE_BULK)))
continue;
-
+
if (req->rq_timedout || /* already dealt with */
req->rq_sent + req->rq_timeout > now) /* not expired */
continue;
if (req->rq_phase != RQ_PHASE_RPC)
continue;
-
+
spin_lock_irqsave (&req->rq_lock, flags);
req->rq_intr = 1;
spin_unlock_irqrestore (&req->rq_lock, flags);
int timeout;
ENTRY;
+ LASSERT(!list_empty(&set->set_requests));
list_for_each(tmp, &set->set_requests) {
req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
LASSERT (req->rq_level == LUSTRE_CONN_FULL);
LASSERT (req->rq_phase == RQ_PHASE_NEW);
req->rq_phase = RQ_PHASE_RPC;
-
+
imp = req->rq_import;
spin_lock_irqsave(&imp->imp_lock, flags);
if (req->rq_timedout) /* already timed out */
continue;
-
+
deadline = req->rq_sent + req->rq_timeout;
if (deadline <= now) /* actually expired already */
timeout = 1; /* ASAP */
* req times out */
CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
set, timeout);
- lwi = LWI_TIMEOUT_INTR(timeout * HZ,
+ lwi = LWI_TIMEOUT_INTR(timeout ? timeout * HZ : 1,
expired_set, interrupted_set, set);
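/* (A zero timeout would mean "no timeout" to l_wait_event(), so an
* already-expired deadline is clamped to 1 jiffy to fire ASAP.) */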
rc = l_wait_event(set->set_waitq, check_set(set), &lwi);
-
+
LASSERT (rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
/* -EINTR => all requests have been flagged rq_intr so next
if (req->rq_status != 0)
rc = req->rq_status;
}
-
+
if (set->set_interpret != NULL) {
- int (*interpreter)(struct ptlrpc_request_set *set, void *, int) =
+ int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
set->set_interpret;
rc = interpreter (set, &set->set_args, rc);
}
-
+
RETURN(rc);
}
}
LASSERT (!request->rq_receiving_reply);
-
+
/* We must take it off the imp_replay_list first. Otherwise, we'll set
* request->rq_reqmsg to NULL while osc_close is dereferencing it. */
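/* Illustrative sketch, not part of this patch: the ordering the comment
* above requires, with the import lock held over the list removal.
*
* spin_lock_irqsave(&request->rq_import->imp_lock, flags);
* list_del_init(&request->rq_list);   // off imp_replay_list first
* spin_unlock_irqrestore(&request->rq_import->imp_lock, flags);
* // only now is it safe to clear/free rq_reqmsg
*/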
if (request->rq_import != NULL) {
}
/* Disengage the client's reply buffer from the network
- * NB does _NOT_ unregister any client-side bulk.
+ * NB does _NOT_ unregister any client-side bulk.
* IDEMPOTENT, but _not_ safe against concurrent callers.
* The request owner (i.e. the thread doing the I/O) must call...
*/
LASSERT (!request->rq_replied); /* callback hasn't completed */
spin_unlock_irqrestore (&request->rq_lock, flags);
-
+
rc = PtlMDUnlink (request->rq_reply_md_h);
switch (rc) {
default:
request->rq_repmsg = NULL;
EXIT;
return;
-
+
case PTL_MD_INUSE: /* callback in progress */
for (;;) {
/* Network access will complete in finite time but
* the timeout lets us CERROR for visibility */
- struct l_wait_info lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
-
+ struct l_wait_info lwi = LWI_TIMEOUT(10*HZ, NULL, NULL);
+
rc = l_wait_event (request->rq_wait_for_rep,
request->rq_replied, &lwi);
LASSERT (rc == 0 || rc == -ETIMEDOUT);
if (rc == 0) {
spin_lock_irqsave (&request->rq_lock, flags);
- /* Ensure the callback has completed scheduling me
- * and taken its hands off the request */
- spin_unlock_irqrestore (&request->rq_lock, flags);
+ /* Ensure the callback has completed scheduling
+ * me and taken its hands off the request */
+ spin_unlock_irqrestore(&request->rq_lock,flags);
break;
}
-
+
CERROR ("Unexpectedly long timeout: req %p\n", request);
}
/* fall through */
void ptlrpc_resend_req(struct ptlrpc_request *req)
{
unsigned long flags;
-
+
DEBUG_REQ(D_HA, req, "resending");
req->rq_reqmsg->handle.cookie = 0;
ptlrpc_put_connection(req->rq_connection);
static void interrupted_request(void *data)
{
unsigned long flags;
-
+
struct ptlrpc_request *req = data;
DEBUG_REQ(D_HA, req, "request interrupted");
spin_lock_irqsave (&req->rq_lock, flags);
LASSERT (req->rq_set == NULL);
LASSERT (!req->rq_receiving_reply);
-
+
/* for distributed debugging */
req->rq_reqmsg->status = current->pid;
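/* (The pid travels in the request so client and server debug logs for
* the same RPC can be correlated.) */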
LASSERT(imp->imp_obd != NULL);
/* Mark phase here for a little debug help */
req->rq_phase = RQ_PHASE_RPC;
-
+
restart:
/*
* If the import has been invalidated (such as by an OST failure), the
spin_unlock_irqrestore(&imp->imp_lock, flags);
GOTO (out, rc);
}
-
+
CERROR("process %d resumed\n", current->pid);
}
}
DEBUG_REQ(D_ERROR, req, "send failed (%d); recovering", rc);
-
+
ptlrpc_fail_import(imp, req->rq_import_generation);
/* If we've been told to not wait, we're done. */
* (ensuring the reply callback has returned), sees that
* req->rq_receiving_reply is clear and returns. */
ptlrpc_unregister_reply (req);
-
+
if (req->rq_err)
GOTO(out, rc = -EIO);
if (req->rq_bulk != NULL)
ptlrpc_unregister_bulk (req);
-
+
DEBUG_REQ(D_HA, req, "resending: ");
goto restart;
}
if (req->rq_timedout) { /* non-recoverable timeout */
GOTO(out, rc = -ETIMEDOUT);
}
-
+
if (!req->rq_replied) {
/* How can this be? -eeb */
DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
out:
if (req->rq_bulk != NULL) {
if (rc >= 0) { /* success so far */
- lwi = LWI_TIMEOUT (timeout, NULL, NULL);
- brc = l_wait_event (req->rq_wait_for_rep,
- ptlrpc_bulk_complete (req->rq_bulk), &lwi);
+ lwi = LWI_TIMEOUT(timeout, NULL, NULL);
+ brc = l_wait_event(req->rq_wait_for_rep,
+ ptlrpc_bulk_complete(req->rq_bulk),
+ &lwi);
if (brc != 0) {
LASSERT (brc == -ETIMEDOUT);
CERROR ("Timed out waiting for bulk\n");
ptlrpc_unregister_bulk (req);
}
}
-
+
LASSERT (!req->rq_receiving_reply);
req->rq_phase = RQ_PHASE_INTERPRET;
RETURN (rc);
/* I don't touch rq_phase here, so the debug log can show what
* state it was left in */
-
+
/* Not handling automatic bulk replay yet (or ever?) */
LASSERT (req->rq_bulk == NULL);
-
+
DEBUG_REQ(D_NET, req, "about to replay");
/* Update request's state, since we might have a new connection. */