X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fclient.c;h=a1322ac97dd421dbfb30b3ec2900522cbc8565d6;hb=726c9cf39547151a7cf85118c8a23b9ecf8220f5;hp=4755dda9834457b9856a38c928cd111ca489ef97;hpb=a34c37a94de1188da1f033f589ea01b08f19f722;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 4755dda..a1322ac 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -25,6 +25,7 @@ #include #include #include +#include #define DEBUG_SUBSYSTEM S_RPC @@ -32,77 +33,31 @@ #include #include -int ptlrpc_enqueue(struct ptlrpc_client *peer, struct ptlrpc_request *req) +void ptlrpc_init_client(int dev, int req_portal, int rep_portal, + struct ptlrpc_client *cl) { - struct ptlrpc_request *srv_req; - - if (!peer->cli_obd) { - EXIT; - return -1; - } - - OBD_ALLOC(srv_req, sizeof(*srv_req)); - if (!srv_req) { - EXIT; - return -ENOMEM; - } - - CDEBUG(0, "peer obd minor %d, incoming req %p, srv_req %p\n", - peer->cli_obd->obd_minor, req, srv_req); - - memset(srv_req, 0, sizeof(*req)); - - /* move the request buffer */ - srv_req->rq_reqbuf = req->rq_reqbuf; - srv_req->rq_reqlen = req->rq_reqlen; - srv_req->rq_obd = peer->cli_obd; - - /* remember where it came from */ - srv_req->rq_reply_handle = req; - - list_add(&srv_req->rq_list, &peer->cli_obd->obd_req_list); - wake_up(&peer->cli_obd->obd_req_waitq); - return 0; + memset(cl, 0, sizeof(*cl)); + spin_lock_init(&cl->cli_lock); + cl->cli_xid = 1; + cl->cli_generation = 1; + cl->cli_epoch = 1; + cl->cli_bootcount = 0; + cl->cli_obd = NULL; + cl->cli_request_portal = req_portal; + cl->cli_reply_portal = rep_portal; + INIT_LIST_HEAD(&cl->cli_sending_head); + INIT_LIST_HEAD(&cl->cli_sent_head); + sema_init(&cl->cli_rpc_sem, 32); } -int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal, - req_pack_t req_pack, rep_unpack_t rep_unpack, - struct ptlrpc_client *cl) +int ptlrpc_connect_client(int dev, char *uuid, struct ptlrpc_client *cl) { - int err; + int err; - memset(cl, 0, sizeof(*cl)); - spin_lock_init(&cl->cli_lock); - cl->cli_xid = 1; - cl->cli_obd = NULL; - cl->cli_request_portal = req_portal; - cl->cli_reply_portal = rep_portal; - cl->cli_rep_unpack = rep_unpack; - cl->cli_req_pack = req_pack; - - /* non networked client */ - if (dev >= 0 && dev < MAX_OBD_DEVICES) { - struct obd_device *obd = &obd_dev[dev]; - - if ((!obd->obd_flags & OBD_ATTACHED) || - (!obd->obd_flags & OBD_SET_UP)) { - CERROR("target device %d not att or setup\n", dev); - return -EINVAL; - } - if (strcmp(obd->obd_type->typ_name, "ost") && - strcmp(obd->obd_type->typ_name, "mds")) { - return -EINVAL; - } - - cl->cli_obd = &obd_dev[dev]; - return 0; - } - - /* networked */ - err = kportal_uuid_to_peer(uuid, &cl->cli_server); - if (err != 0) { - CERROR("cannot find peer %s!", uuid); - } + cl->cli_epoch++; + err = kportal_uuid_to_peer(uuid, &cl->cli_server); + if (err != 0) + CERROR("cannot find peer %s!\n", uuid); return err; } @@ -113,7 +68,6 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer) OBD_ALLOC(bulk, sizeof(*bulk)); if (bulk != NULL) { - memset(bulk, 0, sizeof(*bulk)); memcpy(&bulk->b_peer, peer, sizeof(*peer)); init_waitqueue_head(&bulk->b_waitq); } @@ -121,38 +75,36 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer) return bulk; } -struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, - int opcode, int namelen, char *name, - int tgtlen, char *tgt) +struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode, + int count, int *lengths, char **bufs) { - struct ptlrpc_request *request; - int rc; - ENTRY; - - OBD_ALLOC(request, sizeof(*request)); - if (!request) { - CERROR("request allocation out of memory\n"); - return NULL; - } + struct ptlrpc_request *request; + int rc; + ENTRY; - memset(request, 0, sizeof(*request)); + OBD_ALLOC(request, sizeof(*request)); + if (!request) { + CERROR("request allocation out of memory\n"); + RETURN(NULL); + } spin_lock(&cl->cli_lock); - request->rq_xid = cl->cli_xid++; + request->rq_xid = cl->cli_xid++; spin_unlock(&cl->cli_lock); - rc = cl->cli_req_pack(name, namelen, tgt, tgtlen, - &request->rq_reqhdr, &request->rq_req, - &request->rq_reqlen, &request->rq_reqbuf); - if (rc) { - CERROR("cannot pack request %d\n", rc); - return NULL; - } - request->rq_reqhdr->opc = opcode; - request->rq_reqhdr->xid = request->rq_xid; - - EXIT; - return request; + rc = lustre_pack_msg(count, lengths, bufs, + &request->rq_reqlen, &request->rq_reqbuf); + if (rc) { + CERROR("cannot pack request %d\n", rc); + RETURN(NULL); + } + request->rq_type = PTL_RPC_REQUEST; + request->rq_reqmsg = (struct lustre_msg *)request->rq_reqbuf; + request->rq_reqmsg->opc = HTON__u32(opcode); + request->rq_reqmsg->xid = HTON__u32(request->rq_xid); + request->rq_reqmsg->type = HTON__u32(request->rq_type); + + RETURN(request); } void ptlrpc_free_req(struct ptlrpc_request *request) @@ -162,26 +114,27 @@ void ptlrpc_free_req(struct ptlrpc_request *request) if (request->rq_repbuf != NULL) OBD_FREE(request->rq_repbuf, request->rq_replen); - OBD_FREE(request, sizeof(*request)); + OBD_FREE(request, sizeof(*request)); } static int ptlrpc_check_reply(struct ptlrpc_request *req) { + int rc = 0; + if (req->rq_repbuf != NULL) { req->rq_flags = PTL_RPC_REPLY; - EXIT; - return 1; + GOTO(out, rc = 1); } if (sigismember(&(current->pending.signal), SIGKILL) || - sigismember(&(current->pending.signal), SIGSTOP) || - sigismember(&(current->pending.signal), SIGINT)) { + sigismember(&(current->pending.signal), SIGTERM) || + sigismember(&(current->pending.signal), SIGINT)) { req->rq_flags = PTL_RPC_INTR; - EXIT; - return 1; + GOTO(out, rc = 1); } - return 0; + out: + return rc; } int ptlrpc_check_status(struct ptlrpc_request *req, int err) @@ -190,103 +143,97 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err) if (err != 0) { CERROR("err is %d\n", err); - EXIT; - return err; + RETURN(err); } if (req == NULL) { CERROR("req == NULL\n"); - EXIT; - return -ENOMEM; + RETURN(-ENOMEM); } - if (req->rq_rephdr == NULL) { - CERROR("req->rq_rephdr == NULL\n"); - EXIT; - return -ENOMEM; + if (req->rq_repmsg == NULL) { + CERROR("req->rq_repmsg == NULL\n"); + RETURN(-ENOMEM); } - if (req->rq_rephdr->status != 0) { - CERROR("req->rq_rephdr->status is %d\n", - req->rq_rephdr->status); - EXIT; + if (req->rq_repmsg->status != 0) { + CERROR("req->rq_repmsg->status is %d\n", + req->rq_repmsg->status); /* XXX: translate this error from net to host */ - return req->rq_rephdr->status; + RETURN(req->rq_repmsg->status); } - EXIT; - return 0; + RETURN(0); +} + +static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request) +{ + OBD_FREE(request->rq_reqbuf, request->rq_reqlen); + request->rq_reqbuf = NULL; + request->rq_reqlen = 0; } /* Abort this request and cleanup any resources associated with it. */ -int ptlrpc_abort(struct ptlrpc_request *request) +static int ptlrpc_abort(struct ptlrpc_request *request) { - /* First remove the MD for the reply; in theory, this means + /* First remove the ME for the reply; in theory, this means * that we can tear down the buffer safely. */ PtlMEUnlink(request->rq_reply_me_h); - PtlMDUnlink(request->rq_reply_md_h); - OBD_FREE(request->rq_repbuf, request->rq_replen); + OBD_FREE(request->rq_reply_md.start, request->rq_replen); request->rq_repbuf = NULL; request->rq_replen = 0; - return 0; } int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) - { - int rc; + int rc = 0; ENTRY; - init_waitqueue_head(&req->rq_wait_for_rep); - - if (cl->cli_obd) { - /* Local delivery */ - ENTRY; - rc = ptlrpc_enqueue(cl, req); - } else { - /* Remote delivery via portals. */ - req->rq_req_portal = cl->cli_request_portal; - req->rq_reply_portal = cl->cli_reply_portal; - rc = ptl_send_rpc(req, &cl->cli_server); - } - if (rc) { - CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc); - return -rc; - } + init_waitqueue_head(&req->rq_wait_for_rep); + + req->rq_client = cl; + req->rq_req_portal = cl->cli_request_portal; + req->rq_reply_portal = cl->cli_reply_portal; + rc = ptl_send_rpc(req, cl); + if (rc) { + CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc); + ptlrpc_cleanup_request_buf(req); + up(&cl->cli_rpc_sem); + RETURN(-rc); + } CDEBUG(D_OTHER, "-- sleeping\n"); wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req)); CDEBUG(D_OTHER, "-- done\n"); - - if (req->rq_flags == PTL_RPC_INTR) { + ptlrpc_cleanup_request_buf(req); + up(&cl->cli_rpc_sem); + if (req->rq_flags == PTL_RPC_INTR) { /* Clean up the dangling reply buffers */ ptlrpc_abort(req); - EXIT; - return -EINTR; + GOTO(out, rc = -EINTR); } - if (req->rq_flags != PTL_RPC_REPLY) { + if (req->rq_flags != PTL_RPC_REPLY) { CERROR("Unknown reason for wakeup\n"); /* XXX Phil - I end up here when I kill obdctl */ - ptlrpc_abort(req); - //BUG(); - EXIT; - return -EINTR; + ptlrpc_abort(req); + GOTO(out, rc = -EINTR); } - rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, - &req->rq_rephdr, &req->rq_rep); - if (rc) { - CERROR("unpack_rep failed: %d\n", rc); - return rc; - } - CDEBUG(D_NET, "got rep %d\n", req->rq_rephdr->xid); + rc = lustre_unpack_msg(req->rq_repbuf, req->rq_replen); + req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + if (rc) { + CERROR("unpack_rep failed: %d\n", rc); + GOTO(out, rc); + } + CDEBUG(D_NET, "got rep %d\n", req->rq_repmsg->xid); - if ( req->rq_rephdr->status == 0 ) + if (req->rq_repmsg->status == 0) CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repbuf, - req->rq_replen, req->rq_rephdr->status); + req->rq_replen, req->rq_repmsg->status); - EXIT; - return 0; + EXIT; + out: + return rc; }