/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2002 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC
#ifndef __KERNEL__
#include <liblustre.h>
#include <linux/kp30.h>
#endif
#include <linux/obd_support.h>
#include <linux/obd_class.h>
#include <linux/lustre_net.h>
#include <portals/types.h>
#include "ptlrpc_internal.h"

/* forward ref */
static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc);

static LIST_HEAD (ptlrpc_all_services);
static spinlock_t ptlrpc_all_services_lock = SPIN_LOCK_UNLOCKED;

static void
ptlrpc_free_server_req (struct ptlrpc_request *req)
{
        /* The last request to be received into a request buffer uses space
         * in the request buffer descriptor, otherwise requests are
         * allocated dynamically in the incoming request event handler
         * (request_in_callback) */
        if (req == &req->rq_rqbd->rqbd_req)
                return;

        OBD_FREE(req, sizeof(*req));
}

static char *
ptlrpc_alloc_request_buffer (int size)
{
        char *ptr;

        if (size > SVC_BUF_VMALLOC_THRESHOLD)
                OBD_VMALLOC(ptr, size);
        else
                OBD_ALLOC(ptr, size);

        return (ptr);
}

static void
ptlrpc_free_request_buffer (char *ptr, int size)
{
        if (size > SVC_BUF_VMALLOC_THRESHOLD)
                OBD_VFREE(ptr, size);
        else
                OBD_FREE(ptr, size);
}
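
/* NB the same 'size' must be passed to ptlrpc_free_request_buffer() as was
 * passed to ptlrpc_alloc_request_buffer(), since size alone determines
 * whether the buffer came from OBD_VMALLOC or OBD_ALLOC.  A minimal sketch
 * of the intended pairing (the 16K size is just an example value):
 *
 *         char *buf = ptlrpc_alloc_request_buffer(16384);
 *         if (buf != NULL) {
 *                 ...use buf...
 *                 ptlrpc_free_request_buffer(buf, 16384);
 *         }
 */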

struct ptlrpc_request_buffer_desc *
ptlrpc_alloc_rqbd (struct ptlrpc_srv_ni *srv_ni)
{
        struct ptlrpc_service             *svc = srv_ni->sni_service;
        unsigned long                      flags;
        struct ptlrpc_request_buffer_desc *rqbd;

        OBD_ALLOC(rqbd, sizeof (*rqbd));
        if (rqbd == NULL)
                return (NULL);

        rqbd->rqbd_srv_ni = srv_ni;
        rqbd->rqbd_refcount = 0;
        rqbd->rqbd_cbid.cbid_fn = request_in_callback;
        rqbd->rqbd_cbid.cbid_arg = rqbd;
        rqbd->rqbd_buffer = ptlrpc_alloc_request_buffer(svc->srv_buf_size);

        if (rqbd->rqbd_buffer == NULL) {
                OBD_FREE(rqbd, sizeof (*rqbd));
                return (NULL);
        }

        spin_lock_irqsave (&svc->srv_lock, flags);
        list_add(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
        svc->srv_nbufs++;
        spin_unlock_irqrestore (&svc->srv_lock, flags);

        return (rqbd);
}

void
ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_srv_ni  *sni = rqbd->rqbd_srv_ni;
        struct ptlrpc_service *svc = sni->sni_service;
        unsigned long          flags;

        LASSERT (rqbd->rqbd_refcount == 0);

        spin_lock_irqsave(&svc->srv_lock, flags);
        list_del(&rqbd->rqbd_list);
        svc->srv_nbufs--;
        spin_unlock_irqrestore(&svc->srv_lock, flags);

        ptlrpc_free_request_buffer (rqbd->rqbd_buffer, svc->srv_buf_size);
        OBD_FREE (rqbd, sizeof (*rqbd));
}

int
ptlrpc_grow_req_bufs(struct ptlrpc_srv_ni *srv_ni)
{
        struct ptlrpc_service             *svc = srv_ni->sni_service;
        struct ptlrpc_request_buffer_desc *rqbd;
        int                                i;

        for (i = 0; i < svc->srv_nbuf_per_group; i++) {
                rqbd = ptlrpc_alloc_rqbd(srv_ni);

                if (rqbd == NULL) {
                        CERROR ("%s/%s: Can't allocate request buffer\n",
                                svc->srv_name, srv_ni->sni_ni->pni_name);
                        return (-ENOMEM);
                }

                if (ptlrpc_server_post_idle_rqbds(svc) < 0)
                        return (-EAGAIN);
        }

        return (0);
}

void
ptlrpc_save_lock (struct ptlrpc_request *req,
                  struct lustre_handle *lock, int mode)
{
        struct ptlrpc_reply_state *rs = req->rq_reply_state;
        int                        idx;

        LASSERT (rs != NULL);
        LASSERT (rs->rs_nlocks < RS_MAX_LOCKS);

        idx = rs->rs_nlocks++;
        rs->rs_locks[idx] = *lock;
        rs->rs_modes[idx] = mode;
        rs->rs_difficult = 1;
}
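
/* A request handler that must keep a DLM lock pinned until its reply has
 * been acked can park the lock on the reply state like this (an
 * illustrative sketch; 'lockh' stands for whatever lock handle the handler
 * obtained, and LCK_EX is just one possible mode):
 *
 *         struct lustre_handle lockh;
 *         ...take the lock, filling in lockh...
 *         ptlrpc_save_lock(req, &lockh, LCK_EX);
 *
 * Marking the reply 'difficult' means the saved locks are decref'd in
 * ptlrpc_server_handle_reply() once the client has seen the reply. */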

void
ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
{
        struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;

#ifdef CONFIG_SMP
        LASSERT (spin_is_locked (&svc->srv_lock));
#endif
        LASSERT (rs->rs_difficult);
        rs->rs_scheduled_ever = 1;              /* flag any notification attempt */

        if (rs->rs_scheduled)                   /* being set up or already notified */
                return;

        rs->rs_scheduled = 1;
        list_del (&rs->rs_list);
        list_add (&rs->rs_list, &svc->srv_reply_queue);
        wake_up (&svc->srv_waitq);
}

void
ptlrpc_commit_replies (struct obd_device *obd)
{
        struct list_head   *tmp;
        struct list_head   *nxt;
        unsigned long       flags;

        /* Find any replies that have been committed and get their service
         * to attend to complete them. */

        /* CAVEAT EMPTOR: spinlock ordering!!!  obd_uncommitted_replies_lock
         * is taken outside svc->srv_lock here; anyone taking both must nest
         * them the same way (cf. ptlrpc_server_handle_reply(), which drops
         * srv_lock before taking obd_uncommitted_replies_lock). */
        spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags);

        list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
                struct ptlrpc_reply_state *rs =
                        list_entry (tmp, struct ptlrpc_reply_state, rs_obd_list);

                LASSERT (rs->rs_difficult);

                if (rs->rs_transno <= obd->obd_last_committed) {
                        struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;

                        spin_lock (&svc->srv_lock);
                        list_del_init (&rs->rs_obd_list);
                        ptlrpc_schedule_difficult_reply (rs);
                        spin_unlock (&svc->srv_lock);
                }
        }

        spin_unlock_irqrestore (&obd->obd_uncommitted_replies_lock, flags);
}

static long
timeval_sub(struct timeval *large, struct timeval *small)
{
        return (large->tv_sec - small->tv_sec) * 1000000 +
                (large->tv_usec - small->tv_usec);
}
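
/* For example, with large = { .tv_sec = 5, .tv_usec = 200 } and
 * small = { .tv_sec = 4, .tv_usec = 900000 }, the result is
 * (5 - 4) * 1000000 + (200 - 900000) = 100200 microseconds; a negative
 * tv_usec difference is absorbed by the seconds term. */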

static int
ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
{
        struct ptlrpc_srv_ni              *srv_ni;
        struct ptlrpc_request_buffer_desc *rqbd;
        unsigned long                      flags;
        int                                rc;
        int                                posted = 0;

        for (;;) {
                spin_lock_irqsave(&svc->srv_lock, flags);

                if (list_empty (&svc->srv_idle_rqbds)) {
                        spin_unlock_irqrestore(&svc->srv_lock, flags);
                        return (posted);
                }

                rqbd = list_entry(svc->srv_idle_rqbds.next,
                                  struct ptlrpc_request_buffer_desc,
                                  rqbd_list);
                list_del (&rqbd->rqbd_list);

                /* assume we will post successfully */
                srv_ni = rqbd->rqbd_srv_ni;
                srv_ni->sni_nrqbd_receiving++;
                list_add (&rqbd->rqbd_list, &srv_ni->sni_active_rqbds);

                spin_unlock_irqrestore(&svc->srv_lock, flags);

                rc = ptlrpc_register_rqbd(rqbd);
                if (rc != 0)
                        break;

                posted = 1;
        }

        spin_lock_irqsave(&svc->srv_lock, flags);

        srv_ni->sni_nrqbd_receiving--;
        list_del(&rqbd->rqbd_list);
        list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);

        if (srv_ni->sni_nrqbd_receiving == 0) {
                /* This service is off-air on this interface because all
                 * its request buffers are busy.  Portals will have started
                 * dropping incoming requests until more buffers get
                 * posted */
                CERROR("All %s %s request buffers busy\n",
                       svc->srv_name, srv_ni->sni_ni->pni_name);
        }

        spin_unlock_irqrestore (&svc->srv_lock, flags);

        return (-1);
}
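
/* Returns 1 if at least one buffer was posted, 0 if there was nothing idle
 * to post, and -1 if ptlrpc_register_rqbd() failed; on failure the rqbd is
 * put back at the tail of srv_idle_rqbds so a later retry (see
 * srv_rqbd_timeout in ptlrpc_main()) can pick it up again. */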

struct ptlrpc_service *
ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
                int req_portal, int rep_portal,
                svc_handler_t handler, char *name,
                struct proc_dir_entry *proc_entry)
{
        int                                i;
        int                                rc;
        int                                ssize;
        struct ptlrpc_service             *service;
        struct ptlrpc_srv_ni              *srv_ni;
        ENTRY;

        LASSERT (ptlrpc_ninterfaces > 0);
        LASSERT (nbufs > 0);
        LASSERT (bufsize >= max_req_size);

        ssize = offsetof (struct ptlrpc_service,
                          srv_interfaces[ptlrpc_ninterfaces]);
        OBD_ALLOC(service, ssize);
        if (service == NULL)
                RETURN(NULL);

        service->srv_name = name;
        spin_lock_init(&service->srv_lock);
        INIT_LIST_HEAD(&service->srv_threads);
        init_waitqueue_head(&service->srv_waitq);

        service->srv_nbuf_per_group = nbufs;
        service->srv_max_req_size = max_req_size;
        service->srv_buf_size = bufsize;
        service->srv_rep_portal = rep_portal;
        service->srv_req_portal = req_portal;
        service->srv_handler = handler;

        INIT_LIST_HEAD(&service->srv_request_queue);
        INIT_LIST_HEAD(&service->srv_idle_rqbds);
        INIT_LIST_HEAD(&service->srv_reply_queue);

        /* First initialise enough for early teardown */
        for (i = 0; i < ptlrpc_ninterfaces; i++) {
                srv_ni = &service->srv_interfaces[i];

                srv_ni->sni_service = service;
                srv_ni->sni_ni = &ptlrpc_interfaces[i];
                INIT_LIST_HEAD(&srv_ni->sni_active_rqbds);
                INIT_LIST_HEAD(&srv_ni->sni_active_replies);
        }

        spin_lock (&ptlrpc_all_services_lock);
        list_add (&service->srv_list, &ptlrpc_all_services);
        spin_unlock (&ptlrpc_all_services_lock);

        /* Now allocate the request buffers, assuming all interfaces require
         * the same number. */
        for (i = 0; i < ptlrpc_ninterfaces; i++) {
                srv_ni = &service->srv_interfaces[i];
                CDEBUG (D_NET, "%s: initialising interface %s\n", name,
                        srv_ni->sni_ni->pni_name);

                rc = ptlrpc_grow_req_bufs(srv_ni);
                /* We shouldn't be under memory pressure at startup, so
                 * fail if we can't post all our buffers at this time. */
                if (rc != 0)
                        GOTO(failed, NULL);
        }

        if (proc_entry != NULL)
                ptlrpc_lprocfs_register_service(proc_entry, service);

        CDEBUG(D_NET, "%s: Started on %d interfaces, listening on portal %d\n",
               service->srv_name, ptlrpc_ninterfaces, service->srv_req_portal);

        RETURN(service);
failed:
        ptlrpc_unregister_service(service);
        return NULL;
}
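
/* A hypothetical caller brings up a service roughly as follows.  This is an
 * illustrative sketch only: MY_REQUEST_PORTAL, MY_REPLY_PORTAL and
 * my_handler are invented names, and the buffer counts/sizes are arbitrary
 * example values, not recommendations:
 *
 *         struct ptlrpc_service *svc;
 *
 *         svc = ptlrpc_init_svc(64, 8192, 4096,
 *                               MY_REQUEST_PORTAL, MY_REPLY_PORTAL,
 *                               my_handler, "my_svc", NULL);
 *         if (svc == NULL)
 *                 ...fail...
 *
 * bufsize must be >= max_req_size (asserted above), and passing NULL for
 * proc_entry simply skips lprocfs registration. */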

static void
ptlrpc_server_free_request(struct ptlrpc_service *svc, struct ptlrpc_request *req)
{
        unsigned long  flags;
        int            refcount;

        spin_lock_irqsave(&svc->srv_lock, flags);
        svc->srv_n_active_reqs--;
        refcount = --(req->rq_rqbd->rqbd_refcount);
        if (refcount == 0) {
                /* request buffer is now idle */
                list_del(&req->rq_rqbd->rqbd_list);
                list_add_tail(&req->rq_rqbd->rqbd_list,
                              &svc->srv_idle_rqbds);
        }
        spin_unlock_irqrestore(&svc->srv_lock, flags);

        ptlrpc_free_server_req(req);
}

static int
ptlrpc_server_handle_request (struct ptlrpc_service *svc)
{
        struct ptlrpc_request *request;
        unsigned long          flags;
        struct timeval         work_start;
        struct timeval         work_end;
        long                   timediff;
        int                    rc;
        ENTRY;

        spin_lock_irqsave (&svc->srv_lock, flags);
        if (list_empty (&svc->srv_request_queue) ||
            (svc->srv_n_difficult_replies != 0 &&
             svc->srv_n_active_reqs >= (svc->srv_nthreads - 1))) {
                /* If all the other threads are handling requests, I must
                 * remain free to handle any 'difficult' reply that might
                 * block them */
                spin_unlock_irqrestore (&svc->srv_lock, flags);
                RETURN(0);
        }

        request = list_entry (svc->srv_request_queue.next,
                              struct ptlrpc_request, rq_list);
        list_del_init (&request->rq_list);
        svc->srv_n_queued_reqs--;
        svc->srv_n_active_reqs++;

        spin_unlock_irqrestore (&svc->srv_lock, flags);

        do_gettimeofday(&work_start);
        timediff = timeval_sub(&work_start, &request->rq_arrival_time);
        if (svc->srv_stats != NULL) {
                lprocfs_counter_add(svc->srv_stats, PTLRPC_REQWAIT_CNTR,
                                    timediff);
                lprocfs_counter_add(svc->srv_stats, PTLRPC_REQQDEPTH_CNTR,
                                    svc->srv_n_queued_reqs);
                lprocfs_counter_add(svc->srv_stats, PTLRPC_REQACTIVE_CNTR,
                                    svc->srv_n_active_reqs);
        }

#if SWAB_PARANOIA
        /* Clear request swab mask; this is a new request */
        request->rq_req_swab_mask = 0;
#endif
        rc = lustre_unpack_msg (request->rq_reqmsg, request->rq_reqlen);
        if (rc != 0) {
                CERROR ("error unpacking request: ptl %d from "LPX64
                        " xid "LPU64"\n", svc->srv_req_portal,
                        request->rq_peer.peer_nid, request->rq_xid);
                goto out;
        }

        rc = -EINVAL;
        if (request->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
                CERROR("wrong packet type received (type=%u) from "
                       LPX64"\n", request->rq_reqmsg->type,
                       request->rq_peer.peer_nid);
                goto out;
        }

        CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);

        /* Discard requests queued for longer than my timeout.  If the
         * client's timeout is similar to mine, she'll be timing out this
         * REQ anyway (bug 1502) */
        if (timediff / 1000000 > (long)obd_timeout) {
                CERROR("Dropping timed-out opc %d request from "LPX64
                       ": %ld seconds old\n", request->rq_reqmsg->opc,
                       request->rq_peer.peer_nid, timediff / 1000000);
                goto out;
        }

        request->rq_export = class_conn2export(&request->rq_reqmsg->handle);

        if (request->rq_export) {
                if (request->rq_reqmsg->conn_cnt <
                    request->rq_export->exp_conn_cnt) {
                        DEBUG_REQ(D_ERROR, request,
                                  "DROPPING req from old connection %d < %d",
                                  request->rq_reqmsg->conn_cnt,
                                  request->rq_export->exp_conn_cnt);
                        goto put_conn;
                }

                request->rq_export->exp_last_request_time =
                        LTIME_S(CURRENT_TIME);
        }

        CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
               "%s:%s+%d:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
               (request->rq_export ?
                (char *)request->rq_export->exp_client_uuid.uuid : "0"),
               (request->rq_export ?
                atomic_read(&request->rq_export->exp_refcount) : -99),
               request->rq_reqmsg->status, request->rq_xid,
               request->rq_peer.peer_ni->pni_name,
               request->rq_peer.peer_nid,
               request->rq_reqmsg->opc);

        rc = svc->srv_handler(request);
        CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
               "%s:%s+%d:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
               (request->rq_export ?
                (char *)request->rq_export->exp_client_uuid.uuid : "0"),
               (request->rq_export ?
                atomic_read(&request->rq_export->exp_refcount) : -99),
               request->rq_reqmsg->status, request->rq_xid,
               request->rq_peer.peer_ni->pni_name,
               request->rq_peer.peer_nid,
               request->rq_reqmsg->opc);

put_conn:
        if (request->rq_export != NULL)
                class_export_put(request->rq_export);

 out:
        do_gettimeofday(&work_end);

        timediff = timeval_sub(&work_end, &work_start);

        CDEBUG((timediff / 1000000 > (long)obd_timeout) ? D_ERROR : D_HA,
               "request "LPU64" opc %u from NID "LPX64" processed in %ldus "
               "(%ldus total)\n", request->rq_xid, request->rq_reqmsg->opc,
               request->rq_peer.peer_nid,
               timediff, timeval_sub(&work_end, &request->rq_arrival_time));

        if (svc->srv_stats != NULL) {
                int opc = opcode_offset(request->rq_reqmsg->opc);
                if (opc > 0) {
                        LASSERT(opc < LUSTRE_MAX_OPCODES);
                        lprocfs_counter_add(svc->srv_stats,
                                            opc + PTLRPC_LAST_CNTR,
                                            timediff);
                }
        }

        ptlrpc_server_free_request(svc, request);

        RETURN(1);
}
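
/* Returns 1 if a request was dequeued and processed (even if unpacking or
 * the handler failed, the request is still consumed and freed), and 0 if
 * the queue was empty or this thread must stay free for difficult replies.
 * Both ptlrpc_main() and liblustre_check_services() loop on this
 * convention. */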

static int
ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
{
        struct ptlrpc_reply_state *rs;
        unsigned long              flags;
        struct obd_export         *exp;
        struct obd_device         *obd;
        int                        nlocks;
        int                        been_handled;
        ENTRY;

        spin_lock_irqsave (&svc->srv_lock, flags);
        if (list_empty (&svc->srv_reply_queue)) {
                spin_unlock_irqrestore (&svc->srv_lock, flags);
                RETURN(0);
        }

        rs = list_entry (svc->srv_reply_queue.next,
                         struct ptlrpc_reply_state, rs_list);

        exp = rs->rs_export;
        obd = exp->exp_obd;

        LASSERT (rs->rs_difficult);
        LASSERT (rs->rs_scheduled);

        list_del_init (&rs->rs_list);

        /* Disengage from notifiers carefully (lock ordering!) */
        spin_unlock(&svc->srv_lock);

        spin_lock (&obd->obd_uncommitted_replies_lock);
        /* Noop if removed already */
        list_del_init (&rs->rs_obd_list);
        spin_unlock (&obd->obd_uncommitted_replies_lock);

        spin_lock (&exp->exp_lock);
        /* Noop if removed already */
        list_del_init (&rs->rs_exp_list);
        spin_unlock (&exp->exp_lock);

        spin_lock(&svc->srv_lock);

        been_handled = rs->rs_handled;
        rs->rs_handled = 1;

        nlocks = rs->rs_nlocks;                 /* atomic "steal", but */
        rs->rs_nlocks = 0;                      /* locks still on rs_locks! */

        if (nlocks == 0 && !been_handled) {
                /* If we see this, we should already have seen the warning
                 * in mds_steal_ack_locks()  */
                CWARN("All locks stolen from rs %p x"LPD64".t"LPD64
                      " o%d NID"LPX64"\n",
                      rs,
                      rs->rs_xid, rs->rs_transno,
                      rs->rs_msg.opc, exp->exp_connection->c_peer.peer_nid);
        }

        if ((!been_handled && rs->rs_on_net) ||
            nlocks > 0) {
                spin_unlock_irqrestore(&svc->srv_lock, flags);

                if (!been_handled && rs->rs_on_net) {
                        PtlMDUnlink(rs->rs_md_h);
                        /* Ignore return code; we're racing with
                         * completion... */
                }

                while (nlocks-- > 0)
                        ldlm_lock_decref(&rs->rs_locks[nlocks],
                                         rs->rs_modes[nlocks]);

                spin_lock_irqsave(&svc->srv_lock, flags);
        }

        rs->rs_scheduled = 0;

        if (!rs->rs_on_net) {
                /* Off the net */
                svc->srv_n_difficult_replies--;
                spin_unlock_irqrestore(&svc->srv_lock, flags);

                class_export_put (exp);
                rs->rs_export = NULL;
                lustre_free_reply_state (rs);
                atomic_dec (&svc->srv_outstanding_replies);
                RETURN(1);
        }

        /* still on the net; callback will schedule */
        spin_unlock_irqrestore (&svc->srv_lock, flags);
        RETURN(1);
}
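
/* Like the request path, this returns 0 when srv_reply_queue is empty and 1
 * when a reply state was processed.  The reply state itself is only freed
 * once it is off the net; otherwise the network completion callback will
 * reschedule it via ptlrpc_schedule_difficult_reply(). */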

#ifndef __KERNEL__
/* FIXME make use of timeout later */
int
liblustre_check_services (void *arg)
{
        int  did_something = 0;
        int  rc;
        struct list_head *tmp, *nxt;
        ENTRY;

        /* I'm relying on being single threaded, not to have to lock
         * ptlrpc_all_services etc */
        list_for_each_safe (tmp, nxt, &ptlrpc_all_services) {
                struct ptlrpc_service *svc =
                        list_entry (tmp, struct ptlrpc_service, srv_list);

                if (svc->srv_nthreads != 0)     /* I've recursed */
                        continue;

                /* service threads can block for bulk, so this limits us
                 * (arbitrarily) to recursing 1 stack frame per service.
                 * Note that the problem with recursion is that we have to
                 * unwind completely before our caller can resume. */

                svc->srv_nthreads++;

                do {
                        rc = ptlrpc_server_handle_reply(svc);
                        rc |= ptlrpc_server_handle_request(svc);
                        rc |= (ptlrpc_server_post_idle_rqbds(svc) > 0);
                        did_something |= rc;
                } while (rc);

                svc->srv_nthreads--;
        }

        RETURN(did_something);
}

#else /* __KERNEL__ */

/* Don't use daemonize, it removes fs struct from new thread (bug 418) */
void ptlrpc_daemonize(void)
{
        exit_mm(current);
        lustre_daemonize_helper();
        exit_files(current);
        reparent_to_init();
}

static void
ptlrpc_check_rqbd_pools(struct ptlrpc_service *svc)
{
        struct ptlrpc_srv_ni  *sni;
        int                    i, avail = 0;
        int                    low_water = svc->srv_nbuf_per_group/2;

        for (i = 0; i < ptlrpc_ninterfaces; i++) {
                sni = &svc->srv_interfaces[i];

                avail += sni->sni_nrqbd_receiving;
                /* NB I'm not locking; just looking. */
                if (sni->sni_nrqbd_receiving <= low_water)
                        ptlrpc_grow_req_bufs(sni);
        }
        lprocfs_counter_add(svc->srv_stats, PTLRPC_REQBUF_AVAIL_CNTR, avail);
        lprocfs_counter_add(svc->srv_stats, PTLRPC_REQBUF_TOTAL_CNTR,
                            svc->srv_nbufs);
}

static int
ptlrpc_retry_rqbds(void *arg)
{
        struct ptlrpc_service *svc = (struct ptlrpc_service *)arg;

        svc->srv_rqbd_timeout = 0;
        return (-ETIMEDOUT);
}

static int ptlrpc_main(void *arg)
{
        struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
        struct ptlrpc_service  *svc = data->svc;
        struct ptlrpc_thread   *thread = data->thread;
        unsigned long           flags;
        ENTRY;

        lock_kernel();
        ptlrpc_daemonize();

        SIGNAL_MASK_LOCK(current, flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, flags);

        LASSERTF(strlen(data->name) < sizeof(current->comm),
                 "name %d >= len %d\n",
                 (int)strlen(data->name), (int)sizeof(current->comm));
        THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);
        unlock_kernel();

        /* Record that the thread is running */
        thread->t_flags = SVC_RUNNING;
        wake_up(&thread->t_ctl_waitq);

        spin_lock_irqsave(&svc->srv_lock, flags);
        svc->srv_nthreads++;
        spin_unlock_irqrestore(&svc->srv_lock, flags);

        /* XXX maintain a list of all managed devices: insert here */

        while ((thread->t_flags & SVC_STOPPING) == 0 ||
               svc->srv_n_difficult_replies != 0) {
                /* Don't exit while there are replies to be handled */
                struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                     ptlrpc_retry_rqbds, svc);

                l_wait_event_exclusive (svc->srv_waitq,
                              ((thread->t_flags & SVC_STOPPING) != 0 &&
                               svc->srv_n_difficult_replies == 0) ||
                              (!list_empty(&svc->srv_idle_rqbds) &&
                               svc->srv_rqbd_timeout == 0) ||
                              !list_empty (&svc->srv_reply_queue) ||
                              (!list_empty (&svc->srv_request_queue) &&
                               (svc->srv_n_difficult_replies == 0 ||
                                svc->srv_n_active_reqs <
                                (svc->srv_nthreads - 1))),
                              &lwi);

                ptlrpc_check_rqbd_pools(svc);

                if (!list_empty (&svc->srv_reply_queue))
                        ptlrpc_server_handle_reply (svc);

                /* only handle requests if there are no difficult replies
                 * outstanding, or I'm not the last thread handling
                 * requests */
                if (!list_empty (&svc->srv_request_queue) &&
                    (svc->srv_n_difficult_replies == 0 ||
                     svc->srv_n_active_reqs < (svc->srv_nthreads - 1)))
                        ptlrpc_server_handle_request (svc);

                if (!list_empty(&svc->srv_idle_rqbds) &&
                    ptlrpc_server_post_idle_rqbds(svc) < 0) {
                        /* I just failed to repost request buffers.  Wait
                         * for a timeout (unless something else happens)
                         * before I try again */
                        svc->srv_rqbd_timeout = HZ/10;
                }
        }

        spin_lock_irqsave(&svc->srv_lock, flags);

        svc->srv_nthreads--;                    /* must know immediately */
        thread->t_flags = SVC_STOPPED;
        wake_up(&thread->t_ctl_waitq);

        spin_unlock_irqrestore(&svc->srv_lock, flags);

        CDEBUG(D_NET, "service thread exiting, process %d\n", current->pid);
        return 0;
}

static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
                               struct ptlrpc_thread *thread)
{
        struct l_wait_info lwi = { 0 };
        unsigned long      flags;

        spin_lock_irqsave(&svc->srv_lock, flags);
        thread->t_flags = SVC_STOPPING;
        spin_unlock_irqrestore(&svc->srv_lock, flags);

        wake_up_all(&svc->srv_waitq);
        l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
                     &lwi);

        spin_lock_irqsave(&svc->srv_lock, flags);
        list_del(&thread->t_link);
        spin_unlock_irqrestore(&svc->srv_lock, flags);

        OBD_FREE(thread, sizeof(*thread));
}

void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
        unsigned long flags;
        struct ptlrpc_thread *thread;

        spin_lock_irqsave(&svc->srv_lock, flags);
        while (!list_empty(&svc->srv_threads)) {
                thread = list_entry(svc->srv_threads.next,
                                    struct ptlrpc_thread, t_link);

                spin_unlock_irqrestore(&svc->srv_lock, flags);
                ptlrpc_stop_thread(svc, thread);
                spin_lock_irqsave(&svc->srv_lock, flags);
        }

        spin_unlock_irqrestore(&svc->srv_lock, flags);
}

/* @base_name should be 12 characters or less - the "_%02d" suffix adds 3 on */
int ptlrpc_start_n_threads(struct obd_device *dev, struct ptlrpc_service *svc,
                           int num_threads, char *base_name)
{
        int i, rc = 0;
        ENTRY;

        for (i = 0; i < num_threads; i++) {
                char name[32];
                snprintf(name, sizeof(name), "%s_%02d", base_name, i);
                rc = ptlrpc_start_thread(dev, svc, name);
                if (rc) {
                        CERROR("cannot start %s thread #%d: rc %d\n",
                               base_name, i, rc);
                        ptlrpc_stop_all_threads(svc);
                        /* don't keep starting threads after tearing the
                         * rest down */
                        break;
                }
        }
        RETURN(rc);
}
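
/* For example, an obd setup routine might start a pool of worker threads on
 * its service like this (an illustrative sketch; "my_svc" and the thread
 * count are arbitrary, and base_name must leave room for the "_%02d" suffix
 * appended above):
 *
 *         rc = ptlrpc_start_n_threads(obddev, svc, 8, "my_svc");
 *         if (rc != 0)
 *                 ...teardown...
 */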

int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
                        char *name)
{
        struct l_wait_info lwi = { 0 };
        struct ptlrpc_svc_data d;
        struct ptlrpc_thread *thread;
        unsigned long flags;
        int rc;
        ENTRY;

        OBD_ALLOC(thread, sizeof(*thread));
        if (thread == NULL)
                RETURN(-ENOMEM);
        init_waitqueue_head(&thread->t_ctl_waitq);

        d.dev = dev;
        d.svc = svc;
        d.name = name;
        d.thread = thread;

        spin_lock_irqsave(&svc->srv_lock, flags);
        list_add(&thread->t_link, &svc->srv_threads);
        spin_unlock_irqrestore(&svc->srv_lock, flags);

        /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
         * just drop the VM and FILES in ptlrpc_daemonize() right away.
         */
        rc = kernel_thread(ptlrpc_main, &d, CLONE_VM | CLONE_FILES);
        if (rc < 0) {
                CERROR("cannot start thread: %d\n", rc);

                /* unhook from srv_threads before freeing, or the list
                 * would be left holding a stale entry */
                spin_lock_irqsave(&svc->srv_lock, flags);
                list_del(&thread->t_link);
                spin_unlock_irqrestore(&svc->srv_lock, flags);

                OBD_FREE(thread, sizeof(*thread));
                RETURN(rc);
        }
        l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);

        RETURN(0);
}
#endif

int ptlrpc_unregister_service(struct ptlrpc_service *service)
{
        int                   i;
        int                   rc;
        unsigned long         flags;
        struct ptlrpc_srv_ni *srv_ni;
        struct l_wait_info    lwi;
        struct list_head     *tmp;

        LASSERT(list_empty(&service->srv_threads));

        spin_lock (&ptlrpc_all_services_lock);
        list_del_init (&service->srv_list);
        spin_unlock (&ptlrpc_all_services_lock);

        ptlrpc_lprocfs_unregister_service(service);

        for (i = 0; i < ptlrpc_ninterfaces; i++) {
                srv_ni = &service->srv_interfaces[i];
                CDEBUG(D_NET, "%s: tearing down interface %s\n",
                       service->srv_name, srv_ni->sni_ni->pni_name);

                /* Unlink all the request buffers.  This forces a 'final'
                 * event with its 'unlink' flag set for each posted rqbd */
                list_for_each(tmp, &srv_ni->sni_active_rqbds) {
                        struct ptlrpc_request_buffer_desc *rqbd =
                                list_entry(tmp, struct ptlrpc_request_buffer_desc,
                                           rqbd_list);

                        rc = PtlMDUnlink(rqbd->rqbd_md_h);
                        LASSERT (rc == PTL_OK || rc == PTL_INV_MD);
                }

                /* Wait for the network to release any buffers it's
                 * currently filling */
                for (;;) {
                        spin_lock_irqsave(&service->srv_lock, flags);
                        rc = srv_ni->sni_nrqbd_receiving;
                        spin_unlock_irqrestore(&service->srv_lock, flags);

                        if (rc == 0)
                                break;

                        /* Network access will complete in finite time but
                         * the HUGE timeout lets us CWARN for visibility of
                         * sluggish NALs */
                        lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
                        rc = l_wait_event(service->srv_waitq,
                                          srv_ni->sni_nrqbd_receiving == 0,
                                          &lwi);
                        if (rc == -ETIMEDOUT)
                                CWARN("Waiting for request buffers on "
                                      "service %s on interface %s\n",
                                      service->srv_name,
                                      srv_ni->sni_ni->pni_name);
                }

                /* schedule all outstanding replies to terminate them */
                spin_lock_irqsave(&service->srv_lock, flags);
                while (!list_empty(&srv_ni->sni_active_replies)) {
                        struct ptlrpc_reply_state *rs =
                                list_entry(srv_ni->sni_active_replies.next,
                                           struct ptlrpc_reply_state,
                                           rs_list);
                        ptlrpc_schedule_difficult_reply(rs);
                }
                spin_unlock_irqrestore(&service->srv_lock, flags);
        }

        /* purge the request queue.  NB No new replies (rqbds all unlinked)
         * and no service threads, so I'm the only thread noodling the
         * request queue now */
        while (!list_empty(&service->srv_request_queue)) {
                struct ptlrpc_request *req =
                        list_entry(service->srv_request_queue.next,
                                   struct ptlrpc_request,
                                   rq_list);

                list_del(&req->rq_list);
                service->srv_n_queued_reqs--;
                service->srv_n_active_reqs++;

                ptlrpc_server_free_request(service, req);
        }
        LASSERT(service->srv_n_queued_reqs == 0);
        LASSERT(service->srv_n_active_reqs == 0);

        for (i = 0; i < ptlrpc_ninterfaces; i++) {
                srv_ni = &service->srv_interfaces[i];
                LASSERT(list_empty(&srv_ni->sni_active_rqbds));
        }

        /* Now free all the request buffers since nothing references them
         * any more... */
        while (!list_empty(&service->srv_idle_rqbds)) {
                struct ptlrpc_request_buffer_desc *rqbd =
                        list_entry(service->srv_idle_rqbds.next,
                                   struct ptlrpc_request_buffer_desc,
                                   rqbd_list);

                ptlrpc_free_rqbd(rqbd);
        }

        /* wait for all outstanding replies to complete (they were
         * scheduled having been flagged to abort above) */
        while (atomic_read(&service->srv_outstanding_replies) != 0) {
                struct l_wait_info lwi = LWI_TIMEOUT(10 * HZ, NULL, NULL);

                rc = l_wait_event(service->srv_waitq,
                                  !list_empty(&service->srv_reply_queue), &lwi);
                LASSERT(rc == 0 || rc == -ETIMEDOUT);

                if (rc == 0) {
                        ptlrpc_server_handle_reply(service);
                        continue;
                }
                CWARN("Unexpectedly long timeout awaiting replies on "
                      "service %s\n", service->srv_name);
        }

        OBD_FREE(service,
                 offsetof(struct ptlrpc_service,
                          srv_interfaces[ptlrpc_ninterfaces]));
        return 0;
}
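
/* Teardown must mirror bringup: callers stop the service threads first (the
 * LASSERT at the top of ptlrpc_unregister_service() insists on this), then
 * unregister.  A minimal sketch, assuming 'svc' came from ptlrpc_init_svc():
 *
 *         ptlrpc_stop_all_threads(svc);
 *         ptlrpc_unregister_service(svc);
 */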