#include <linux/lustre_ha.h>
#include <linux/lustre_import.h>
+#include "ptlrpc_internal.h"
+
void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
                        struct ptlrpc_client *cl)
{
        cl->cli_request_portal = req_portal;
        cl->cli_reply_portal   = rep_portal;
        cl->cli_name           = name;
}
-void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,struct obd_uuid *uuid)
+void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
+ struct obd_uuid *uuid)
{
        struct ptlrpc_peer peer;
        int err;

        err = ptlrpc_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid->uuid);
                return;
        }

        memcpy(&conn->c_peer, &peer, sizeof(peer));
        return;
}
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
+static inline struct ptlrpc_bulk_desc *new_bulk(void)
{
struct ptlrpc_bulk_desc *desc;
OBD_ALLOC(desc, sizeof(*desc));
- if (desc != NULL) {
- desc->bd_connection = ptlrpc_connection_addref(conn);
- atomic_set(&desc->bd_refcount, 1);
- init_waitqueue_head(&desc->bd_waitq);
- INIT_LIST_HEAD(&desc->bd_page_list);
- INIT_LIST_HEAD(&desc->bd_set_chain);
- ptl_set_inv_handle(&desc->bd_md_h);
- ptl_set_inv_handle(&desc->bd_me_h);
- }
+ if (!desc)
+ return NULL;
+
+ spin_lock_init (&desc->bd_lock);
+ init_waitqueue_head(&desc->bd_waitq);
+ INIT_LIST_HEAD(&desc->bd_page_list);
+ desc->bd_md_h = PTL_HANDLE_NONE;
+ desc->bd_me_h = PTL_HANDLE_NONE;
return desc;
}
-int ptlrpc_bulk_error(struct ptlrpc_bulk_desc *desc)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
+ int type, int portal)
{
- int rc = 0;
- if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
- rc = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
- -ETIMEDOUT);
- }
- return rc;
+ struct obd_import *imp = req->rq_import;
+ unsigned long flags;
+ struct ptlrpc_bulk_desc *desc;
+
+ LASSERT (type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
+
+ desc = new_bulk();
+ if (desc == NULL)
+ RETURN(NULL);
+
+ /* Is this sampled at the right place? Do we want to get the import
+ * generation just before we send? Should it match the generation of
+ * the request? */
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ desc->bd_import_generation = imp->imp_generation;
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+ desc->bd_import = class_import_get(imp);
+ desc->bd_req = req;
+ desc->bd_type = type;
+ desc->bd_portal = portal;
+
+ /* req now owns desc, and frees it when the request itself is freed */
+ req->rq_bulk = desc;
+
+ return desc;
}
-struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
+ int type, int portal)
+{
+ struct obd_export *exp = req->rq_export;
+ struct ptlrpc_bulk_desc *desc;
+
+ LASSERT (type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
+
+ desc = new_bulk();
+ if (desc == NULL)
+ RETURN(NULL);
+
+ desc->bd_export = class_export_get(exp);
+ desc->bd_req = req;
+ desc->bd_type = type;
+ desc->bd_portal = portal;
+
+ /* NB we don't assign rq_bulk here; server-side requests are
+ * re-used, and the handler frees the bulk desc explicitly. */
+
+ return desc;
+}
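+
+/* Editor's note: "the handler frees the bulk desc explicitly" above means a
+ * server-side handler must call ptlrpc_free_bulk() on this desc itself;
+ * nothing links it to req->rq_bulk, so freeing the request won't free it. */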
+
+int ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
+ struct page *page, int pageoffset, int len)
{
struct ptlrpc_bulk_page *bulk;
OBD_ALLOC(bulk, sizeof(*bulk));
- if (bulk != NULL) {
- bulk->bp_desc = desc;
- list_add_tail(&bulk->bp_link, &desc->bd_page_list);
- desc->bd_page_count++;
- }
- return bulk;
+ if (bulk == NULL)
+ return (-ENOMEM);
+
+ LASSERT (page != NULL);
+ LASSERT (pageoffset >= 0);
+ LASSERT (len > 0);
+ LASSERT (pageoffset + len <= PAGE_SIZE);
+
+ bulk->bp_page = page;
+ bulk->bp_pageoffset = pageoffset;
+ bulk->bp_buflen = len;
+
+ bulk->bp_desc = desc;
+ list_add_tail(&bulk->bp_link, &desc->bd_page_list);
+ desc->bd_page_count++;
+ return 0;
}
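
/* Editor's note: a usage sketch of the client-side bulk API above, not part
 * of the patch.  OST_READ, OST_BULK_PORTAL and struct ost_body are
 * hypothetical stand-ins; only the ptlrpc_* calls are from this file. */
#if 0 /* illustration only */
static int example_prep_bulk_read(struct obd_import *imp,
                                  struct page **pages, int npages)
{
        int size = sizeof(struct ost_body);
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        int i, rc;

        req = ptlrpc_prep_req(imp, OST_READ, 1, &size, NULL);
        if (req == NULL)
                return -ENOMEM;

        /* client is the sink of a server-side bulk PUT */
        desc = ptlrpc_prep_bulk_imp(req, BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                return -ENOMEM;         /* caller still owns req */

        for (i = 0; i < npages; i++) {
                rc = ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);
                if (rc != 0)
                        return rc;      /* desc is freed along with req */
        }
        /* req owns desc now; ptlrpc_req_finished() frees both */
        return 0;
}
#endif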
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
struct list_head *tmp, *next;
ENTRY;
- if (desc == NULL) {
- EXIT;
- return;
- }
-
- LASSERT(list_empty(&desc->bd_set_chain));
-
- if (atomic_read(&desc->bd_refcount) != 0)
- CERROR("freeing desc %p with refcount %d!\n", desc,
- atomic_read(&desc->bd_refcount));
+ LASSERT (desc != NULL);
+ LASSERT (desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
+ LASSERT (!desc->bd_network_rw); /* network hands off */
+
list_for_each_safe(tmp, next, &desc->bd_page_list) {
struct ptlrpc_bulk_page *bulk;
bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
ptlrpc_free_bulk_page(bulk);
}
- ptlrpc_put_connection(desc->bd_connection);
+ LASSERT (desc->bd_page_count == 0);
+ LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
+
+ if (desc->bd_export)
+ class_export_put(desc->bd_export);
+ else
+ class_import_put(desc->bd_import);
OBD_FREE(desc, sizeof(*desc));
        EXIT;
}

void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
{
- ENTRY;
- if (bulk == NULL) {
- EXIT;
- return;
- }
-
+ LASSERT (bulk != NULL);
+
list_del(&bulk->bp_link);
bulk->bp_desc->bd_page_count--;
OBD_FREE(bulk, sizeof(*bulk));
- EXIT;
}
-static int ll_sync_brw_timeout(void *data)
+struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
+ int count, int *lengths, char **bufs)
{
- struct obd_brw_set *set = data;
- struct list_head *tmp;
- int failed = 0;
+ struct ptlrpc_request *request;
+ int rc;
ENTRY;
- LASSERT(set);
+ LASSERT((unsigned long)imp > 0x1000);
- set->brw_flags |= PTL_RPC_FL_TIMEOUT;
+ OBD_ALLOC(request, sizeof(*request));
+ if (!request) {
+ CERROR("request allocation out of memory\n");
+ RETURN(NULL);
+ }
- list_for_each(tmp, &set->brw_desc_head) {
- struct ptlrpc_bulk_desc *desc =
- list_entry(tmp, struct ptlrpc_bulk_desc, bd_set_chain);
+ rc = lustre_pack_msg(count, lengths, bufs,
+ &request->rq_reqlen, &request->rq_reqmsg);
+ if (rc) {
+ CERROR("cannot pack request %d\n", rc);
+ OBD_FREE(request, sizeof(*request));
+ RETURN(NULL);
+ }
- /* Skip descriptors that were completed successfully. */
- if (desc->bd_flags & (PTL_BULK_FL_RCVD | PTL_BULK_FL_SENT))
- continue;
+ request->rq_timeout = obd_timeout;
+ request->rq_level = LUSTRE_CONN_FULL;
+ request->rq_type = PTL_RPC_MSG_REQUEST;
+ request->rq_import = class_import_get(imp);
+ request->rq_phase = RQ_PHASE_NEW;
+
+ /* XXX FIXME bug 249 */
+ request->rq_request_portal = imp->imp_client->cli_request_portal;
+ request->rq_reply_portal = imp->imp_client->cli_reply_portal;
- LASSERT(desc->bd_connection);
-
- /* If PtlMDUnlink succeeds, then bulk I/O on the MD hasn't
- * even started yet. XXX where do we kunmup the thing?
- *
- * If it fail with PTL_MD_BUSY, then the network is still
- * reading/writing the buffers and we must wait for it to
- * complete (which it will within finite time, most
- * probably with failure; we really need portals error
- * events to detect that).
- *
- * Otherwise (PTL_INV_MD) it completed after the bd_flags
- * test above!
- */
- if (PtlMDUnlink(desc->bd_md_h) != PTL_OK) {
- CERROR("Near-miss on OST %s -- need to adjust "
- "obd_timeout?\n",
- desc->bd_connection->c_remote_uuid.uuid);
- continue;
- }
+ request->rq_connection = ptlrpc_connection_addref(imp->imp_connection);
- CERROR("IO of %d pages to/from %s:%d (conn %p) timed out\n",
- desc->bd_page_count,
- desc->bd_connection->c_remote_uuid.uuid,
- desc->bd_portal, desc->bd_connection);
+ spin_lock_init (&request->rq_lock);
+ INIT_LIST_HEAD(&request->rq_list);
+ init_waitqueue_head(&request->rq_wait_for_rep);
+ request->rq_xid = ptlrpc_next_xid();
+ atomic_set(&request->rq_refcount, 1);
- /* This one will "never" arrive, don't wait for it. */
- if (atomic_dec_and_test(&set->brw_refcount))
- wake_up(&set->brw_waitq);
+ request->rq_reqmsg->opc = opcode;
+ request->rq_reqmsg->flags = 0;
- if (class_signal_connection_failure)
- class_signal_connection_failure(desc->bd_connection);
- else
- failed = 1;
+ RETURN(request);
+}
+
+struct ptlrpc_request_set *ptlrpc_prep_set(void)
+{
+ struct ptlrpc_request_set *set;
+
+ OBD_ALLOC(set, sizeof *set);
+ if (!set)
+ RETURN(NULL);
+ INIT_LIST_HEAD(&set->set_requests);
+ init_waitqueue_head(&set->set_waitq);
+ set->set_remaining = 0;
+
+ RETURN(set);
+}
+
+/* Finish with this set; opposite of prep_set. */
+void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
+{
+ struct list_head *tmp;
+ struct list_head *next;
+ int expected_phase;
+ int n = 0;
+ ENTRY;
+
+ /* Requests on the set should either all be completed, or all be new */
+ expected_phase = (set->set_remaining == 0) ?
+ RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
+ list_for_each (tmp, &set->set_requests) {
+ struct ptlrpc_request *req =
+ list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+ LASSERT (req->rq_phase == expected_phase);
+ n++;
+ }
+
+ LASSERT (set->set_remaining == 0 || set->set_remaining == n);
+
+ list_for_each_safe(tmp, next, &set->set_requests) {
+ struct ptlrpc_request *req =
+ list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+ list_del_init(&req->rq_set_chain);
+
+ LASSERT (req->rq_phase == expected_phase);
+
+ if (req->rq_phase == RQ_PHASE_NEW) {
+
+ if (req->rq_interpret_reply != NULL) {
+ int (*interpreter)(struct ptlrpc_request *, void *, int) =
+ req->rq_interpret_reply;
+
+ /* higher level (i.e. LOV) failed;
+ * let the sub reqs clean up */
+ req->rq_status = -EBADR;
+ interpreter(req, &req->rq_async_args, req->rq_status);
+ }
+ set->set_remaining--;
+ }
+
+ req->rq_set = NULL;
+ ptlrpc_req_finished (req);
}
- /* 0 = We go back to sleep, until we're resumed or interrupted */
- /* 1 = We can't be recovered, just abort the syscall with -ETIMEDOUT */
- RETURN(failed);
+ LASSERT(set->set_remaining == 0);
+
+ OBD_FREE(set, sizeof(*set));
+ EXIT;
}
-static int ll_sync_brw_intr(void *data)
+void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
+ struct ptlrpc_request *req)
{
- struct obd_brw_set *set = data;
+ /* The set takes over the caller's request reference */
+ list_add_tail(&req->rq_set_chain, &set->set_requests);
+ req->rq_set = set;
+ set->set_remaining++;
+}
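+
+/* Editor's note: a sketch (not part of the patch) of how the set calls
+ * above combine; SOME_OPCODE and the buffer length are hypothetical. */
+#if 0 /* illustration only */
+static int example_set_usage(struct obd_import *imp)
+{
+        int size = 1024;
+        struct ptlrpc_request_set *set;
+        struct ptlrpc_request *req;
+        int rc;
+
+        set = ptlrpc_prep_set();
+        if (set == NULL)
+                return -ENOMEM;
+
+        req = ptlrpc_prep_req(imp, SOME_OPCODE, 1, &size, NULL);
+        if (req != NULL)
+                ptlrpc_set_add_req(set, req);   /* set owns req's ref now */
+
+        rc = ptlrpc_set_wait(set);      /* send everything, wait for all */
+        ptlrpc_set_destroy(set);        /* drops each request's reference */
+        return rc;
+}
+#endif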
+static int ptlrpc_check_reply(struct ptlrpc_request *req)
+{
+ unsigned long flags;
+ int rc = 0;
ENTRY;
- set->brw_flags |= PTL_RPC_FL_INTR;
- RETURN(1); /* ignored, as of this writing */
+
+ /* serialise with network callback */
+ spin_lock_irqsave (&req->rq_lock, flags);
+
+ if (req->rq_replied) {
+ DEBUG_REQ(D_NET, req, "REPLIED:");
+ GOTO(out, rc = 1);
+ }
+
+ if (req->rq_err) {
+ DEBUG_REQ(D_ERROR, req, "ABORTED:");
+ GOTO(out, rc = 1);
+ }
+
+ if (req->rq_resend) {
+ DEBUG_REQ(D_ERROR, req, "RESEND:");
+ GOTO(out, rc = 1);
+ }
+
+ if (req->rq_restart) {
+ DEBUG_REQ(D_ERROR, req, "RESTART:");
+ GOTO(out, rc = 1);
+ }
+ EXIT;
+ out:
+ spin_unlock_irqrestore (&req->rq_lock, flags);
+ DEBUG_REQ(D_NET, req, "rc = %d for", rc);
+ return rc;
}
-int ll_brw_sync_wait(struct obd_brw_set *set, int phase)
+static int ptlrpc_check_status(struct ptlrpc_request *req)
{
- struct l_wait_info lwi;
- struct list_head *tmp, *next;
- int rc = 0;
+ int err;
+ ENTRY;
+
+ err = req->rq_repmsg->status;
+ if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
+ DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR (%d)", err);
+ if (err >= 0)
+ CERROR("Error Reply has >= zero status\n");
+ RETURN(err < 0 ? err : -EINVAL);
+ }
+
+ if (err < 0) {
+ DEBUG_REQ(D_INFO, req, "status is %d", err);
+ } else if (err > 0) {
+ /* XXX: translate this error from net to host */
+ DEBUG_REQ(D_INFO, req, "status is %d", err);
+ }
+
+ RETURN(err);
+}
+
+#warning this needs to change after robert fixes eviction handling
+static int
+after_reply(struct ptlrpc_request *req, int *restartp)
+{
+ unsigned long flags;
+ struct obd_import *imp = req->rq_import;
+ int rc;
ENTRY;
- obd_brw_set_addref(set);
- switch(phase) {
- case CB_PHASE_START:
- lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ll_sync_brw_timeout,
- ll_sync_brw_intr, set);
- rc = l_wait_event(set->brw_waitq,
- atomic_read(&set->brw_desc_count) == 0, &lwi);
-
- list_for_each_safe(tmp, next, &set->brw_desc_head) {
- struct ptlrpc_bulk_desc *desc =
- list_entry(tmp, struct ptlrpc_bulk_desc,
- bd_set_chain);
- list_del_init(&desc->bd_set_chain);
- ptlrpc_bulk_decref(desc);
+ LASSERT (!req->rq_receiving_reply);
+ LASSERT (req->rq_replied);
+
+ if (restartp != NULL)
+ *restartp = 0;
+
+ /* NB Until this point, the whole of the incoming message,
+ * including buflens, status etc is in the sender's byte order. */
+
+#if SWAB_PARANOIA
+ /* Clear reply swab mask; this is a new reply in sender's byte order */
+ req->rq_rep_swab_mask = 0;
+#endif
+ rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
+ if (rc) {
+ CERROR("unpack_rep failed: %d\n", rc);
+ RETURN (-EPROTO);
+ }
+
+ if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
+ req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
+ CERROR("invalid packet type received (type=%u)\n",
+ req->rq_repmsg->type);
+ RETURN (-EPROTO);
+ }
+
+ /* Store transno in reqmsg for replay. */
+ req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
+
+ rc = ptlrpc_check_status(req);
+
+ /* Either we've been evicted, or the server has failed for
+ * some reason. Try to reconnect, and if that fails, punt to
+ * upcall */
+ if (rc == -ENOTCONN) {
+ if (req->rq_level < LUSTRE_CONN_FULL || req->rq_no_recov ||
+ imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
+ RETURN(-ENOTCONN);
}
- break;
- case CB_PHASE_FINISH:
- if (atomic_dec_and_test(&set->brw_desc_count))
- wake_up(&set->brw_waitq);
- break;
- default:
+
+ rc = ptlrpc_request_handle_eviction(req);
+ if (rc)
+ CERROR("can't reconnect to %s@%s: %d\n",
+ imp->imp_target_uuid.uuid,
+ imp->imp_connection->c_remote_uuid.uuid, rc);
+ else
+ ptlrpc_wake_delayed(imp);
+
+ if (req->rq_err)
+ RETURN(-EIO);
+
+ if (req->rq_resend) {
+ if (restartp == NULL)
+ LBUG(); /* async resend not supported yet */
+ spin_lock_irqsave (&req->rq_lock, flags);
+ req->rq_resend = 0;
+ spin_unlock_irqrestore (&req->rq_lock, flags);
+ *restartp = 1;
+ lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
+ DEBUG_REQ(D_HA, req, "resending: ");
+ RETURN (0);
+ }
+
+ CERROR("request should be err or resend: %p\n", req);
LBUG();
}
- obd_brw_set_decref(set);
+ if (req->rq_import->imp_replayable) {
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ if ((req->rq_replay || req->rq_transno != 0) && rc >= 0)
+ ptlrpc_retain_replayable_request(req, imp);
+
+ if (req->rq_transno > imp->imp_max_transno)
+ imp->imp_max_transno = req->rq_transno;
+
+ /* Replay-enabled imports return commit-status information. */
+ if (req->rq_repmsg->last_committed) {
+ if (req->rq_repmsg->last_committed <
+ imp->imp_peer_committed_transno) {
+ CERROR("%s went back in time (transno "LPD64
+ " was committed, server claims "LPD64
+ ")! is shared storage not coherent?\n",
+ imp->imp_target_uuid.uuid,
+ imp->imp_peer_committed_transno,
+ req->rq_repmsg->last_committed);
+ }
+ imp->imp_peer_committed_transno =
+ req->rq_repmsg->last_committed;
+ }
+ ptlrpc_free_committed(imp);
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+ }
+
RETURN(rc);
}
-struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
- int count, int *lengths, char **bufs)
+static int check_set(struct ptlrpc_request_set *set)
{
- struct ptlrpc_connection *conn;
- struct ptlrpc_request *request;
- int rc;
+ unsigned long flags;
+ struct list_head *tmp;
+ ENTRY;
+
+ if (set->set_remaining == 0)
+ RETURN(1);
+
+ list_for_each(tmp, &set->set_requests) {
+ struct ptlrpc_request *req =
+ list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+ struct obd_import *imp = req->rq_import;
+ int rc = 0;
+
+ LASSERT (req->rq_phase == RQ_PHASE_RPC ||
+ req->rq_phase == RQ_PHASE_BULK ||
+ req->rq_phase == RQ_PHASE_COMPLETE);
+
+ if (req->rq_phase == RQ_PHASE_COMPLETE)
+ continue;
+
+ if (req->rq_err) {
+ ptlrpc_unregister_reply(req);
+ if (req->rq_status == 0)
+ req->rq_status = -EIO;
+ req->rq_phase = RQ_PHASE_INTERPRET;
+
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ list_del_init(&req->rq_list);
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+ GOTO (interpret, req->rq_status);
+ }
+
+ if (req->rq_intr) {
+ /* NB could be on delayed list */
+ ptlrpc_unregister_reply(req);
+ req->rq_status = -EINTR;
+ req->rq_phase = RQ_PHASE_INTERPRET;
+
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ list_del_init(&req->rq_list);
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+ GOTO (interpret, req->rq_status);
+ }
+
+ if (req->rq_phase == RQ_PHASE_RPC) {
+ int do_restart = 0;
+ if (req->rq_waiting || req->rq_resend) {
+ spin_lock_irqsave(&imp->imp_lock, flags);
+
+ if (req->rq_level > imp->imp_level) {
+ spin_unlock_irqrestore(&imp->imp_lock,
+ flags);
+ continue;
+ }
+
+ list_del(&req->rq_list);
+ list_add_tail(&req->rq_list,
+ &imp->imp_sending_list);
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+ req->rq_waiting = 0;
+ if (req->rq_resend) {
+ lustre_msg_add_flags(req->rq_reqmsg,
+ MSG_RESENT);
+ spin_lock_irqsave(&req->rq_lock, flags);
+ req->rq_resend = 0;
+ spin_unlock_irqrestore(&req->rq_lock,
+ flags);
+ ptlrpc_unregister_reply(req);
+ if (req->rq_bulk)
+ ptlrpc_unregister_bulk(req);
+ }
+
+ rc = ptl_send_rpc(req);
+ if (rc) {
+ req->rq_status = rc;
+ req->rq_phase = RQ_PHASE_INTERPRET;
+ GOTO (interpret, req->rq_status);
+ }
+
+ }
+
+ /* Ensure the network callback returned */
+ spin_lock_irqsave (&req->rq_lock, flags);
+ if (!req->rq_replied) {
+ spin_unlock_irqrestore (&req->rq_lock, flags);
+ continue;
+ }
+ spin_unlock_irqrestore (&req->rq_lock, flags);
+
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ list_del_init(&req->rq_list);
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+ req->rq_status = after_reply(req, &do_restart);
+ if (do_restart) {
+ req->rq_resend = 1; /* ugh */
+ continue;
+ }
+
+ if (req->rq_bulk == NULL) {
+ req->rq_phase = RQ_PHASE_INTERPRET;
+ GOTO (interpret, req->rq_status);
+ }
+
+ req->rq_phase = RQ_PHASE_BULK;
+ }
+
+ LASSERT (req->rq_phase == RQ_PHASE_BULK);
+ if (!ptlrpc_bulk_complete (req->rq_bulk))
+ continue;
+
+ req->rq_phase = RQ_PHASE_INTERPRET;
+
+ interpret:
+ LASSERT (req->rq_phase == RQ_PHASE_INTERPRET);
+ LASSERT (!req->rq_receiving_reply);
+
+ if (req->rq_bulk != NULL)
+ ptlrpc_unregister_bulk (req);
+
+ if (req->rq_interpret_reply != NULL) {
+ int (*interpreter)(struct ptlrpc_request *, void *, int) =
+ req->rq_interpret_reply;
+ req->rq_status = interpreter(req, &req->rq_async_args,
+ req->rq_status);
+ }
+
+ CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
+ "opc %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+ imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
+ req->rq_xid,
+ imp->imp_connection->c_peer.peer_ni->pni_name,
+ imp->imp_connection->c_peer.peer_nid,
+ req->rq_reqmsg->opc);
+
+ req->rq_phase = RQ_PHASE_COMPLETE;
+ set->set_remaining--;
+ }
+
+ RETURN (set->set_remaining == 0);
+}
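+
+/* Editor's note: check_set() above invokes req->rq_interpret_reply with
+ * exactly this signature; a hedged sketch of such a hook (the name is
+ * hypothetical). */
+#if 0 /* illustration only */
+static int example_interpret(struct ptlrpc_request *req, void *data, int rc)
+{
+        /* data is &req->rq_async_args; rc is req->rq_status */
+        if (rc != 0)
+                CERROR("async request failed: %d\n", rc);
+        return rc;      /* stored back as the request's final rq_status */
+}
+/* at prep time: req->rq_interpret_reply = example_interpret; */
+#endif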
+
+static int expire_one_request(struct ptlrpc_request *req)
+{
+ unsigned long flags;
+ struct obd_import *imp = req->rq_import;
+ ENTRY;
+
+ DEBUG_REQ(D_ERROR, req, "timeout");
+
+ spin_lock_irqsave (&req->rq_lock, flags);
+ req->rq_timedout = 1;
+ spin_unlock_irqrestore (&req->rq_lock, flags);
+
+ ptlrpc_unregister_reply (req);
+
+ if (imp == NULL) {
+ DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
+ RETURN(1);
+ }
+
+ /* The DLM server doesn't want recovery run on its imports. */
+ if (imp->imp_dlm_fake)
+ RETURN(1);
+
+ /* If this request is for recovery or other primordial tasks,
+ * don't go back to sleep, and don't start recovery again. */
+ if (req->rq_level < LUSTRE_CONN_FULL || req->rq_no_recov ||
+ imp->imp_obd->obd_no_recov)
+ RETURN(1);
+
+ ptlrpc_fail_import(imp, req->rq_import_generation);
+
+ RETURN(0);
+}
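+
+/* Editor's note: the return value follows the l_wait_event timeout-callback
+ * convention spelled out in the code this patch removes: 1 means "stop
+ * waiting, nothing will rescue this request", 0 means recovery was kicked
+ * off and the caller should go back to sleep. */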
+
+static int expired_set(void *data)
+{
+ struct ptlrpc_request_set *set = data;
+ struct list_head *tmp;
+ time_t now = LTIME_S (CURRENT_TIME);
ENTRY;
- LASSERT((unsigned long)imp > 0x1000);
- conn = imp->imp_connection;
+ LASSERT (set != NULL);
+ CERROR("EXPIRED SET %p\n", set);
+
+ /* A timeout expired; see which reqs it applies to... */
+ list_for_each (tmp, &set->set_requests) {
+ struct ptlrpc_request *req =
+ list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+ /* request in-flight? */
+ if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
+ (req->rq_phase == RQ_PHASE_BULK)))
+ continue;
+
+ if (req->rq_timedout || /* already dealt with */
+ req->rq_sent + req->rq_timeout > now) /* not expired */
+ continue;
+
+ /* deal with this guy */
+ expire_one_request (req);
+ }
+
+ /* When waiting for a whole set, we always want to break out of the
+ * sleep so we can recalculate the timeout, or enable interrupts
+ * iff everyone's timed out.
+ */
+ RETURN(1);
+}
+
+static void interrupted_set(void *data)
+{
+ struct ptlrpc_request_set *set = data;
+ struct list_head *tmp;
+ unsigned long flags;
+
+ LASSERT (set != NULL);
+ CERROR("INTERRUPTED SET %p\n", set);
+
+ list_for_each(tmp, &set->set_requests) {
+ struct ptlrpc_request *req =
+ list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+ if (req->rq_phase != RQ_PHASE_RPC)
+ continue;
+
+ spin_lock_irqsave (&req->rq_lock, flags);
+ req->rq_intr = 1;
+ spin_unlock_irqrestore (&req->rq_lock, flags);
+ }
+}
+
+int ptlrpc_set_wait(struct ptlrpc_request_set *set)
+{
+ struct list_head *tmp;
+ struct obd_import *imp;
+ struct ptlrpc_request *req;
+ struct l_wait_info lwi;
+ unsigned long flags;
+ int rc;
+ time_t now;
+ time_t deadline;
+ int timeout;
+ ENTRY;
+
+ list_for_each(tmp, &set->set_requests) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+ LASSERT (req->rq_level == LUSTRE_CONN_FULL);
+ LASSERT (req->rq_phase == RQ_PHASE_NEW);
+ req->rq_phase = RQ_PHASE_RPC;
+
+ imp = req->rq_import;
+ spin_lock_irqsave(&imp->imp_lock, flags);
+
+ if (imp->imp_invalid) {
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+ req->rq_status = -EIO;
+ req->rq_phase = RQ_PHASE_INTERPRET;
+ continue;
+ }
- OBD_ALLOC(request, sizeof(*request));
- if (!request) {
- CERROR("request allocation out of memory\n");
- RETURN(NULL);
- }
+ if (req->rq_level > imp->imp_level) {
+ if (req->rq_no_recov || imp->imp_obd->obd_no_recov ||
+ imp->imp_dlm_fake) {
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+ req->rq_status = -EWOULDBLOCK;
+ req->rq_phase = RQ_PHASE_INTERPRET;
+ continue;
+ }
+
+ spin_lock (&req->rq_lock);
+ req->rq_waiting = 1;
+ spin_unlock (&req->rq_lock);
+ LASSERT (list_empty (&req->rq_list));
+ // list_del(&req->rq_list);
+ list_add_tail(&req->rq_list, &imp->imp_delayed_list);
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+ continue;
+ }
- rc = lustre_pack_msg(count, lengths, bufs,
- &request->rq_reqlen, &request->rq_reqmsg);
- if (rc) {
- CERROR("cannot pack request %d\n", rc);
- OBD_FREE(request, sizeof(*request));
- RETURN(NULL);
- }
+ /* XXX this is the same as ptlrpc_queue_wait */
+ LASSERT(list_empty(&req->rq_list));
+ list_add_tail(&req->rq_list, &imp->imp_sending_list);
+ req->rq_import_generation = imp->imp_generation;
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
- request->rq_timeout = obd_timeout;
- request->rq_level = LUSTRE_CONN_FULL;
- request->rq_type = PTL_RPC_MSG_REQUEST;
- request->rq_import = imp;
+ CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
+ " %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+ imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
+ req->rq_xid,
+ imp->imp_connection->c_peer.peer_ni->pni_name,
+ imp->imp_connection->c_peer.peer_nid,
+ req->rq_reqmsg->opc);
- /* XXX FIXME bug 625069, now 249 */
- request->rq_request_portal = imp->imp_client->cli_request_portal;
- request->rq_reply_portal = imp->imp_client->cli_reply_portal;
+ rc = ptl_send_rpc(req);
+ if (rc) {
+ req->rq_status = rc;
+ req->rq_phase = RQ_PHASE_INTERPRET;
+ }
+ }
- request->rq_connection = ptlrpc_connection_addref(conn);
+ do {
+ now = LTIME_S (CURRENT_TIME);
+ timeout = 0;
+ list_for_each (tmp, &set->set_requests) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
- INIT_LIST_HEAD(&request->rq_list);
- atomic_set(&request->rq_refcount, 1);
+ /* request in-flight? */
+ if (!((req->rq_phase == RQ_PHASE_RPC &&
+ !req->rq_waiting) ||
+ (req->rq_phase == RQ_PHASE_BULK)))
+ continue;
- request->rq_reqmsg->magic = PTLRPC_MSG_MAGIC;
- request->rq_reqmsg->version = PTLRPC_MSG_VERSION;
- request->rq_reqmsg->opc = HTON__u32(opcode);
- request->rq_reqmsg->flags = 0;
+ if (req->rq_timedout) /* already timed out */
+ continue;
+
+ deadline = req->rq_sent + req->rq_timeout;
+ if (deadline <= now) /* actually expired already */
+ timeout = 1; /* ASAP */
+ else if (timeout == 0 || timeout > deadline - now)
+ timeout = deadline - now;
+ }
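+
+        /* Editor's note, a worked example of the scan above: with now=100
+         * and two in-flight reqs sent at 70 and 95, each with
+         * rq_timeout=40, the deadlines are 110 and 135, so timeout =
+         * min(10, 35) = 10: sleep just long enough to expire the
+         * earliest request. */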
- ptlrpc_hdl2req(request, &imp->imp_handle);
- RETURN(request);
+ /* wait until all complete, interrupted, or an in-flight
+ * req times out */
+ CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
+ set, timeout);
+ lwi = LWI_TIMEOUT_INTR(timeout * HZ,
+ expired_set, interrupted_set, set);
+ rc = l_wait_event(set->set_waitq, check_set(set), &lwi);
+
+ LASSERT (rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
+
+ /* -EINTR => all requests have been flagged rq_intr so next
+ * check completes.
+ * -ETIMEDOUT => someone timed out. When all reqs have
+ * timed out, signals are enabled allowing completion with
+ * EINTR.
+ * I don't really care if we go once more round the loop in
+ * the error cases -eeb. */
+ } while (rc != 0);
+
+ LASSERT (set->set_remaining == 0);
+
+ rc = 0;
+ list_for_each(tmp, &set->set_requests) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+ LASSERT (req->rq_phase == RQ_PHASE_COMPLETE);
+ if (req->rq_status != 0)
+ rc = req->rq_status;
+ }
+
+ if (set->set_interpret != NULL) {
+ int (*interpreter)(struct ptlrpc_request_set *set, void *, int) =
+ set->set_interpret;
+ rc = interpreter (set, &set->set_args, rc);
+ }
+
+ RETURN(rc);
}
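
/* Editor's note: ptlrpc_set_wait() optionally runs a set-level hook with
 * the signature shown above; a hedged sketch (the name is hypothetical). */
#if 0 /* illustration only */
static int example_set_interpret(struct ptlrpc_request_set *set,
                                 void *arg, int rc)
{
        /* arg is &set->set_args; rc is nonzero if any request failed */
        return rc;
}
/* before ptlrpc_set_wait(): set->set_interpret = example_set_interpret; */
#endif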
static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
{
        ENTRY;

        if (request == NULL) {
                EXIT;
                return;
        }
+ LASSERT (!request->rq_receiving_reply);
+
/* We must take it off the imp_replay_list first. Otherwise, we'll set
* request->rq_reqmsg to NULL while osc_close is dereferencing it. */
- if (request->rq_import) {
+ if (request->rq_import != NULL) {
unsigned long flags = 0;
if (!locked)
spin_lock_irqsave(&request->rq_import->imp_lock, flags);
}
if (atomic_read(&request->rq_refcount) != 0) {
- CERROR("freeing request %p (%d->%s:%d) with refcount %d\n",
- request, request->rq_reqmsg->opc,
- request->rq_connection->c_remote_uuid.uuid,
- request->rq_import->imp_client->cli_request_portal,
- atomic_read (&request->rq_refcount));
+ DEBUG_REQ(D_ERROR, request,
+ "freeing request with nonzero refcount");
LBUG();
}
if (request->rq_repmsg != NULL) {
OBD_FREE(request->rq_repmsg, request->rq_replen);
request->rq_repmsg = NULL;
- request->rq_reply_md.start = NULL;
}
if (request->rq_reqmsg != NULL) {
OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
request->rq_reqmsg = NULL;
}
+ if (request->rq_export != NULL) {
+ class_export_put(request->rq_export);
+ request->rq_export = NULL;
+ }
+ if (request->rq_import != NULL) {
+ class_import_put(request->rq_import);
+ request->rq_import = NULL;
+ }
+ if (request->rq_bulk != NULL)
+ ptlrpc_free_bulk(request->rq_bulk);
ptlrpc_put_connection(request->rq_connection);
OBD_FREE(request, sizeof(*request));
        EXIT;
}

void ptlrpc_req_finished(struct ptlrpc_request *request)
{
        __ptlrpc_req_finished(request, 0);
}
-static int ptlrpc_check_reply(struct ptlrpc_request *req)
+static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
{
- int rc = 0;
-
- ENTRY;
- if (req->rq_repmsg != NULL) {
- req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
- /* Store transno in reqmsg for replay. */
- req->rq_reqmsg->transno = req->rq_repmsg->transno;
- req->rq_flags |= PTL_RPC_FL_REPLIED;
- GOTO(out, rc = 1);
- }
-
- if (req->rq_flags & PTL_RPC_FL_RESEND) {
- DEBUG_REQ(D_ERROR, req, "RESEND:");
- GOTO(out, rc = 1);
- }
-
- if (req->rq_flags & PTL_RPC_FL_ERR) {
- ENTRY;
- DEBUG_REQ(D_ERROR, req, "ABORTED:");
- GOTO(out, rc = 1);
- }
-
- if (req->rq_flags & PTL_RPC_FL_RESTART) {
- DEBUG_REQ(D_ERROR, req, "RESTART:");
- GOTO(out, rc = 1);
- }
- EXIT;
- out:
- DEBUG_REQ(D_NET, req, "rc = %d for", rc);
- return rc;
+ OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
+ request->rq_reqmsg = NULL;
+ request->rq_reqlen = 0;
}
-static int ptlrpc_check_status(struct ptlrpc_request *req)
+/* Disengage the client's reply buffer from the network
+ * NB does _NOT_ unregister any client-side bulk.
+ * IDEMPOTENT, but _not_ safe against concurrent callers.
+ * The request owner (i.e. the thread doing the I/O) must call...
+ */
+void ptlrpc_unregister_reply (struct ptlrpc_request *request)
{
- int err;
+ unsigned long flags;
+ int rc;
ENTRY;
- err = req->rq_repmsg->status;
- if (req->rq_repmsg->type == NTOH__u32(PTL_RPC_MSG_ERR)) {
- DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR (%d)", err);
- RETURN(err ? err : -EINVAL);
- }
+ LASSERT (!in_interrupt ()); /* might sleep */
- if (err < 0) {
- DEBUG_REQ(D_INFO, req, "status is %d", err);
- } else if (err > 0) {
- /* XXX: translate this error from net to host */
- DEBUG_REQ(D_INFO, req, "status is %d", err);
+ spin_lock_irqsave (&request->rq_lock, flags);
+ if (!request->rq_receiving_reply) { /* not waiting for a reply */
+ spin_unlock_irqrestore (&request->rq_lock, flags);
+ EXIT;
+ /* NB reply buffer not freed here */
+ return;
}
- RETURN(err);
-}
-
-static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
-{
- OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
- request->rq_reqmsg = NULL;
- request->rq_reqlen = 0;
-}
-
-/* Abort this request and cleanup any resources associated with it. */
-int ptlrpc_abort(struct ptlrpc_request *request)
-{
- /* First remove the ME for the reply; in theory, this means
- * that we can tear down the buffer safely. */
- if (PtlMEUnlink(request->rq_reply_me_h) != PTL_OK)
- RETURN(0);
- OBD_FREE(request->rq_reply_md.start, request->rq_replen);
+ LASSERT (!request->rq_replied); /* callback hasn't completed */
+ spin_unlock_irqrestore (&request->rq_lock, flags);
+
+ rc = PtlMDUnlink (request->rq_reply_md_h);
+ switch (rc) {
+ default:
+ LBUG ();
+
+ case PTL_OK: /* unlinked before completion */
+ LASSERT (request->rq_receiving_reply);
+ LASSERT (!request->rq_replied);
+ spin_lock_irqsave (&request->rq_lock, flags);
+ request->rq_receiving_reply = 0;
+ spin_unlock_irqrestore (&request->rq_lock, flags);
+ OBD_FREE(request->rq_repmsg, request->rq_replen);
+ request->rq_repmsg = NULL;
+ EXIT;
+ return;
+
+ case PTL_MD_INUSE: /* callback in progress */
+ for (;;) {
+ /* Network access will complete in finite time but
+ * the timeout lets us CERROR for visibility */
+ struct l_wait_info lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
+
+ rc = l_wait_event (request->rq_wait_for_rep,
+ request->rq_replied, &lwi);
+ LASSERT (rc == 0 || rc == -ETIMEDOUT);
+ if (rc == 0) {
+ spin_lock_irqsave (&request->rq_lock, flags);
+ /* Ensure the callback has completed scheduling me
+ * and taken its hands off the request */
+ spin_unlock_irqrestore (&request->rq_lock, flags);
+ break;
+ }
+
+ CERROR ("Unexpectedly long timeout: req %p\n", request);
+ }
+ /* fall through */
- memset(&request->rq_reply_me_h, 0, sizeof(request->rq_reply_me_h));
- request->rq_reply_md.start = NULL;
- request->rq_repmsg = NULL;
- return 0;
+ case PTL_INV_MD: /* callback completed */
+ LASSERT (!request->rq_receiving_reply);
+ LASSERT (request->rq_replied);
+ EXIT;
+ return;
+ }
+ /* Not Reached */
}
/* caller must hold imp->imp_lock */
void ptlrpc_free_committed(struct obd_import *imp)
{
struct list_head *tmp, *saved;
struct ptlrpc_request *req;
+ struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
ENTRY;
LASSERT(imp != NULL);
list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- if (req->rq_flags & PTL_RPC_FL_REPLAY) {
+ /* XXX ok to remove when 1357 resolved - rread 05/29/03 */
+ LASSERT (req != last_req);
+ last_req = req;
+
+ if (req->rq_replay) {
DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
continue;
}
void ptlrpc_cleanup_client(struct obd_import *imp)
{
- struct list_head *tmp, *saved;
- struct ptlrpc_request *req;
- struct ptlrpc_connection *conn = imp->imp_connection;
- unsigned long flags;
ENTRY;
-
- LASSERT(conn);
-
- spin_lock_irqsave(&imp->imp_lock, flags);
- list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
-
- /* XXX we should make sure that nobody's sleeping on these! */
- DEBUG_REQ(D_HA, req, "cleaning up from sending list");
- list_del_init(&req->rq_list);
- req->rq_import = NULL;
- __ptlrpc_req_finished(req, 0);
- }
- spin_unlock_irqrestore(&imp->imp_lock, flags);
-
EXIT;
return;
}
-void ptlrpc_continue_req(struct ptlrpc_request *req)
-{
- DEBUG_REQ(D_HA, req, "continuing delayed request");
- req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
- req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
- wake_up(&req->rq_wait_for_rep);
-}
-
void ptlrpc_resend_req(struct ptlrpc_request *req)
{
+ unsigned long flags;
+
DEBUG_REQ(D_HA, req, "resending");
- req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
- req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
+ req->rq_reqmsg->handle.cookie = 0;
+ ptlrpc_put_connection(req->rq_connection);
+ req->rq_connection =
+ ptlrpc_connection_addref(req->rq_import->imp_connection);
req->rq_status = -EAGAIN;
- req->rq_level = LUSTRE_CONN_RECOVD;
- req->rq_flags |= PTL_RPC_FL_RESEND;
- req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
- wake_up(&req->rq_wait_for_rep);
+
+ spin_lock_irqsave (&req->rq_lock, flags);
+ req->rq_resend = 1;
+ req->rq_timedout = 0;
+ if (req->rq_set != NULL)
+ wake_up (&req->rq_set->set_waitq);
+ else
+ wake_up(&req->rq_wait_for_rep);
+ spin_unlock_irqrestore (&req->rq_lock, flags);
}
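
/* Editor's note: the wake-up above follows a pattern that recurs in this
 * patch: a request that belongs to a set is woken via the set's set_waitq
 * (so ptlrpc_set_wait() re-runs check_set()), while a standalone request
 * sleeping in ptlrpc_queue_wait() is woken via its own rq_wait_for_rep. */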
+/* XXX: this function and rq_status are currently unused */
void ptlrpc_restart_req(struct ptlrpc_request *req)
{
+ unsigned long flags;
+
DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
req->rq_status = -ERESTARTSYS;
- req->rq_flags |= PTL_RPC_FL_RESTART;
- req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
- wake_up(&req->rq_wait_for_rep);
+
+ spin_lock_irqsave (&req->rq_lock, flags);
+ req->rq_restart = 1;
+ req->rq_timedout = 0;
+ if (req->rq_set != NULL)
+ wake_up (&req->rq_set->set_waitq);
+ else
+ wake_up(&req->rq_wait_for_rep);
+ spin_unlock_irqrestore (&req->rq_lock, flags);
}
static int expired_request(void *data)
{
struct ptlrpc_request *req = data;
-
ENTRY;
- if (!req) {
- CERROR("NULL req!");
- LBUG();
- RETURN(0);
- }
-
- DEBUG_REQ(D_ERROR, req, "timeout");
- ptlrpc_abort(req);
- req->rq_flags |= PTL_RPC_FL_TIMEOUT;
-
- if (!req->rq_import) {
- DEBUG_REQ(D_HA, req, "NULL import; already cleaned up?");
- RETURN(1);
- }
-
- if (!req->rq_import->imp_connection) {
- DEBUG_REQ(D_ERROR, req, "NULL connection");
- LBUG();
- RETURN(0);
- }
-
- if (!req->rq_import->imp_connection->c_recovd_data.rd_recovd)
- RETURN(1);
-
- recovd_conn_fail(req->rq_import->imp_connection);
- /* If this request is for recovery or other primordial tasks,
- * don't go back to sleep.
- */
- if (req->rq_level < LUSTRE_CONN_FULL)
- RETURN(1);
- RETURN(0);
+ RETURN(expire_one_request(req));
}
-static int interrupted_request(void *data)
+static void interrupted_request(void *data)
{
+ unsigned long flags;
+
struct ptlrpc_request *req = data;
- ENTRY;
- req->rq_flags |= PTL_RPC_FL_INTR;
- RETURN(1); /* ignored, as of this writing */
+ DEBUG_REQ(D_HA, req, "request interrupted");
+ spin_lock_irqsave (&req->rq_lock, flags);
+ req->rq_intr = 1;
+ spin_unlock_irqrestore (&req->rq_lock, flags);
}
struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
{
        atomic_inc(&req->rq_refcount);
        return req;
}

void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                      struct obd_import *imp)
{
        struct list_head *tmp;
#ifdef CONFIG_SMP
        LASSERT(spin_is_locked(&imp->imp_lock));
#endif
- LASSERT(imp->imp_flags & IMP_REPLAYABLE);
+ LASSERT(imp->imp_replayable);
/* Balanced in ptlrpc_free_committed, usually. */
ptlrpc_request_addref(req);
list_for_each_prev(tmp, &imp->imp_replay_list) {
/* We may have duplicate transnos if we create and then
 * open a file, or for closes retained if to match creating
* opens, so use req->rq_xid as a secondary key.
* (See bugs 684, 685, and 428.)
+ * XXX no longer needed, but all opens need transnos!
*/
if (iter->rq_transno > req->rq_transno)
continue;
int ptlrpc_queue_wait(struct ptlrpc_request *req)
{
int rc = 0;
+ int brc;
struct l_wait_info lwi;
struct obd_import *imp = req->rq_import;
+ struct obd_device *obd = imp->imp_obd;
struct ptlrpc_connection *conn = imp->imp_connection;
unsigned long flags;
+ int do_restart = 0;
+ int timeout = 0;
ENTRY;
- init_waitqueue_head(&req->rq_wait_for_rep);
-
- req->rq_xid = HTON__u32(ptlrpc_next_xid());
-
+ LASSERT (req->rq_set == NULL);
+ LASSERT (!req->rq_receiving_reply);
+
/* for distributed debugging */
- req->rq_reqmsg->status = HTON__u32(current->pid);
- CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%s:"LPX64
- ":%d\n", NTOH__u32(req->rq_reqmsg->status), req->rq_xid,
+ req->rq_reqmsg->status = current->pid;
+ LASSERT(imp->imp_obd != NULL);
+ CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
+ "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+ imp->imp_obd->obd_uuid.uuid,
+ req->rq_reqmsg->status, req->rq_xid,
conn->c_peer.peer_ni->pni_name, conn->c_peer.peer_nid,
- NTOH__u32(req->rq_reqmsg->opc));
-
- spin_lock_irqsave(&imp->imp_lock, flags);
+ req->rq_reqmsg->opc);
+ /* Mark phase here for a little debug help */
+ req->rq_phase = RQ_PHASE_RPC;
+
+restart:
/*
* If the import has been invalidated (such as by an OST failure), the
- * request must fail with -EIO.
+ * request must fail with -EIO. Recovery requests are allowed to go
+ * through, though, so that they have a chance to revalidate the
+ * import.
*/
- if (req->rq_import->imp_flags & IMP_INVALID) {
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ if (req->rq_import->imp_invalid && req->rq_level == LUSTRE_CONN_FULL) {
DEBUG_REQ(D_ERROR, req, "IMP_INVALID:");
spin_unlock_irqrestore(&imp->imp_lock, flags);
- RETURN(-EIO);
+ GOTO (out, rc = -EIO);
}
if (req->rq_level > imp->imp_level) {
list_del(&req->rq_list);
+ if (req->rq_no_recov || obd->obd_no_recov ||
+ imp->imp_dlm_fake) {
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+ GOTO (out, rc = -EWOULDBLOCK);
+ }
+
list_add_tail(&req->rq_list, &imp->imp_delayed_list);
spin_unlock_irqrestore(&imp->imp_lock, flags);
- DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d < %d)",
+ DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d > %d)",
current->comm, req->rq_level, imp->imp_level);
lwi = LWI_INTR(NULL, NULL);
rc = l_wait_event(req->rq_wait_for_rep,
- (req->rq_level <= imp->imp_level) ||
- (req->rq_flags & PTL_RPC_FL_ERR), &lwi);
-
- if (req->rq_flags & PTL_RPC_FL_ERR)
- rc = -EIO;
-
- if (!req->rq_import)
- RETURN(rc);
+ (req->rq_level <= imp->imp_level ||
+ req->rq_err),
+ &lwi);
+ DEBUG_REQ(D_HA, req, "\"%s\" awake: (%d > %d)",
+ current->comm, req->rq_level, imp->imp_level);
spin_lock_irqsave(&imp->imp_lock, flags);
list_del_init(&req->rq_list);
+ if (req->rq_err)
+ rc = -EIO;
+
if (rc) {
spin_unlock_irqrestore(&imp->imp_lock, flags);
- RETURN(rc);
+ GOTO (out, rc);
}
-
+
CERROR("process %d resumed\n", current->pid);
}
- resend:
+ /* XXX this is the same as ptlrpc_set_wait */
LASSERT(list_empty(&req->rq_list));
list_add_tail(&req->rq_list, &imp->imp_sending_list);
+ req->rq_import_generation = imp->imp_generation;
spin_unlock_irqrestore(&imp->imp_lock, flags);
+
rc = ptl_send_rpc(req);
if (rc) {
- CDEBUG(D_HA, "error %d, opcode %d, need recovery\n", rc,
- req->rq_reqmsg->opc);
- /* sleep for a jiffy, then trigger recovery */
- lwi = LWI_TIMEOUT_INTR(1, expired_request,
- interrupted_request, req);
+ /* The DLM's fake imports want to avoid all forms of
+ * recovery. */
+ if (imp->imp_dlm_fake) {
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ list_del_init(&req->rq_list);
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+ GOTO(out, rc);
+ }
+
+ DEBUG_REQ(D_ERROR, req, "send failed (%d); recovering", rc);
+
+ ptlrpc_fail_import(imp, req->rq_import_generation);
+
+ /* If we've been told to not wait, we're done. */
+ if (req->rq_level < LUSTRE_CONN_FULL || req->rq_no_recov ||
+ obd->obd_no_recov) {
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ list_del_init(&req->rq_list);
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+ GOTO(out, rc);
+ }
+
+ /* If we errored, allow the user to interrupt immediately */
+ timeout = 1;
} else {
+ timeout = req->rq_timeout * HZ;
DEBUG_REQ(D_NET, req, "-- sleeping");
- lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
- interrupted_request, req);
}
#ifdef __KERNEL__
+ lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
+ req);
l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
-#else
- {
+#else
+ {
extern int reply_in_callback(ptl_event_t *ev);
ptl_event_t reply_ev;
- PtlEQWait(req->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h, &reply_ev);
- reply_in_callback(&reply_ev);
+ PtlEQWait(req->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h,
+ &reply_ev);
+ reply_in_callback(&reply_ev);
+
+ LASSERT (reply_ev.mem_desc.user_ptr == (void *)req);
+ // ptlrpc_check_reply(req);
+ // not required now; it only tests state
}
-#endif
+#endif
DEBUG_REQ(D_NET, req, "-- done sleeping");
+ CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
+ "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
+ imp->imp_obd->obd_uuid.uuid,
+ req->rq_reqmsg->status, req->rq_xid,
+ conn->c_peer.peer_ni->pni_name, conn->c_peer.peer_nid,
+ req->rq_reqmsg->opc);
+
spin_lock_irqsave(&imp->imp_lock, flags);
list_del_init(&req->rq_list);
spin_unlock_irqrestore(&imp->imp_lock, flags);
- if (req->rq_flags & PTL_RPC_FL_ERR) {
- ptlrpc_abort(req);
+ /* If the reply was received normally, this just grabs the spinlock
+ * (ensuring the reply callback has returned), sees that
+ * req->rq_receiving_reply is clear and returns. */
+ ptlrpc_unregister_reply (req);
+
+ if (req->rq_err)
GOTO(out, rc = -EIO);
- }
- /* Don't resend if we were interrupted. */
- if ((req->rq_flags & (PTL_RPC_FL_RESEND | PTL_RPC_FL_INTR)) ==
- PTL_RPC_FL_RESEND) {
- if (req->rq_flags & PTL_RPC_FL_NO_RESEND) {
- ptlrpc_abort(req); /* clean up reply buffers */
- req->rq_flags &= ~PTL_RPC_FL_NO_RESEND;
+ /* Resend if we need to, unless we were interrupted. */
+ if (req->rq_resend && !req->rq_intr) {
+ /* ...unless we were specifically told otherwise. */
+ if (req->rq_no_resend) {
+ spin_lock_irqsave (&req->rq_lock, flags);
+ req->rq_no_resend = 0;
+ spin_unlock_irqrestore (&req->rq_lock, flags);
GOTO(out, rc = -ETIMEDOUT);
}
- req->rq_flags &= ~PTL_RPC_FL_RESEND;
+ spin_lock_irqsave (&req->rq_lock, flags);
+ req->rq_resend = 0;
+ spin_unlock_irqrestore (&req->rq_lock, flags);
lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
+
+ if (req->rq_bulk != NULL)
+ ptlrpc_unregister_bulk (req);
+
DEBUG_REQ(D_HA, req, "resending: ");
- spin_lock_irqsave(&imp->imp_lock, flags);
- goto resend;
+ goto restart;
}
- if (req->rq_flags & PTL_RPC_FL_INTR) {
- if (!(req->rq_flags & PTL_RPC_FL_TIMEOUT))
- LBUG(); /* should only be interrupted if we timed out */
- /* Clean up the dangling reply buffers */
- ptlrpc_abort(req);
+ if (req->rq_intr) {
+ /* Should only be interrupted if we timed out. */
+ if (!req->rq_timedout)
+ DEBUG_REQ(D_ERROR, req,
+ "rq_intr set but rq_timedout not");
GOTO(out, rc = -EINTR);
}
- if (req->rq_flags & PTL_RPC_FL_TIMEOUT)
+ if (req->rq_timedout) { /* non-recoverable timeout */
GOTO(out, rc = -ETIMEDOUT);
-
- if (!(req->rq_flags & PTL_RPC_FL_REPLIED))
- GOTO(out, rc = req->rq_status);
-
- rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
- if (rc) {
- CERROR("unpack_rep failed: %d\n", rc);
- GOTO(out, rc);
}
-#if 0
- /* FIXME: Enable when BlueArc makes new release */
- if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
- req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
- CERROR("invalid packet type received (type=%u)\n",
- req->rq_repmsg->type);
+
+ if (!req->rq_replied) {
+ /* How can this be? -eeb */
+ DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
LBUG();
- GOTO(out, rc = -EINVAL);
+ GOTO(out, rc = req->rq_status);
}
-#endif
- DEBUG_REQ(D_NET, req, "status %d", req->rq_repmsg->status);
- /* We're a rejected connection, need to invalidate and rebuild. */
- if (req->rq_repmsg->status == -ENOTCONN) {
- spin_lock_irqsave(&imp->imp_lock, flags);
- /* If someone else is reconnecting us (CONN_RECOVD) or has
- * already completed it (handle mismatch), then we just need
- * to get out.
- */
- if (imp->imp_level == LUSTRE_CONN_RECOVD ||
- imp->imp_handle.addr != req->rq_reqmsg->addr ||
- imp->imp_handle.cookie != req->rq_reqmsg->cookie) {
- spin_unlock_irqrestore(&imp->imp_lock, flags);
- GOTO(out, rc = -EIO);
- }
- imp->imp_level = LUSTRE_CONN_RECOVD;
- spin_unlock_irqrestore(&imp->imp_lock, flags);
- if (imp->imp_recover != NULL) {
- rc = imp->imp_recover(imp, PTLRPC_RECOVD_PHASE_NOTCONN);
- if (rc)
- LBUG();
- }
- GOTO(out, rc = -EIO);
+ rc = after_reply (req, &do_restart);
+ /* NB may return +ve success rc */
+ if (do_restart) {
+ if (req->rq_bulk != NULL)
+ ptlrpc_unregister_bulk (req);
+ DEBUG_REQ(D_HA, req, "resending: ");
+ goto restart;
}
- rc = ptlrpc_check_status(req);
-
- if (req->rq_import->imp_flags & IMP_REPLAYABLE) {
- spin_lock_irqsave(&imp->imp_lock, flags);
- if ((req->rq_flags & PTL_RPC_FL_REPLAY || req->rq_transno != 0)
- && rc >= 0) {
- ptlrpc_retain_replayable_request(req, imp);
- }
-
- if (req->rq_transno > imp->imp_max_transno) {
- imp->imp_max_transno = req->rq_transno;
+ out:
+ if (req->rq_bulk != NULL) {
+ if (rc >= 0) { /* success so far */
+ lwi = LWI_TIMEOUT (timeout, NULL, NULL);
+ brc = l_wait_event (req->rq_wait_for_rep,
+ ptlrpc_bulk_complete (req->rq_bulk), &lwi);
+ if (brc != 0) {
+ LASSERT (brc == -ETIMEDOUT);
+ CERROR ("Timed out waiting for bulk\n");
+ rc = brc;
+ }
}
-
- /* Replay-enabled imports return commit-status information. */
- if (req->rq_repmsg->last_committed) {
- imp->imp_peer_committed_transno =
- req->rq_repmsg->last_committed;
+ if (rc < 0) {
+ /* MDS blocks for put ACKs before replying */
+ /* OSC sets rq_no_resend for the time being */
+ LASSERT (req->rq_no_resend);
+ ptlrpc_unregister_bulk (req);
}
- ptlrpc_free_committed(imp);
- spin_unlock_irqrestore(&imp->imp_lock, flags);
}
-
- EXIT;
- out:
- return rc;
+
+ LASSERT (!req->rq_receiving_reply);
+ req->rq_phase = RQ_PHASE_INTERPRET;
+ RETURN (rc);
}
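
/* Editor's note: the classic synchronous pattern built on ptlrpc_queue_wait;
 * a hedged sketch.  SOME_OPCODE, the buffer length, and the reply-sizing
 * helper lustre_msg_size() are assumptions, not taken from this patch. */
#if 0 /* illustration only */
static int example_sync_rpc(struct obd_import *imp)
{
        int size = 1024;
        struct ptlrpc_request *req;
        int rc;

        req = ptlrpc_prep_req(imp, SOME_OPCODE, 1, &size, NULL);
        if (req == NULL)
                return -ENOMEM;

        req->rq_replen = lustre_msg_size(1, &size); /* expected reply size */
        rc = ptlrpc_queue_wait(req);    /* send, sleep, unpack, check */
        ptlrpc_req_finished(req);       /* drop our reference */
        return rc;
}
#endif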
int ptlrpc_replay_req(struct ptlrpc_request *req)
struct l_wait_info lwi;
ENTRY;
- init_waitqueue_head(&req->rq_wait_for_rep);
- DEBUG_REQ(D_NET, req, "");
+ /* I don't touch rq_phase here, so the debug log can show what
+ * state it was left in */
+
+ /* Not handling automatic bulk replay yet (or ever?) */
+ LASSERT (req->rq_bulk == NULL);
+
+ DEBUG_REQ(D_NET, req, "about to replay");
- req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
- req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
+ /* Update request's state, since we might have a new connection. */
+ ptlrpc_put_connection(req->rq_connection);
+ req->rq_connection =
+ ptlrpc_connection_addref(req->rq_import->imp_connection);
/* temporarily set request to RECOVD level (reset at out:) */
old_level = req->rq_level;
- if (req->rq_flags & PTL_RPC_FL_REPLIED)
+ if (req->rq_replied)
old_status = req->rq_repmsg->status;
req->rq_level = LUSTRE_CONN_RECOVD;
rc = ptl_send_rpc(req);
// up(&cli->cli_rpc_sem);
- if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
+ /* If the reply was received normally, this just grabs the spinlock
+ * (ensuring the reply callback has returned), sees that
+ * req->rq_receiving_reply is clear and returns. */
+ ptlrpc_unregister_reply (req);
+
+ if (!req->rq_replied) {
CERROR("Unknown reason for wakeup\n");
/* XXX Phil - I end up here when I kill obdctl */
- ptlrpc_abort(req);
+ /* ...that's because signals aren't all masked in
+ * l_wait_event() -eeb */
GOTO(out, rc = -EINTR);
}
+#if SWAB_PARANOIA
+ /* Clear reply swab mask; this is a new reply in sender's byte order */
+ req->rq_rep_swab_mask = 0;
+#endif
rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
if (rc) {
CERROR("unpack_rep failed: %d\n", rc);
- GOTO(out, rc);
+ GOTO(out, rc = -EPROTO);
}
+#if 0
+ /* FIXME: Enable when BlueArc makes new release */
+ if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
+ req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
+ CERROR("invalid packet type received (type=%u)\n",
+ req->rq_repmsg->type);
+ GOTO(out, rc = -EPROTO);
+ }
+#endif
+
+ /* The transno had better not change over replay. */
+ LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
CDEBUG(D_NET, "got rep "LPD64"\n", req->rq_xid);
if (req->rq_replay_cb)
req->rq_replay_cb(req);
- if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
- req->rq_repmsg->status != old_status) {
+ if (req->rq_replied && req->rq_repmsg->status != old_status) {
DEBUG_REQ(D_HA, req, "status %d, old was %d",
req->rq_repmsg->status, old_status);
}
RETURN(rc);
}
-/* XXX looks a lot like super.c:invalidate_request_list, don't it? */
-void ptlrpc_abort_inflight(struct obd_import *imp, int dying_import)
+void ptlrpc_abort_inflight(struct obd_import *imp)
{
unsigned long flags;
struct list_head *tmp, *n;
ENTRY;
/* Make sure that no new requests get processed for this import.
- * ptlrpc_queue_wait must (and does) hold imp_lock while testing this
- * flag and then putting requests on sending_list or delayed_list.
+ * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
+ * this flag and then putting requests on sending_list or delayed_list.
+ */
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ if (!imp->imp_replayable)
+ /* on b_devel, I moved this line to
+ ptlrpc_set_import_active because I thought it made
+ more sense there and possibly not all callers of
+ this function expect this. I'll leave it here until
+ I can figure out if it's correct or not. - rread 5/12/03 */
+ imp->imp_invalid = 1;
+
+ /* XXX locking? Maybe we should remove each request with the list
+ * locked? Also, how do we know if the requests on the list are
+ * being freed at this time?
*/
- if ((imp->imp_flags & IMP_REPLAYABLE) == 0) {
- spin_lock_irqsave(&imp->imp_lock, flags);
- imp->imp_flags |= IMP_INVALID;
- spin_unlock_irqrestore(&imp->imp_lock, flags);
- }
-
list_for_each_safe(tmp, n, &imp->imp_sending_list) {
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request, rq_list);
DEBUG_REQ(D_HA, req, "inflight");
- req->rq_flags |= PTL_RPC_FL_ERR;
- if (dying_import)
- req->rq_import = NULL;
- wake_up(&req->rq_wait_for_rep);
+
+ spin_lock (&req->rq_lock);
+ req->rq_err = 1;
+ if (req->rq_set != NULL)
+ wake_up(&req->rq_set->set_waitq);
+ else
+ wake_up(&req->rq_wait_for_rep);
+ spin_unlock (&req->rq_lock);
}
list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request, rq_list);
DEBUG_REQ(D_HA, req, "aborting waiting req");
- req->rq_flags |= PTL_RPC_FL_ERR;
- if (dying_import)
- req->rq_import = NULL;
- wake_up(&req->rq_wait_for_rep);
+
+ spin_lock (&req->rq_lock);
+ req->rq_err = 1;
+ if (req->rq_set != NULL)
+ wake_up(&req->rq_set->set_waitq);
+ else
+ wake_up(&req->rq_wait_for_rep);
+ spin_unlock (&req->rq_lock);
}
+
+ /* Last chance to free reqs left on the replay list, but we
+ * will still leak reqs that haven't committed. */
+ if (imp->imp_replayable)
+ ptlrpc_free_committed(imp);
+
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+
EXIT;
}
+
+static __u64 ptlrpc_last_xid = 0;
+static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
+
+__u64 ptlrpc_next_xid(void)
+{
+ __u64 tmp;
+ spin_lock(&ptlrpc_last_xid_lock);
+ tmp = ++ptlrpc_last_xid;
+ spin_unlock(&ptlrpc_last_xid_lock);
+ return tmp;
+}
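+
+/* Editor's note: rq_xid (assigned from here in ptlrpc_prep_req above) is
+ * what matches an incoming reply to its request; the counter only moves
+ * forward under its lock, so XIDs stay unique for the life of the client. */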