spinlock_t cli_lock;
__u32 cli_xid;
- atomic_t cli_queue_length;
- wait_queue_head_t cli_waitq;
+ struct semaphore cli_rpc_sem;
};
/* These do double-duty in rq_type and rq_flags */
struct ptlrpc_request *req);
int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
struct ptlrpc_request *req);
-int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer);
+int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl);
void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i);
/* rpc/client.c */
cl->cli_reply_portal = rep_portal;
cl->cli_rep_unpack = rep_unpack;
cl->cli_req_pack = req_pack;
- atomic_set(&cl->cli_queue_length, 32);
- init_waitqueue_head(&cl->cli_waitq);
+ sema_init(&cl->cli_rpc_sem, 32);
/* non networked client */
if (dev >= 0 && dev < MAX_OBD_DEVICES) {
return 0;
}
-static int ptlrpc_check_queue_depth(void *data)
-{
- struct ptlrpc_client *cl = data;
-
- if (atomic_read(&cl->cli_queue_length) > 0)
- return 1;
-
- if (sigismember(&(current->pending.signal), SIGKILL) ||
- sigismember(&(current->pending.signal), SIGTERM) ||
- sigismember(&(current->pending.signal), SIGINT))
- return 1;
-
- return 0;
-}
-
-#define __ptlrpc_wait_event_interruptible(wq, condition, ret) \
-do { \
- wait_queue_t __wait; \
- init_waitqueue_entry(&__wait, current); \
- \
- add_wait_queue_exclusive(&wq, &__wait); \
- for (;;) { \
- set_current_state(TASK_INTERRUPTIBLE); \
- if (condition) \
- break; \
- if (!signal_pending(current)) { \
- schedule(); \
- continue; \
- } \
- ret = -ERESTARTSYS; \
- break; \
- } \
- current->state = TASK_RUNNING; \
- remove_wait_queue(&wq, &__wait); \
-} while (0)
-
-#define ptlrpc_wait_event_interruptible(wq, condition) \
-({ \
- int __ret = 0; \
- if (!(condition)) \
- __ptlrpc_wait_event_interruptible(wq, condition, __ret); \
- __ret; \
-})
-
int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
{
int rc = 0;
init_waitqueue_head(&req->rq_wait_for_rep);
- if (atomic_dec_and_test(&cl->cli_queue_length))
- ptlrpc_wait_event_interruptible(cl->cli_waitq,
- ptlrpc_check_queue_depth(cl));
-
if (cl->cli_obd) {
/* Local delivery */
+ down(&cl->cli_rpc_sem);
rc = ptlrpc_enqueue(cl, req);
} else {
/* Remote delivery via portals. */
req->rq_req_portal = cl->cli_request_portal;
req->rq_reply_portal = cl->cli_reply_portal;
- rc = ptl_send_rpc(req, &cl->cli_server);
+ rc = ptl_send_rpc(req, cl);
}
if (rc) {
CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc);
- atomic_inc(&cl->cli_queue_length);
- wake_up(&cl->cli_waitq);
+ up(&cl->cli_rpc_sem);
RETURN(-rc);
}
CDEBUG(D_OTHER, "-- sleeping\n");
wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
CDEBUG(D_OTHER, "-- done\n");
- atomic_inc(&cl->cli_queue_length);
- wake_up(&cl->cli_waitq);
+ up(&cl->cli_rpc_sem);
if (req->rq_flags == PTL_RPC_INTR) {
/* Clean up the dangling reply buffers */
ptlrpc_abort(req);
return ptlrpc_reply(obddev, svc, req);
}
-int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
+int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl)
{
ptl_process_id_t local_id;
struct ptlreq_hdr *hdr;
local_id.nid = PTL_ID_ANY;
local_id.pid = PTL_ID_ANY;
- rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
- request->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
- &request->rq_reply_me_h);
+ down(&cl->cli_rpc_sem);
+
+ rc = PtlMEAttach(cl->cli_server.peer_ni, request->rq_reply_portal,
+ local_id, request->rq_xid, 0, PTL_UNLINK,
+ PTL_INS_AFTER, &request->rq_reply_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
LBUG();
CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n",
request->rq_replen, request->rq_xid, request->rq_reply_portal);
- return ptl_send_buf(request, peer, request->rq_req_portal);
+ return ptl_send_buf(request, &cl->cli_server, request->rq_req_portal);
cleanup2:
PtlMEUnlink(request->rq_reply_me_h);
cleanup:
OBD_FREE(repbuf, request->rq_replen);
+ up(&cl->cli_rpc_sem);
return rc;
}