[fs/lustre-release.git] lustre/ptlrpc/rpc.c
commit: zero out the request structure after allocation
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2002 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define EXPORT_SYMTAB

#include <linux/config.h>
#include <linux/module.h>
#include <linux/kernel.h>

#include <linux/obd_support.h>
#include <linux/lustre_net.h>

static ptl_handle_eq_t req_eq, bulk_source_eq, bulk_sink_eq;

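/*
 * Callback for req_eq.  A SENT event means the outgoing buffer has been
 * transmitted and can be freed; a PUT event means a reply has landed, so
 * record where it is and wake the thread waiting on rq_wait_for_rep.
 */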
static int request_callback(ptl_event_t *ev, void *data)
{
        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;

        ENTRY;

        if (ev->type == PTL_EVENT_SENT) {
                kfree(ev->mem_desc.start);
        } else if (ev->type == PTL_EVENT_PUT) {
                rpc->rq_repbuf = ev->mem_desc.start + ev->offset;
                wake_up_interruptible(&rpc->rq_wait_for_rep);
        }

        EXIT;
        return 1;
}

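/*
 * Callback for a service's event queue: a PUT event signals that a new
 * request has arrived in the service buffer, so wake the service thread.
 */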
static int incoming_callback(ptl_event_t *ev, void *data)
{
        struct ptlrpc_service *service = data;

        ENTRY;

        if (ev->type == PTL_EVENT_PUT) {
                wake_up(service->srv_wait_queue);
        } else {
                printk("Unexpected event type: %d\n", ev->type);
        }

        EXIT;
        return 0;
}

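/*
 * Callback on the sending side of a bulk transfer.  SENT is ignored; the
 * ACK from the sink is what tells us the data has been consumed, so only
 * then do we wake the thread waiting on rq_wait_for_bulk.
 */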
static int bulk_source_callback(ptl_event_t *ev, void *data)
{
        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;

        ENTRY;

        if (ev->type == PTL_EVENT_SENT) {
                ;
        } else if (ev->type == PTL_EVENT_ACK) {
                wake_up_interruptible(&rpc->rq_wait_for_bulk);
        } else {
                printk("%s: unexpected event type %d\n", __FUNCTION__,
                       ev->type);
        }

        EXIT;
        return 1;
}

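/*
 * Callback on the receiving side of a bulk transfer: a PUT event means
 * the bulk data has landed in rq_bulkbuf, so wake the waiter after a
 * sanity check on the destination address.
 */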
static int bulk_sink_callback(ptl_event_t *ev, void *data)
{
        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;

        ENTRY;

        if (ev->type == PTL_EVENT_PUT) {
                if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset)
                        printk("%s: bulkbuf != mem_desc -- why?\n",
                               __FUNCTION__);
                wake_up_interruptible(&rpc->rq_wait_for_bulk);
        } else {
                printk("%s: unexpected event type %d\n", __FUNCTION__,
                       ev->type);
        }

        EXIT;
        return 1;
}

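/*
 * Bind an MD over the outgoing buffer (bulk data, a new request, or a
 * reply, depending on the request state) and PUT it to the peer on the
 * given portal, using rq_xid as the match bits.
 */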
int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
                 int portal, int is_request)
{
        int rc;
        ptl_process_id_t remote_id;
        ptl_handle_md_t md_h;

        /* FIXME: This is bad. */
        if (request->rq_bulklen) {
                request->rq_req_md.start = request->rq_bulkbuf;
                request->rq_req_md.length = request->rq_bulklen;
                request->rq_req_md.eventq = bulk_source_eq;
        } else if (is_request) {
                request->rq_req_md.start = request->rq_reqbuf;
                request->rq_req_md.length = request->rq_reqlen;
                request->rq_req_md.eventq = req_eq;
        } else {
                request->rq_req_md.start = request->rq_repbuf;
                request->rq_req_md.length = request->rq_replen;
                request->rq_req_md.eventq = req_eq;
        }
        request->rq_req_md.threshold = 1;
        request->rq_req_md.options = PTL_MD_OP_PUT;
        request->rq_req_md.user_ptr = request;

        rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h);
        if (rc != PTL_OK) {
                printk("%s: PtlMDBind failed: %d\n", __FUNCTION__, rc);
                return rc;
        }

        remote_id.addr_kind = PTL_ADDR_NID;
        remote_id.nid = peer->peer_nid;
        remote_id.pid = 0;

        /* Bulk transfers request an ACK so that bulk_source_callback can
         * tell when the sink has consumed the data. */
        if (request->rq_bulklen) {
                rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0,
                            request->rq_xid, 0, 0);
        } else {
                rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0,
                            request->rq_xid, 0, 0);
        }
        if (rc != PTL_OK) {
                printk("%s: PtlPut failed: %d\n", __FUNCTION__, rc);
                /* Tear down the MD we just bound so it isn't leaked. */
                PtlMDUnlink(md_h);
        }

        return rc;
}

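/*
 * Send a request to a peer: allocate a reply buffer and post an ME/MD to
 * receive the reply (plus a second ME/MD for bulk data if any is
 * expected), then hand the request itself to ptl_send_buf().
 */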
int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
{
        ptl_handle_me_t me_h, bulk_me_h;
        ptl_process_id_t local_id;
        int rc;

        ENTRY;

        request->rq_repbuf = kmalloc(request->rq_replen, GFP_KERNEL);
        if (!request->rq_repbuf) {
                EXIT;
                return -ENOMEM;
        }

        local_id.addr_kind = PTL_ADDR_GID;
        local_id.gid = PTL_ID_ANY;
        local_id.rid = PTL_ID_ANY;

        rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
                         request->rq_xid, 0, PTL_UNLINK, &me_h);
        if (rc != PTL_OK) {
                kfree(request->rq_repbuf);
                EXIT;
                return rc;
        }

        request->rq_reply_md.start = request->rq_repbuf;
        request->rq_reply_md.length = request->rq_replen;
        request->rq_reply_md.threshold = 1;
        request->rq_reply_md.options = PTL_MD_OP_PUT;
        request->rq_reply_md.user_ptr = request;
        request->rq_reply_md.eventq = req_eq;

        rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK,
                         &request->rq_reply_md_h);
        if (rc != PTL_OK) {
                /* Undo what we have posted so far. */
                PtlMEUnlink(me_h);
                kfree(request->rq_repbuf);
                EXIT;
                return rc;
        }

        if (request->rq_bulklen != 0) {
                rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
                                 local_id, request->rq_xid, 0, PTL_UNLINK,
                                 &bulk_me_h);
                if (rc != PTL_OK) {
                        PtlMEUnlink(me_h);
                        kfree(request->rq_repbuf);
                        EXIT;
                        return rc;
                }

                request->rq_bulk_md.start = request->rq_bulkbuf;
                request->rq_bulk_md.length = request->rq_bulklen;
                request->rq_bulk_md.threshold = 1;
                request->rq_bulk_md.options = PTL_MD_OP_PUT;
                request->rq_bulk_md.user_ptr = request;
                request->rq_bulk_md.eventq = bulk_sink_eq;

                rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK,
                                 &request->rq_bulk_md_h);
                if (rc != PTL_OK) {
                        PtlMEUnlink(bulk_me_h);
                        PtlMEUnlink(me_h);
                        kfree(request->rq_repbuf);
                        EXIT;
                        return rc;
                }
        }

        return ptl_send_buf(request, peer, request->rq_req_portal, 1);
}

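/*
 * Register a service: resolve the peer uuid, allocate a receive buffer,
 * and post a persistent (PTL_RETAIN) ME/MD on the service portal whose
 * event queue wakes the service thread via incoming_callback().
 */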
int rpc_register_service(struct ptlrpc_service *service, char *uuid)
{
        struct lustre_peer peer;
        int rc;

        rc = kportal_uuid_to_peer(uuid, &peer);
        if (rc != 0) {
                printk("Invalid uuid \"%s\"\n", uuid);
                return -EINVAL;
        }

        service->srv_buf = kmalloc(service->srv_buf_size, GFP_KERNEL);
        if (service->srv_buf == NULL) {
                printk("%s: no memory\n", __FUNCTION__);
                return -ENOMEM;
        }

        service->srv_id.addr_kind = PTL_ADDR_GID;
        service->srv_id.gid = PTL_ID_ANY;
        service->srv_id.rid = PTL_ID_ANY;

        rc = PtlMEAttach(peer.peer_ni, service->srv_portal, service->srv_id,
                         0, ~0, PTL_RETAIN, &service->srv_me);
        if (rc != PTL_OK) {
                printk("PtlMEAttach failed: %d\n", rc);
                kfree(service->srv_buf);
                return rc;
        }

        rc = PtlEQAlloc(peer.peer_ni, 128, incoming_callback, service,
                        &service->srv_eq);
        if (rc != PTL_OK) {
                printk("PtlEQAlloc failed: %d\n", rc);
                PtlMEUnlink(service->srv_me);
                kfree(service->srv_buf);
                return rc;
        }

        /* FIXME: Build an auto-unlinking MD and build a ring. */
        /* FIXME: Make sure that these are reachable by DMA on well-known
         * addresses. */
        service->srv_md.start           = service->srv_buf;
        service->srv_md.length          = service->srv_buf_size;
        service->srv_md.threshold       = PTL_MD_THRESH_INF;
        service->srv_md.options         = PTL_MD_OP_PUT;
        service->srv_md.user_ptr        = service;
        service->srv_md.eventq          = service->srv_eq;

        rc = PtlMDAttach(service->srv_me, service->srv_md,
                         PTL_RETAIN, &service->srv_md_h);
        if (rc != PTL_OK) {
                printk("PtlMDAttach failed: %d\n", rc);
                PtlEQFree(service->srv_eq);
                PtlMEUnlink(service->srv_me);
                kfree(service->srv_buf);
                return rc;
        }

        return 0;
}

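/*
 * Look up the NAL's network interface via the inter-module registry and
 * allocate the three event queues used by the callbacks above.
 */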
static int req_init_portals(void)
{
        int rc;
        const ptl_handle_ni_t *nip;
        ptl_handle_ni_t ni;

        nip = inter_module_get_request(LUSTRE_NAL "_ni", LUSTRE_NAL);
        if (nip == NULL) {
                printk("get_ni failed: is the NAL module loaded?\n");
                return -EIO;
        }
        ni = *nip;

        rc = PtlEQAlloc(ni, 128, request_callback, NULL, &req_eq);
        if (rc != PTL_OK) {
                printk("PtlEQAlloc failed: %d\n", rc);
                goto out_ni;
        }

        rc = PtlEQAlloc(ni, 128, bulk_source_callback, NULL, &bulk_source_eq);
        if (rc != PTL_OK) {
                printk("PtlEQAlloc failed: %d\n", rc);
                goto out_req_eq;
        }

        rc = PtlEQAlloc(ni, 128, bulk_sink_callback, NULL, &bulk_sink_eq);
        if (rc != PTL_OK) {
                printk("PtlEQAlloc failed: %d\n", rc);
                goto out_bulk_source_eq;
        }

        return 0;

out_bulk_source_eq:
        PtlEQFree(bulk_source_eq);
out_req_eq:
        PtlEQFree(req_eq);
out_ni:
        inter_module_put(LUSTRE_NAL "_ni");
        return rc;
}

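/* Module entry points: set up the Portals state on load, release the
 * event queues and the NI reference on unload. */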
static int __init req_init(void)
{
        return req_init_portals();
}

static void __exit req_exit(void)
{
        PtlEQFree(req_eq);
        PtlEQFree(bulk_source_eq);
        PtlEQFree(bulk_sink_eq);

        inter_module_put(LUSTRE_NAL "_ni");
}

MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Request Processor v1.0");
MODULE_LICENSE("GPL");

module_init(req_init);
module_exit(req_exit);