Whamcloud - gitweb
new files - restructure RPC further
[fs/lustre-release.git] / lustre / ptlrpc / client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define EXPORT_SYMTAB
24
25 #include <linux/config.h>
26 #include <linux/module.h>
27 #include <linux/kernel.h>
28
29 #define DEBUG_SUBSYSTEM S_RPC
30
31 #include <linux/obd_support.h>
32 #include <linux/obd_class.h>
33 #include <linux/lustre_net.h>
34
35 int ptlrpc_enqueue(struct ptlrpc_client *peer, 
36                      struct ptlrpc_request *req)
37 {
38         struct ptlrpc_request *srv_req;
39         
40         if (!peer->cli_obd) { 
41                 EXIT;
42                 return -1;
43         }
44
45         OBD_ALLOC(srv_req, sizeof(*srv_req));
46         if (!srv_req) { 
47                 EXIT;
48                 return -ENOMEM;
49         }
50
51         CDEBUG(0, "peer obd minor %d, incoming req %p, srv_req %p\n",
52                peer->cli_obd->obd_minor, req, srv_req);
53
54         memset(srv_req, 0, sizeof(*req)); 
55
56         /* move the request buffer */
57         srv_req->rq_reqbuf = req->rq_reqbuf;
58         srv_req->rq_reqlen = req->rq_reqlen;
59         srv_req->rq_obd = peer->cli_obd;
60
61         /* remember where it came from */
62         srv_req->rq_reply_handle = req;
63
64         list_add(&srv_req->rq_list, &peer->cli_obd->obd_req_list); 
65         wake_up(&peer->cli_obd->obd_req_waitq);
66         return 0;
67 }
68
69 int ptlrpc_connect_client(int dev, char *uuid, 
70                               struct ptlrpc_client *cl)
71 {
72         int err; 
73
74         memset(cl, 0, sizeof(*cl));
75         cl->cli_xid = 0;
76         cl->cli_obd = NULL; 
77
78         /* non networked client */
79         if (dev >= 0 && dev < MAX_OBD_DEVICES) {
80                 struct obd_device *obd = &obd_dev[dev];
81                 
82                 if ((!obd->obd_flags & OBD_ATTACHED) ||
83                     (!obd->obd_flags & OBD_SET_UP)) { 
84                         CERROR("target device %d not att or setup\n", dev);
85                         return -EINVAL;
86                 }
87
88                 cl->cli_obd = &obd_dev[dev];
89                 return 0;
90         }
91
92         /* networked */
93         err = kportal_uuid_to_peer(uuid, &cl->cli_server);
94         if (err == 0) { 
95                 CERROR("cannot find peer!"); 
96         }
97
98         return err;
99 }
100
101 struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, 
102                                        int opcode, int namelen, char *name,
103                                        int tgtlen, char *tgt)
104 {
105         struct ptlrpc_request *request;
106         int rc;
107         ENTRY; 
108
109         OBD_ALLOC(request, sizeof(*request));
110         if (!request) { 
111                 CERROR("request allocation out of memory\n");
112                 return NULL;
113         }
114
115         memset(request, 0, sizeof(*request));
116         request->rq_xid = cl->cli_xid++;
117
118         rc = cl->cli_req_pack(name, namelen, tgt, tgtlen,
119                           &request->rq_reqhdr, &request->rq_req,
120                           &request->rq_reqlen, &request->rq_reqbuf);
121         if (rc) { 
122                 CERROR("cannot pack request %d\n", rc); 
123                 return NULL;
124         }
125         request->rq_reqhdr->opc = opcode;
126         request->rq_reqhdr->seqno = request->rq_xid;
127
128         EXIT;
129         return request;
130 }
131
132 void ptlrpc_free_req(struct ptlrpc_request *request)
133 {
134         OBD_FREE(request, sizeof(*request));
135 }
136
137 int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
138                              
139 {
140         int rc;
141         DECLARE_WAITQUEUE(wait, current);
142
143         init_waitqueue_head(&req->rq_wait_for_rep);
144
145         if (cl->cli_obd) {
146                 /* Local delivery */
147                 ENTRY;
148                 rc = ptlrpc_enqueue(cl, req); 
149         } else {
150                 /* Remote delivery via portals. */
151                 req->rq_req_portal = cl->cli_request_portal;
152                 req->rq_reply_portal = cl->cli_reply_portal;
153                 rc = ptl_send_rpc(req, &cl->cli_server);
154         }
155         if (rc) { 
156                 CERROR("error %d, opcode %d\n", rc, 
157                        req->rq_reqhdr->opc); 
158                 return -rc;
159         }
160
161         CDEBUG(0, "-- sleeping\n");
162         add_wait_queue(&req->rq_wait_for_rep, &wait);
163         while (req->rq_repbuf == NULL) {
164                 set_current_state(TASK_INTERRUPTIBLE);
165
166                 /* if this process really wants to die, let it go */
167                 if (sigismember(&(current->pending.signal), SIGKILL) ||
168                     sigismember(&(current->pending.signal), SIGINT))
169                         break;
170
171                 schedule();
172         }
173         remove_wait_queue(&req->rq_wait_for_rep, &wait);
174         set_current_state(TASK_RUNNING);
175         CDEBUG(0, "-- done\n");
176
177         if (req->rq_repbuf == NULL) {
178                 /* We broke out because of a signal */
179                 EXIT;
180                 return -EINTR;
181         }
182
183         rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, &req->rq_rep);
184         if (rc) {
185                 CERROR("unpack_rep failed: %d\n", rc);
186                 return rc;
187         }
188         CERROR("got rep %lld\n", req->rq_rephdr->seqno);
189         if ( req->rq_rephdr->status == 0 )
190                 CDEBUG(0, "--> buf %p len %d status %d\n",
191                        req->rq_repbuf, req->rq_replen, 
192                        req->rq_rephdr->status); 
193
194         EXIT;
195         return 0;
196 }