Whamcloud - gitweb
rpc/service.c: set max_offset when creating service MDs
[fs/lustre-release.git] / lustre / ptlrpc / service.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define EXPORT_SYMTAB
24
25 #include <linux/config.h>
26 #include <linux/module.h>
27 #include <linux/kernel.h>
28
29 #define DEBUG_SUBSYSTEM S_RPC
30
31 #include <linux/obd_support.h>
32 #include <linux/obd_class.h>
33 #include <linux/lustre_net.h>
34
35 extern int server_request_callback(ptl_event_t *ev, void *data);
36
37 static int ptlrpc_check_event(struct ptlrpc_service *svc)
38 {
39         
40         if (sigismember(&(current->pending.signal), SIGKILL) ||
41             sigismember(&(current->pending.signal), SIGINT)) { 
42                 svc->srv_flags |= SVC_KILLED;
43                 EXIT;
44                 return 1;
45         }
46
47         if ( svc->srv_flags & SVC_STOPPING ) {
48                 EXIT;
49                 return 1;
50         }
51
52         if ( svc->srv_eq_h ) { 
53                 int rc;
54                 rc = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
55
56                 if (rc == PTL_OK) { 
57                         svc->srv_flags |= SVC_EVENT;
58                         EXIT;
59                         return 1;
60                 }
61
62                 if (rc == PTL_EQ_DROPPED) { 
63                         CERROR("dropped event!\n");
64                         BUG();
65                 }
66                 CERROR("PtlEQGet returns %d\n", rc); 
67                 EXIT;
68                 return 0;
69         }
70
71         if (!list_empty(&svc->srv_reqs)) {
72                 svc->srv_flags |= SVC_LIST;
73                 EXIT;
74                 return 1;
75         }
76                 
77         EXIT;
78         return 0;
79 }
80
81 struct ptlrpc_service *
82 ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
83                 req_unpack_t unpack, rep_pack_t pack, svc_handler_t handler)
84 {
85         int err;
86         struct ptlrpc_service *svc;
87
88         OBD_ALLOC(svc, sizeof(*svc)); 
89         if ( !svc ) { 
90                 CERROR("no memory\n");
91                 return NULL;
92         }
93
94         memset(svc, 0, sizeof(*svc)); 
95
96         spin_lock_init(&svc->srv_lock);
97         INIT_LIST_HEAD(&svc->srv_reqs);
98         init_waitqueue_head(&svc->srv_ctl_waitq); 
99         init_waitqueue_head(&svc->srv_waitq); 
100
101         svc->srv_thread = NULL;
102         svc->srv_flags = 0;
103
104         svc->srv_buf_size = bufsize;
105         svc->srv_rep_portal = rep_portal;
106         svc->srv_req_portal = req_portal;
107         svc->srv_req_unpack = unpack;
108         svc->srv_rep_pack = pack;
109         svc->srv_handler = handler;
110         err = kportal_uuid_to_peer(uuid, &svc->srv_self);
111         if (err) { 
112                 CERROR("cannot get peer for uuid %s", uuid); 
113                 OBD_FREE(svc, sizeof(*svc));
114                 return NULL; 
115         }
116         return svc;
117 }
118
119 static int ptlrpc_main(void *arg)
120 {
121         int rc;
122         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
123         struct obd_device *obddev = data->dev;
124         struct ptlrpc_service *svc = data->svc;
125
126         ENTRY;
127
128         lock_kernel();
129         daemonize();
130         spin_lock_irq(&current->sigmask_lock);
131         sigfillset(&current->blocked);
132         recalc_sigpending(current);
133         spin_unlock_irq(&current->sigmask_lock);
134
135         sprintf(current->comm, data->name);
136
137         /* Record that the  thread is running */
138         svc->srv_thread = current;
139         svc->srv_flags = SVC_RUNNING;
140         wake_up(&svc->srv_ctl_waitq); 
141
142         /* XXX maintain a list of all managed devices: insert here */
143
144         /* And now, loop forever on requests */
145         while (1) {
146
147                 wait_event(svc->srv_waitq, ptlrpc_check_event(svc));
148                 
149                 if (svc->srv_flags & SVC_SIGNAL) {
150                         EXIT;
151                         break;
152                 }
153
154                 if (svc->srv_flags & SVC_STOPPING) {
155                         EXIT;
156                         break;
157                 }
158
159                 if (svc->srv_flags & SVC_EVENT) { 
160                         struct ptlrpc_request request;
161                         svc->srv_flags = SVC_RUNNING; 
162
163                         /* FIXME: If we move to an event-driven model,
164                          * we should put the request on the stack of
165                          * mds_handle instead. */
166                         memset(&request, 0, sizeof(request));
167                         request.rq_obd = obddev;
168                         request.rq_reqbuf = svc->srv_ev.mem_desc.start + svc->srv_ev.offset;
169                         request.rq_reqlen = svc->srv_ev.mem_desc.length;
170                         request.rq_xid = svc->srv_ev.match_bits;
171                         CERROR("got req %d\n", request.rq_xid);
172
173                         request.rq_peer.peer_nid = svc->srv_ev.initiator.nid;
174                         /* FIXME: this NI should be the incoming NI.
175                          * We don't know how to find that from here. */
176                         request.rq_peer.peer_ni = svc->srv_self.peer_ni;
177                         rc = svc->srv_handler(obddev, svc, &request);
178                         ptl_received_rpc(svc);
179                         continue;
180                 }
181
182                 if (svc->srv_flags & SVC_LIST) { 
183                         struct ptlrpc_request *request;
184                         svc->srv_flags = SVC_RUNNING; 
185
186                         spin_lock(&svc->srv_lock);
187                         request = list_entry(svc->srv_reqs.next,
188                                              struct ptlrpc_request,
189                                              rq_list);
190                         list_del(&request->rq_list);
191                         spin_unlock(&svc->srv_lock);
192                         rc = svc->srv_handler(obddev, svc, request);
193                         continue;
194                 }
195                 CERROR("unknown break in service"); 
196                 break; 
197         }
198
199         svc->srv_thread = NULL;
200         svc->srv_flags = SVC_STOPPED;
201         wake_up(&svc->srv_ctl_waitq);
202         CERROR("svc exiting process %d\n", current->pid);
203         return 0;
204 }
205
206 void ptlrpc_stop_thread(struct ptlrpc_service *svc)
207 {
208         svc->srv_flags = SVC_STOPPING;
209
210         wake_up(&svc->srv_waitq);
211         wait_event_interruptible(svc->srv_ctl_waitq, 
212                                  (svc->srv_flags & SVC_STOPPED));
213 }
214
215 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
216                                 char *name)
217 {
218         struct ptlrpc_svc_data d;
219         int rc;
220         ENTRY;
221
222         d.dev = dev;
223         d.svc = svc;
224         d.name = name;
225
226         init_waitqueue_head(&svc->srv_waitq);
227
228         init_waitqueue_head(&svc->srv_ctl_waitq);
229         rc = kernel_thread(ptlrpc_main, (void *) &d, 
230                            CLONE_VM | CLONE_FS | CLONE_FILES);
231         if (rc < 0) { 
232                 CERROR("cannot start thread\n"); 
233                 return -EINVAL;
234         }
235         wait_event(svc->srv_ctl_waitq, svc->srv_flags & SVC_RUNNING);
236
237         EXIT;
238         return 0;
239 }
240
241
242 int rpc_register_service(struct ptlrpc_service *service, char *uuid)
243 {
244         struct lustre_peer peer;
245         int rc, i;
246
247         rc = kportal_uuid_to_peer(uuid, &peer);
248         if (rc != 0) {
249                 CERROR("Invalid uuid \"%s\"\n", uuid);
250                 return -EINVAL;
251         }
252
253         service->srv_ring_length = RPC_RING_LENGTH;
254         service->srv_me_active = 0;
255         service->srv_md_active = 0;
256
257         service->srv_id.addr_kind = PTL_ADDR_GID;
258         service->srv_id.gid = PTL_ID_ANY;
259         service->srv_id.rid = PTL_ID_ANY;
260
261         rc = PtlEQAlloc(peer.peer_ni, 128, server_request_callback,
262                         service, &(service->srv_eq_h));
263
264         if (rc != PTL_OK) {
265                 CERROR("PtlEQAlloc failed: %d\n", rc);
266                 return rc;
267         }
268
269         /* Attach the leading ME on which we build the ring */
270         rc = PtlMEAttach(peer.peer_ni, service->srv_req_portal,
271                          service->srv_id, 0, ~0, PTL_RETAIN,
272                          &(service->srv_me_h[0]));
273
274         if (rc != PTL_OK) {
275                 CERROR("PtlMEAttach failed: %d\n", rc);
276                 return rc;
277         }
278
279         for (i = 0; i < service->srv_ring_length; i++) {
280                 OBD_ALLOC(service->srv_buf[i], service->srv_buf_size);
281
282                 if (service->srv_buf[i] == NULL) {
283                         CERROR("no memory\n");
284                         return -ENOMEM;
285                 }
286
287                 /* Insert additional ME's to the ring */
288                 if (i > 0) {
289                         rc = PtlMEInsert(service->srv_me_h[i-1],
290                                          service->srv_id, 0, ~0, PTL_RETAIN,
291                                          PTL_INS_AFTER,&(service->srv_me_h[i]));
292                         service->srv_me_tail = i;
293
294                         if (rc != PTL_OK) {
295                                 CERROR("PtlMEInsert failed: %d\n", rc);
296                                 return rc;
297                         }
298                 }
299
300                 service->srv_ref_count[i] = 0;
301                 service->srv_md[i].start         = service->srv_buf[i];
302                 service->srv_md[i].length        = service->srv_buf_size;
303                 service->srv_md[i].max_offset    = service->srv_buf_size;
304                 service->srv_md[i].threshold     = PTL_MD_THRESH_INF;
305                 service->srv_md[i].options       = PTL_MD_OP_PUT;
306                 service->srv_md[i].user_ptr      = service;
307                 service->srv_md[i].eventq        = service->srv_eq_h;
308                 service->srv_md[i].max_offset    = service->srv_buf_size;
309
310                 rc = PtlMDAttach(service->srv_me_h[i], service->srv_md[i],
311                                  PTL_RETAIN, &(service->srv_md_h[i]));
312
313                 if (rc != PTL_OK) {
314                         /* cleanup */
315                         CERROR("PtlMDAttach failed: %d\n", rc);
316                         return rc;
317                 }
318         }
319
320         return 0;
321 }
322
323 int rpc_unregister_service(struct ptlrpc_service *service)
324 {
325         int rc, i;
326
327         for (i = 0; i < service->srv_ring_length; i++) {
328                 rc = PtlMDUnlink(service->srv_md_h[i]);
329                 if (rc)
330                         CERROR("PtlMDUnlink failed: %d\n", rc);
331         
332                 rc = PtlMEUnlink(service->srv_me_h[i]);
333                 if (rc)
334                         CERROR("PtlMEUnlink failed: %d\n", rc);
335
336                 if (service->srv_buf[i] != NULL)
337                         OBD_FREE(service->srv_buf[i], service->srv_buf_size);
338                 service->srv_buf[i] = NULL;
339         }
340
341         rc = PtlEQFree(service->srv_eq_h);
342         if (rc)
343                 CERROR("PtlEQFree failed: %d\n", rc);
344
345         return 0;
346 }
347