Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lustre / ptlrpc / service.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <liblustre.h>
26 #include <linux/kp30.h>
27 #endif
28 #include <linux/obd_support.h>
29 #include <linux/obd_class.h>
30 #include <linux/lustre_net.h>
31 #include <portals/types.h>
32 #include "ptlrpc_internal.h"
33
34 extern int request_in_callback(ptl_event_t *ev);
35
36 static int ptlrpc_check_event(struct ptlrpc_service *svc,
37                               struct ptlrpc_thread *thread, ptl_event_t *event)
38 {
39         struct ptlrpc_srv_ni *srv_ni;
40         int i;
41         int idx;
42         int rc;
43         ENTRY;
44
45         spin_lock(&svc->srv_lock);
46
47         if (thread->t_flags & SVC_STOPPING)
48                 GOTO(out, rc = 1);
49
50         LASSERT ((thread->t_flags & SVC_EVENT) == 0);
51         LASSERT (ptlrpc_ninterfaces > 0);
52
53         for (i = 0; i < ptlrpc_ninterfaces; i++) {
54                 idx = (svc->srv_interface_rover + i) % ptlrpc_ninterfaces;
55                 srv_ni = &svc->srv_interfaces[idx];
56
57                 LASSERT (!PtlHandleEqual (srv_ni->sni_eq_h, PTL_HANDLE_NONE));
58
59                 rc = PtlEQGet(srv_ni->sni_eq_h, event);
60                 switch (rc) {
61                 case PTL_OK:
62                         /* next time start with the next interface */
63                         svc->srv_interface_rover = (idx+1) % ptlrpc_ninterfaces;
64                         thread->t_flags |= SVC_EVENT;
65                         GOTO(out, rc = 1);
66
67                 case PTL_EQ_EMPTY:
68                         continue;
69
70                 default:
71                         CERROR("BUG: PtlEQGet returned %d\n", rc);
72                         LBUG();
73                 }
74         }
75         rc = 0;
76         EXIT;
77  out:
78         spin_unlock(&svc->srv_lock);
79         return rc;
80 }
81
82 struct ptlrpc_service *
83 ptlrpc_init_svc(__u32 nevents, __u32 nbufs,
84                 __u32 bufsize, __u32 max_req_size,
85                 int req_portal, int rep_portal,
86                 svc_handler_t handler, char *name,
87                 struct obd_device *obddev)
88 {
89         int i, j, ssize, rc;
90         struct ptlrpc_service *service;
91         struct ptlrpc_srv_ni  *srv_ni;
92         ENTRY;
93
94         LASSERT (ptlrpc_ninterfaces > 0);
95
96         ssize = offsetof (struct ptlrpc_service,
97                           srv_interfaces[ptlrpc_ninterfaces]);
98         OBD_ALLOC(service, ssize);
99         if (service == NULL)
100                 RETURN(NULL);
101
102         service->srv_name = name;
103         spin_lock_init(&service->srv_lock);
104         INIT_LIST_HEAD(&service->srv_threads);
105         init_waitqueue_head(&service->srv_waitq);
106
107         service->srv_max_req_size = max_req_size;
108         service->srv_buf_size = bufsize;
109
110         service->srv_rep_portal = rep_portal;
111         service->srv_req_portal = req_portal;
112         service->srv_handler = handler;
113         service->srv_interface_rover = 0;
114
115         /* First initialise enough for early teardown */
116         for (i = 0; i < ptlrpc_ninterfaces; i++) {
117                 srv_ni = &service->srv_interfaces[i];
118
119                 srv_ni->sni_service = service;
120                 srv_ni->sni_ni = &ptlrpc_interfaces[i];
121                 srv_ni->sni_eq_h = PTL_HANDLE_NONE;
122                 INIT_LIST_HEAD(&srv_ni->sni_rqbds);
123                 srv_ni->sni_nrqbds = 0;
124                 atomic_set(&srv_ni->sni_nrqbds_receiving, 0);
125         }
126
127         /* Now allocate the event queue and request buffers, assuming all
128          * interfaces require the same level of buffering. */
129         for (i = 0; i < ptlrpc_ninterfaces; i++) {
130                 srv_ni = &service->srv_interfaces[i];
131                 CDEBUG (D_NET, "%s: initialising interface %s\n", name,
132                         srv_ni->sni_ni->pni_name);
133
134                 rc = PtlEQAlloc(srv_ni->sni_ni->pni_ni_h, nevents,
135                                 request_in_callback, &(srv_ni->sni_eq_h));
136                 if (rc != PTL_OK) {
137                         CERROR("%s.%d: PtlEQAlloc on %s failed: %d\n",
138                                name, i, srv_ni->sni_ni->pni_name, rc);
139                         GOTO (failed, NULL);
140                 }
141
142                 for (j = 0; j < nbufs; j++) {
143                         struct ptlrpc_request_buffer_desc *rqbd;
144
145                         OBD_ALLOC(rqbd, sizeof(*rqbd));
146                         if (rqbd == NULL) {
147                                 CERROR ("%s.%d: Can't allocate request "
148                                         "descriptor %d on %s\n",
149                                         name, i, srv_ni->sni_nrqbds,
150                                         srv_ni->sni_ni->pni_name);
151                                 GOTO(failed, NULL);
152                         }
153
154                         rqbd->rqbd_srv_ni = srv_ni;
155                         rqbd->rqbd_me_h = PTL_HANDLE_NONE;
156                         atomic_set(&rqbd->rqbd_refcount, 0);
157
158                         OBD_ALLOC(rqbd->rqbd_buffer, service->srv_buf_size);
159                         if (rqbd->rqbd_buffer == NULL) {
160                                 CERROR ("%s.%d: Can't allocate request "
161                                         "buffer %d on %s\n",
162                                         name, i, srv_ni->sni_nrqbds,
163                                         srv_ni->sni_ni->pni_name);
164                                 OBD_FREE(rqbd, sizeof(*rqbd));
165                                 GOTO(failed, NULL);
166                         }
167                         list_add(&rqbd->rqbd_list, &srv_ni->sni_rqbds);
168                         srv_ni->sni_nrqbds++;
169
170                         ptlrpc_link_svc_me(rqbd);
171                 }
172         }
173
174         ptlrpc_lprocfs_register_service(obddev, service);
175
176         CDEBUG(D_NET, "%s: Started on %d interfaces, listening on portal %d\n",
177                service->srv_name, ptlrpc_ninterfaces, service->srv_req_portal);
178
179         RETURN(service);
180 failed:
181         ptlrpc_unregister_service(service);
182         return NULL;
183 }
184
185 static int handle_incoming_request(struct obd_device *obddev,
186                                    struct ptlrpc_service *svc,
187                                    ptl_event_t *event,
188                                    struct ptlrpc_request *request)
189 {
190         struct ptlrpc_request_buffer_desc *rqbd = event->mem_desc.user_ptr;
191         int rc;
192
193         /* FIXME: If we move to an event-driven model, we should put the request
194          * on the stack of mds_handle instead. */
195
196         LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
197         LASSERT ((event->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0);
198         LASSERT (rqbd->rqbd_srv_ni->sni_service == svc);
199         LASSERT (rqbd->rqbd_buffer == event->mem_desc.start);
200         LASSERT (event->offset + event->mlength <= svc->srv_buf_size);
201
202         memset(request, 0, sizeof(*request));
203         spin_lock_init (&request->rq_lock);
204         INIT_LIST_HEAD(&request->rq_list);
205         request->rq_svc = svc;
206         request->rq_obd = obddev;
207         request->rq_xid = event->match_bits;
208         request->rq_reqmsg = event->mem_desc.start + event->offset;
209         request->rq_reqlen = event->mlength;
210
211 #if SWAB_PARANOIA
212         /* Clear request swab mask; this is a new request */
213         request->rq_req_swab_mask = 0;
214 #endif
215         rc = lustre_unpack_msg (request->rq_reqmsg, request->rq_reqlen);
216         if (rc != 0) {
217                 CERROR ("error unpacking request: ptl %d from "LPX64
218                         " xid "LPU64"\n", svc->srv_req_portal,
219                        event->initiator.nid, request->rq_xid);
220                 goto out;
221         }
222         rc = -EINVAL;
223         if (request->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
224                 CERROR("wrong packet type received (type=%u)\n",
225                        request->rq_reqmsg->type);
226                 goto out;
227         }
228
229         CDEBUG(D_NET, "got req "LPD64" (md: %p + %d)\n", request->rq_xid,
230                event->mem_desc.start, event->offset);
231
232         request->rq_peer.peer_nid = event->initiator.nid;
233         request->rq_peer.peer_ni = rqbd->rqbd_srv_ni->sni_ni;
234
235         request->rq_export = class_conn2export(&request->rq_reqmsg->handle);
236
237         if (request->rq_export) {
238                 request->rq_connection = request->rq_export->exp_connection;
239                 ptlrpc_connection_addref(request->rq_connection);
240                 request->rq_export->exp_last_request_time =
241                         LTIME_S(CURRENT_TIME);
242         } else {
243                 /* create a (hopefully temporary) connection that will be used
244                  * to send the reply if this call doesn't create an export.
245                  * XXX revisit this when we revamp ptlrpc */
246                 request->rq_connection =
247                         ptlrpc_get_connection(&request->rq_peer, NULL);
248         }
249
250         CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid:pid:xid:ni:nid:opc %s:%s:%d:"
251                LPU64":%s:"LPX64":%d\n",
252                current->comm,
253                (request->rq_export ? 
254                 (char *)request->rq_export->exp_client_uuid.uuid : "0"), 
255                request->rq_reqmsg->status, request->rq_xid,
256                rqbd->rqbd_srv_ni->sni_ni->pni_name, event->initiator.nid,
257                request->rq_reqmsg->opc);
258
259         rc = svc->srv_handler(request);
260         CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid:pid:xid:ni:nid:opc %s:%s:%d:"
261                LPU64":%s:"LPX64":%d\n",
262                current->comm,
263                (request->rq_export ? 
264                 (char *)request->rq_export->exp_client_uuid.uuid : "0"),
265                request->rq_reqmsg->status, request->rq_xid,
266                rqbd->rqbd_srv_ni->sni_ni->pni_name, event->initiator.nid,
267                request->rq_reqmsg->opc);
268
269         ptlrpc_put_connection(request->rq_connection);
270         if (request->rq_export != NULL)
271                 class_export_put(request->rq_export);
272
273  out:
274         if (atomic_dec_and_test (&rqbd->rqbd_refcount)) /* last reference? */
275                 ptlrpc_link_svc_me (rqbd);
276
277         return rc;
278 }
279
280 /* Don't use daemonize, it removes fs struct from new thread (bug 418) */
281 void ptlrpc_daemonize(void)
282 {
283         exit_mm(current);
284
285         current->session = 1;
286         current->pgrp = 1;
287         current->tty = NULL;
288
289         exit_files(current);
290         reparent_to_init();
291 }
292
293 static int ptlrpc_main(void *arg)
294 {
295         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
296         struct obd_device *obddev = data->dev;
297         struct ptlrpc_service *svc = data->svc;
298         struct ptlrpc_thread *thread = data->thread;
299         struct ptlrpc_request *request;
300         ptl_event_t *event;
301         int rc = 0;
302         unsigned long flags;
303         cycles_t workdone_time;
304         cycles_t svc_workcycles;
305         ENTRY;
306
307         lock_kernel();
308         ptlrpc_daemonize();
309
310         SIGNAL_MASK_LOCK(current, flags);
311         sigfillset(&current->blocked);
312         RECALC_SIGPENDING;
313         SIGNAL_MASK_UNLOCK(current, flags);
314
315 #if defined(__arch_um__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,4,20))
316         sprintf(current->comm, "%s|%d", data->name,current->thread.extern_pid);
317 #elif defined(__arch_um__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
318         sprintf(current->comm, "%s|%d", data->name,
319                 current->thread.mode.tt.extern_pid);
320 #else
321         strcpy(current->comm, data->name);
322 #endif
323         unlock_kernel();
324
325         OBD_ALLOC(event, sizeof(*event));
326         if (!event)
327                 GOTO(out, rc = -ENOMEM);
328         OBD_ALLOC(request, sizeof(*request));
329         if (!request)
330                 GOTO(out_event, rc = -ENOMEM);
331
332         /* Record that the thread is running */
333         thread->t_flags = SVC_RUNNING;
334         svc_workcycles = workdone_time = 0;
335         wake_up(&thread->t_ctl_waitq);
336
337         /* XXX maintain a list of all managed devices: insert here */
338
339         /* And now, loop forever on requests */
340         while (1) {
341                 struct l_wait_info lwi = { 0 };
342                 l_wait_event(svc->srv_waitq,
343                              ptlrpc_check_event(svc, thread, event), &lwi);
344
345                 if (thread->t_flags & SVC_STOPPING) {
346                         spin_lock(&svc->srv_lock);
347                         thread->t_flags &= ~SVC_STOPPING;
348                         spin_unlock(&svc->srv_lock);
349
350                         EXIT;
351                         break;
352                 }
353
354                 if (thread->t_flags & SVC_EVENT) {
355                         cycles_t  workstart_time;
356                         spin_lock(&svc->srv_lock);
357                         thread->t_flags &= ~SVC_EVENT;
358                         /* Update Service Statistics */
359                         workstart_time = get_cycles();
360                         if (workdone_time && (svc->svc_counters != NULL)) {
361                                 /* Stats for req(n) are updated just before
362                                  * req(n+1) is executed. This avoids need to
363                                  * reacquire svc->srv_lock after
364                                  * call to handling_request().
365                                  */
366                                 int opc_offset;
367                                 /* req_waittime */
368                                 LPROCFS_COUNTER_INCR(&svc->svc_counters->cntr[PTLRPC_REQWAIT_CNTR],
369                                                      (workstart_time -
370                                                       event->arrival_time));
371                                 /* svc_eqdepth */
372                                 LPROCFS_COUNTER_INCR(&svc->svc_counters->cntr[PTLRPC_SVCEQDEPTH_CNTR],
373                                                      0); /* Wait for b_eq branch */
374                                 /* svc_idletime */
375                                 LPROCFS_COUNTER_INCR(&svc->svc_counters->cntr[PTLRPC_SVCIDLETIME_CNTR],
376                                                      (workstart_time -
377                                                       workdone_time));
378                                 /* previous request */
379                                 opc_offset = 
380                                         opcode_offset(request->rq_reqmsg->opc);
381                                 if (opc_offset >= 0) {
382                                         LASSERT(opc_offset < LUSTRE_MAX_OPCODES);
383                                         LPROCFS_COUNTER_INCR(&svc->svc_counters->cntr[PTLRPC_LAST_CNTR+opc_offset], svc_workcycles);
384                                 }
385                         }
386                         spin_unlock(&svc->srv_lock);
387
388                         rc = handle_incoming_request(obddev, svc, event,
389                                                      request);
390                         workdone_time = get_cycles();
391                         svc_workcycles = workdone_time - workstart_time;
392                         continue;
393                 }
394
395                 CERROR("unknown break in service");
396                 LBUG();
397                 EXIT;
398                 break;
399         }
400
401         /* NB should wait for all SENT callbacks to complete before exiting
402          * here.  Unfortunately at this time there is no way to track this
403          * state.
404          */
405         OBD_FREE(request, sizeof(*request));
406 out_event:
407         OBD_FREE(event, sizeof(*event));
408 out:
409         thread->t_flags = SVC_STOPPED;
410         wake_up(&thread->t_ctl_waitq);
411
412         CDEBUG(D_NET, "service thread exiting, process %d: rc = %d\n",
413                current->pid, rc);
414         return rc;
415 }
416
417 static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
418                                struct ptlrpc_thread *thread)
419 {
420         struct l_wait_info lwi = { 0 };
421
422         spin_lock(&svc->srv_lock);
423         thread->t_flags = SVC_STOPPING;
424         spin_unlock(&svc->srv_lock);
425
426         wake_up(&svc->srv_waitq);
427         l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
428                      &lwi);
429 }
430
431 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
432 {
433         spin_lock(&svc->srv_lock);
434         while (!list_empty(&svc->srv_threads)) {
435                 struct ptlrpc_thread *thread;
436                 thread = list_entry(svc->srv_threads.next, struct ptlrpc_thread,
437                                     t_link);
438                 spin_unlock(&svc->srv_lock);
439                 ptlrpc_stop_thread(svc, thread);
440                 spin_lock(&svc->srv_lock);
441                 list_del(&thread->t_link);
442                 OBD_FREE(thread, sizeof(*thread));
443         }
444         spin_unlock(&svc->srv_lock);
445 }
446
447 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
448                         char *name)
449 {
450         struct l_wait_info lwi = { 0 };
451         struct ptlrpc_svc_data d;
452         struct ptlrpc_thread *thread;
453         int rc;
454         ENTRY;
455
456         OBD_ALLOC(thread, sizeof(*thread));
457         if (thread == NULL)
458                 RETURN(-ENOMEM);
459         init_waitqueue_head(&thread->t_ctl_waitq);
460
461         d.dev = dev;
462         d.svc = svc;
463         d.name = name;
464         d.thread = thread;
465
466         spin_lock(&svc->srv_lock);
467         list_add(&thread->t_link, &svc->srv_threads);
468         spin_unlock(&svc->srv_lock);
469
470         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
471          * just drop the VM and FILES in ptlrpc_daemonize() right away.
472          */
473         rc = kernel_thread(ptlrpc_main, &d, CLONE_VM | CLONE_FILES);
474         if (rc < 0) {
475                 CERROR("cannot start thread: %d\n", rc);
476                 OBD_FREE(thread, sizeof(*thread));
477                 RETURN(rc);
478         }
479         l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
480
481         RETURN(0);
482 }
483
484 int ptlrpc_unregister_service(struct ptlrpc_service *service)
485 {
486         int i, rc;
487         struct ptlrpc_srv_ni *srv_ni;
488
489         LASSERT (list_empty (&service->srv_threads));
490
491         /* XXX We could reply (with failure) to all buffered requests
492          * _after_ unlinking _all_ the request buffers, but _before_
493          * freeing them.
494          */
495
496         for (i = 0; i < ptlrpc_ninterfaces; i++) {
497                 srv_ni = &service->srv_interfaces[i];
498                 CDEBUG (D_NET, "%s: tearing down interface %s\n",
499                         service->srv_name, srv_ni->sni_ni->pni_name);
500
501                 while (!list_empty (&srv_ni->sni_rqbds)) {
502                         struct ptlrpc_request_buffer_desc *rqbd =
503                                 list_entry (srv_ni->sni_rqbds.next,
504                                             struct ptlrpc_request_buffer_desc,
505                                             rqbd_list);
506
507                         list_del (&rqbd->rqbd_list);
508
509                         LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
510                         /* refcount could be anything; it's possible for
511                          * the buffers to continued to get filled after all
512                          * the server threads exited.  But we know they
513                          * _have_ exited.
514                          */
515
516                         (void) PtlMEUnlink(rqbd->rqbd_me_h);
517                         /* The callback handler could have unlinked this ME
518                          * already (we're racing with her) but it's safe to
519                          * ensure it _has_ been unlinked.
520                          */
521
522                         OBD_FREE (rqbd->rqbd_buffer, service->srv_buf_size);
523                         OBD_FREE (rqbd, sizeof (*rqbd));
524                         srv_ni->sni_nrqbds--;
525                 }
526
527                 LASSERT (srv_ni->sni_nrqbds == 0);
528
529                 if (!PtlHandleEqual (srv_ni->sni_eq_h, PTL_HANDLE_NONE)) {
530                         rc = PtlEQFree(srv_ni->sni_eq_h);
531                         if (rc)
532                                 CERROR("%s.%d: PtlEQFree failed on %s: %d\n",
533                                        service->srv_name, i,
534                                        srv_ni->sni_ni->pni_name, rc);
535                 }
536         }
537
538         ptlrpc_lprocfs_unregister_service(service);
539
540         OBD_FREE(service,
541                  offsetof (struct ptlrpc_service,
542                            srv_interfaces[ptlrpc_ninterfaces]));
543         return 0;
544 }