struct ptlrpc_request {
int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */
+ spinlock_t rq_lock;
struct list_head rq_list;
struct obd_device *rq_obd;
int rq_status;
int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
struct ptlrpc_request *req);
int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer);
-int ptl_received_rpc(struct ptlrpc_service *service);
/* rpc/client.c */
int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal,
/* remember where it came from */
srv_req->rq_reply_handle = req;
+ spin_lock(&peer->cli_lock);
list_add(&srv_req->rq_list, &peer->cli_obd->obd_req_list);
+ spin_unlock(&peer->cli_lock);
wake_up(&peer->cli_obd->obd_req_waitq);
return 0;
}
}
memset(request, 0, sizeof(*request));
+ spin_lock_init(&request->rq_lock);
spin_lock(&cl->cli_lock);
request->rq_xid = cl->cli_xid++;
static int ptlrpc_check_reply(struct ptlrpc_request *req)
{
+ int rc = 0;
+
+ spin_lock(&req->rq_lock);
if (req->rq_repbuf != NULL) {
req->rq_flags = PTL_RPC_REPLY;
EXIT;
- return 1;
+ rc = 1;
+ goto out;
}
if (sigismember(&(current->pending.signal), SIGKILL) ||
sigismember(&(current->pending.signal), SIGINT)) {
req->rq_flags = PTL_RPC_INTR;
EXIT;
- return 1;
+ rc = 1;
+ goto out;
}
- return 0;
+ out:
+ spin_unlock(&req->rq_lock);
+ return rc;
}
int ptlrpc_check_status(struct ptlrpc_request *req, int err)
{
/* First remove the MD for the reply; in theory, this means
* that we can tear down the buffer safely. */
+ spin_lock(&request->rq_lock);
PtlMEUnlink(request->rq_reply_me_h);
PtlMDUnlink(request->rq_reply_md_h);
OBD_FREE(request->rq_repbuf, request->rq_replen);
request->rq_repbuf = NULL;
request->rq_replen = 0;
+ spin_unlock(&request->rq_lock);
return 0;
}
+
int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
{
- int rc;
+ int rc = 0;
ENTRY;
init_waitqueue_head(&req->rq_wait_for_rep);
CDEBUG(D_OTHER, "-- sleeping\n");
wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
CDEBUG(D_OTHER, "-- done\n");
-
+ spin_lock(&req->rq_lock);
if (req->rq_flags == PTL_RPC_INTR) {
/* Clean up the dangling reply buffers */
ptlrpc_abort(req);
EXIT;
- return -EINTR;
+ rc = -EINTR;
+ goto out;
}
if (req->rq_flags != PTL_RPC_REPLY) {
ptlrpc_abort(req);
//BUG();
EXIT;
- return -EINTR;
+ rc = -EINTR;
+ goto out;
}
rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen,
&req->rq_rephdr, &req->rq_rep);
if (rc) {
CERROR("unpack_rep failed: %d\n", rc);
- return rc;
+ goto out;
}
CDEBUG(D_NET, "got rep %d\n", req->rq_rephdr->xid);
req->rq_replen, req->rq_rephdr->status);
EXIT;
- return 0;
+ out:
+ spin_unlock(&req->rq_lock);
+ return rc;
}
* it finishes processing an event. This ensures the ref count is
* decremented and that the rpc ring buffer cycles properly.
*/
-int ptl_received_rpc(struct ptlrpc_service *service)
+int ptl_handled_rpc(struct ptlrpc_service *service, void *start)
{
- int rc, index;
+ int rc, index = 0;
spin_lock(&service->srv_lock);
- index = service->srv_md_active;
+ /* XXX this is wrong must find index on which request arrived!!!*/
+ while (index < service->srv_ring_length) {
+ if ( service->srv_md[index].start == start)
+ break;
+ index++;
+ }
+ if (index == service->srv_ring_length)
+ BUG();
+
CDEBUG(D_INFO, "MD index=%d Ref Count=%d\n", index,
service->srv_ref_count[index]);
service->srv_ref_count[index]--;
- if ((service->srv_ref_count[index] <= 0) &&
+ if (service->srv_ref_count[index] < 0)
+ BUG();
+
+ if ((service->srv_ref_count[index] == 0) &&
(service->srv_me_h[index] == 0)) {
/* Replace the unlinked ME and MD */
#include <linux/lustre_net.h>
extern int server_request_callback(ptl_event_t *ev, void *data);
+extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start);
static int ptlrpc_check_event(struct ptlrpc_service *svc)
{
if (svc->srv_flags & SVC_EVENT) {
struct ptlrpc_request request;
+ void *start;
svc->srv_flags = SVC_RUNNING;
/* FIXME: If we move to an event-driven model,
* we should put the request on the stack of
* mds_handle instead. */
+ start = svc->srv_ev.mem_desc.start;
memset(&request, 0, sizeof(request));
request.rq_obd = obddev;
request.rq_reqbuf = (svc->srv_ev.mem_desc.start +
spin_unlock(&svc->srv_lock);
rc = svc->srv_handler(obddev, svc, &request);
- ptl_received_rpc(svc);
+ ptl_handled_rpc(svc, start);
continue;
}