X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fclient.c;h=a1322ac97dd421dbfb30b3ec2900522cbc8565d6;hb=726c9cf39547151a7cf85118c8a23b9ecf8220f5;hp=73c2cafe6c7da10209773b875acbaa2a3814595f;hpb=5ddd01a4ce31e1a046d8139d34dcbd4476b1201e;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 73c2caf..a1322ac 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -25,6 +25,7 @@ #include #include #include +#include #define DEBUG_SUBSYSTEM S_RPC @@ -32,77 +33,28 @@ #include #include -int ptlrpc_enqueue(struct ptlrpc_client *peer, struct ptlrpc_request *req) -{ - struct ptlrpc_request *srv_req; - - if (!peer->cli_obd) { - EXIT; - return -1; - } - - OBD_ALLOC(srv_req, sizeof(*srv_req)); - if (!srv_req) { - EXIT; - return -ENOMEM; - } - - CDEBUG(0, "peer obd minor %d, incoming req %p, srv_req %p\n", - peer->cli_obd->obd_minor, req, srv_req); - - memset(srv_req, 0, sizeof(*req)); - - /* move the request buffer */ - srv_req->rq_reqbuf = req->rq_reqbuf; - srv_req->rq_reqlen = req->rq_reqlen; - srv_req->rq_obd = peer->cli_obd; - - /* remember where it came from */ - srv_req->rq_reply_handle = req; - - spin_lock(&peer->cli_lock); - list_add(&srv_req->rq_list, &peer->cli_obd->obd_req_list); - spin_unlock(&peer->cli_lock); - wake_up(&peer->cli_obd->obd_req_waitq); - return 0; -} - -int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal, - req_pack_t req_pack, rep_unpack_t rep_unpack, +void ptlrpc_init_client(int dev, int req_portal, int rep_portal, struct ptlrpc_client *cl) { - int err; - memset(cl, 0, sizeof(*cl)); spin_lock_init(&cl->cli_lock); cl->cli_xid = 1; - cl->cli_obd = NULL; + cl->cli_generation = 1; + cl->cli_epoch = 1; + cl->cli_bootcount = 0; + cl->cli_obd = NULL; cl->cli_request_portal = req_portal; cl->cli_reply_portal = rep_portal; - cl->cli_rep_unpack = rep_unpack; - cl->cli_req_pack = req_pack; - atomic_set(&cl->cli_queue_length, 32); - init_waitqueue_head(&cl->cli_waitq); - - /* non networked client */ - if (dev >= 0 && dev < MAX_OBD_DEVICES) { - struct obd_device *obd = &obd_dev[dev]; - - if ((!obd->obd_flags & OBD_ATTACHED) || - (!obd->obd_flags & OBD_SET_UP)) { - CERROR("target device %d not att or setup\n", dev); - return -EINVAL; - } - if (strcmp(obd->obd_type->typ_name, "ost") && - strcmp(obd->obd_type->typ_name, "mds")) { - return -EINVAL; - } - - cl->cli_obd = &obd_dev[dev]; - return 0; - } + INIT_LIST_HEAD(&cl->cli_sending_head); + INIT_LIST_HEAD(&cl->cli_sent_head); + sema_init(&cl->cli_rpc_sem, 32); +} - /* networked */ +int ptlrpc_connect_client(int dev, char *uuid, struct ptlrpc_client *cl) +{ + int err; + + cl->cli_epoch++; err = kportal_uuid_to_peer(uuid, &cl->cli_server); if (err != 0) CERROR("cannot find peer %s!\n", uuid); @@ -116,7 +68,6 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer) OBD_ALLOC(bulk, sizeof(*bulk)); if (bulk != NULL) { - memset(bulk, 0, sizeof(*bulk)); memcpy(&bulk->b_peer, peer, sizeof(*peer)); init_waitqueue_head(&bulk->b_waitq); } @@ -124,39 +75,36 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer) return bulk; } -struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, - int opcode, int namelen, char *name, - int tgtlen, char *tgt) +struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode, + int count, int *lengths, char **bufs) { struct ptlrpc_request *request; int rc; - ENTRY; + ENTRY; OBD_ALLOC(request, sizeof(*request)); - if (!request) { + if (!request) { CERROR("request allocation out of memory\n"); - return NULL; + RETURN(NULL); } - memset(request, 0, sizeof(*request)); - //spin_lock_init(&request->rq_lock); - spin_lock(&cl->cli_lock); request->rq_xid = cl->cli_xid++; spin_unlock(&cl->cli_lock); - rc = cl->cli_req_pack(name, namelen, tgt, tgtlen, - &request->rq_reqhdr, &request->rq_req, - &request->rq_reqlen, &request->rq_reqbuf); - if (rc) { - CERROR("cannot pack request %d\n", rc); - return NULL; + rc = lustre_pack_msg(count, lengths, bufs, + &request->rq_reqlen, &request->rq_reqbuf); + if (rc) { + CERROR("cannot pack request %d\n", rc); + RETURN(NULL); } - request->rq_reqhdr->opc = opcode; - request->rq_reqhdr->xid = request->rq_xid; + request->rq_type = PTL_RPC_REQUEST; + request->rq_reqmsg = (struct lustre_msg *)request->rq_reqbuf; + request->rq_reqmsg->opc = HTON__u32(opcode); + request->rq_reqmsg->xid = HTON__u32(request->rq_xid); + request->rq_reqmsg->type = HTON__u32(request->rq_type); - EXIT; - return request; + RETURN(request); } void ptlrpc_free_req(struct ptlrpc_request *request) @@ -180,7 +128,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req) if (sigismember(&(current->pending.signal), SIGKILL) || sigismember(&(current->pending.signal), SIGTERM) || - sigismember(&(current->pending.signal), SIGINT)) { + sigismember(&(current->pending.signal), SIGINT)) { req->rq_flags = PTL_RPC_INTR; GOTO(out, rc = 1); } @@ -195,93 +143,48 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err) if (err != 0) { CERROR("err is %d\n", err); - EXIT; - return err; + RETURN(err); } if (req == NULL) { CERROR("req == NULL\n"); - EXIT; - return -ENOMEM; + RETURN(-ENOMEM); } - if (req->rq_rephdr == NULL) { - CERROR("req->rq_rephdr == NULL\n"); - EXIT; - return -ENOMEM; + if (req->rq_repmsg == NULL) { + CERROR("req->rq_repmsg == NULL\n"); + RETURN(-ENOMEM); } - if (req->rq_rephdr->status != 0) { - CERROR("req->rq_rephdr->status is %d\n", - req->rq_rephdr->status); - EXIT; + if (req->rq_repmsg->status != 0) { + CERROR("req->rq_repmsg->status is %d\n", + req->rq_repmsg->status); /* XXX: translate this error from net to host */ - return req->rq_rephdr->status; + RETURN(req->rq_repmsg->status); } - EXIT; - return 0; + RETURN(0); +} + +static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request) +{ + OBD_FREE(request->rq_reqbuf, request->rq_reqlen); + request->rq_reqbuf = NULL; + request->rq_reqlen = 0; } /* Abort this request and cleanup any resources associated with it. */ -int ptlrpc_abort(struct ptlrpc_request *request) +static int ptlrpc_abort(struct ptlrpc_request *request) { /* First remove the ME for the reply; in theory, this means * that we can tear down the buffer safely. */ - //spin_lock(&request->rq_lock); PtlMEUnlink(request->rq_reply_me_h); - OBD_FREE(request->rq_repbuf, request->rq_replen); + OBD_FREE(request->rq_reply_md.start, request->rq_replen); request->rq_repbuf = NULL; request->rq_replen = 0; - //spin_unlock(&request->rq_lock); - - return 0; -} - -static int ptlrpc_check_queue_depth(void *data) -{ - struct ptlrpc_client *cl = data; - - if (atomic_read(&cl->cli_queue_length) > 0) - return 1; - - if (sigismember(&(current->pending.signal), SIGKILL) || - sigismember(&(current->pending.signal), SIGTERM) || - sigismember(&(current->pending.signal), SIGINT)) - return 1; - return 0; } -#define __ptlrpc_wait_event_interruptible(wq, condition, ret) \ -do { \ - wait_queue_t __wait; \ - init_waitqueue_entry(&__wait, current); \ - \ - add_wait_queue_exclusive(&wq, &__wait); \ - for (;;) { \ - set_current_state(TASK_INTERRUPTIBLE); \ - if (condition) \ - break; \ - if (!signal_pending(current)) { \ - schedule(); \ - continue; \ - } \ - ret = -ERESTARTSYS; \ - break; \ - } \ - current->state = TASK_RUNNING; \ - remove_wait_queue(&wq, &__wait); \ -} while (0) - -#define ptlrpc_wait_event_interruptible(wq, condition) \ -({ \ - int __ret = 0; \ - if (!(condition)) \ - __ptlrpc_wait_event_interruptible(wq, condition, __ret); \ - __ret; \ -}) - int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) { int rc = 0; @@ -289,55 +192,46 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) init_waitqueue_head(&req->rq_wait_for_rep); - if (atomic_dec_and_test(&cl->cli_queue_length)) - ptlrpc_wait_event_interruptible(cl->cli_waitq, - ptlrpc_check_queue_depth(cl)); - - if (cl->cli_obd) { - /* Local delivery */ - rc = ptlrpc_enqueue(cl, req); - } else { - /* Remote delivery via portals. */ - req->rq_req_portal = cl->cli_request_portal; - req->rq_reply_portal = cl->cli_reply_portal; - rc = ptl_send_rpc(req, &cl->cli_server); - } - if (rc) { - CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc); - atomic_inc(&cl->cli_queue_length); - wake_up(&cl->cli_waitq); + req->rq_client = cl; + req->rq_req_portal = cl->cli_request_portal; + req->rq_reply_portal = cl->cli_reply_portal; + rc = ptl_send_rpc(req, cl); + if (rc) { + CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc); + ptlrpc_cleanup_request_buf(req); + up(&cl->cli_rpc_sem); RETURN(-rc); } CDEBUG(D_OTHER, "-- sleeping\n"); wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req)); CDEBUG(D_OTHER, "-- done\n"); - atomic_inc(&cl->cli_queue_length); - wake_up(&cl->cli_waitq); - if (req->rq_flags == PTL_RPC_INTR) { + ptlrpc_cleanup_request_buf(req); + up(&cl->cli_rpc_sem); + if (req->rq_flags == PTL_RPC_INTR) { /* Clean up the dangling reply buffers */ ptlrpc_abort(req); GOTO(out, rc = -EINTR); } - if (req->rq_flags != PTL_RPC_REPLY) { + if (req->rq_flags != PTL_RPC_REPLY) { CERROR("Unknown reason for wakeup\n"); /* XXX Phil - I end up here when I kill obdctl */ - ptlrpc_abort(req); + ptlrpc_abort(req); GOTO(out, rc = -EINTR); } - rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, - &req->rq_rephdr, &req->rq_rep); + rc = lustre_unpack_msg(req->rq_repbuf, req->rq_replen); + req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; if (rc) { CERROR("unpack_rep failed: %d\n", rc); GOTO(out, rc); } - CDEBUG(D_NET, "got rep %d\n", req->rq_rephdr->xid); + CDEBUG(D_NET, "got rep %d\n", req->rq_repmsg->xid); - if ( req->rq_rephdr->status == 0 ) + if (req->rq_repmsg->status == 0) CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repbuf, - req->rq_replen, req->rq_rephdr->status); + req->rq_replen, req->rq_repmsg->status); EXIT; out: