Whamcloud - gitweb
b=4049
[fs/lustre-release.git] / lustre / ptlrpc / service.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <liblustre.h>
26 #include <linux/kp30.h>
27 #endif
28 #include <linux/obd_support.h>
29 #include <linux/obd_class.h>
30 #include <linux/lustre_net.h>
31 #include <linux/lustre_log.h>
32 #include <portals/types.h>
33 #include "ptlrpc_internal.h"
34
/* Registry of every ptlrpc service on this node; ptlrpc_init_svc() adds,
 * teardown removes, liblustre_check_services() walks it.  Protected by
 * ptlrpc_all_services_lock. */
static LIST_HEAD (ptlrpc_all_services);
static spinlock_t ptlrpc_all_services_lock = SPIN_LOCK_UNLOCKED;
37
38 static void
39 ptlrpc_free_server_req (struct ptlrpc_request *req)
40 {
41         /* The last request to be received into a request buffer uses space
42          * in the request buffer descriptor, otherwise requests are
43          * allocated dynamically in the incoming reply event handler */
44         if (req == &req->rq_rqbd->rqbd_req)
45                 return;
46
47         OBD_FREE(req, sizeof(*req));
48 }
49         
50 static char *
51 ptlrpc_alloc_request_buffer (int size)
52 {
53         char *ptr;
54         
55         if (size > SVC_BUF_VMALLOC_THRESHOLD)
56                 OBD_VMALLOC(ptr, size);
57         else
58                 OBD_ALLOC(ptr, size);
59         
60         return (ptr);
61 }
62
63 static void
64 ptlrpc_free_request_buffer (char *ptr, int size)
65 {
66         if (size > SVC_BUF_VMALLOC_THRESHOLD)
67                 OBD_VFREE(ptr, size);
68         else
69                 OBD_FREE(ptr, size);
70 }
71
72 struct ptlrpc_request_buffer_desc *
73 ptlrpc_alloc_rqbd (struct ptlrpc_srv_ni *srv_ni)
74 {
75         struct ptlrpc_service             *svc = srv_ni->sni_service;
76         unsigned long                      flags;
77         struct ptlrpc_request_buffer_desc *rqbd;
78
79         OBD_ALLOC(rqbd, sizeof (*rqbd));
80         if (rqbd == NULL)
81                 return (NULL);
82
83         rqbd->rqbd_srv_ni = srv_ni;
84         rqbd->rqbd_refcount = 0;
85         rqbd->rqbd_cbid.cbid_fn = request_in_callback;
86         rqbd->rqbd_cbid.cbid_arg = rqbd;
87         rqbd->rqbd_buffer = ptlrpc_alloc_request_buffer(svc->srv_buf_size);
88
89         if (rqbd->rqbd_buffer == NULL) {
90                 OBD_FREE(rqbd, sizeof (*rqbd));
91                 return (NULL);
92         }
93
94         spin_lock_irqsave (&svc->srv_lock, flags);
95         list_add(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
96         svc->srv_nbufs++;
97         spin_unlock_irqrestore (&svc->srv_lock, flags);
98
99         return (rqbd);
100 }
101
102 void
103 ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd) 
104 {
105         struct ptlrpc_srv_ni  *sni = rqbd->rqbd_srv_ni;
106         struct ptlrpc_service *svc = sni->sni_service;
107         unsigned long          flags;
108         
109         LASSERT (rqbd->rqbd_refcount == 0);
110
111         spin_lock_irqsave(&svc->srv_lock, flags);
112         list_del(&rqbd->rqbd_list);
113         svc->srv_nbufs--;
114         spin_unlock_irqrestore(&svc->srv_lock, flags);
115
116         ptlrpc_free_request_buffer (rqbd->rqbd_buffer, svc->srv_buf_size);
117         OBD_FREE (rqbd, sizeof (*rqbd));
118 }
119
120 void
121 ptlrpc_save_llog_lock (struct ptlrpc_request *req,
122                        struct llog_create_locks *lcl)
123 {
124         struct ptlrpc_reply_state *rs = req->rq_reply_state;
125         LASSERT (rs != NULL);
126         LASSERT (rs->rs_llog_locks == NULL);
127
128         rs->rs_llog_locks = lcl;
129 }
130
131 void
132 ptlrpc_require_repack(struct ptlrpc_request *req)
133 {
134         struct ptlrpc_reply_state *rs = req->rq_reply_state;
135         LASSERT (rs != NULL);
136         rs->rs_difficult = 1;
137 }
138
139 void
140 ptlrpc_save_lock (struct ptlrpc_request *req, 
141                   struct lustre_handle *lock, int mode)
142 {
143         struct ptlrpc_reply_state *rs = req->rq_reply_state;
144         int                        idx;
145
146         if (!lock->cookie)
147                 return;
148
149         LASSERT (rs != NULL);
150         LASSERT (rs->rs_nlocks < RS_MAX_LOCKS);
151
152         idx = rs->rs_nlocks++;
153         rs->rs_locks[idx] = *lock;
154         rs->rs_modes[idx] = mode;
155         rs->rs_difficult = 1;
156 }
157
/* Queue a difficult reply for attention by a service thread.
 * Caller must hold svc->srv_lock (only checkable on SMP). */
void
ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
{
        struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;

#ifdef CONFIG_SMP
        LASSERT (spin_is_locked (&svc->srv_lock));
#endif
        LASSERT (rs->rs_difficult);
        rs->rs_scheduled_ever = 1;              /* flag any notification attempt */

        if (rs->rs_scheduled)                   /* being set up or already notified */
                return;

        rs->rs_scheduled = 1;
        /* move the reply onto the service's reply queue and wake a thread */
        list_del (&rs->rs_list);
        list_add (&rs->rs_list, &svc->srv_reply_queue);
        wake_up (&svc->srv_waitq);
}
177
/* Walk @obd's uncommitted-replies list and schedule every reply whose
 * transno has now been committed, so its service threads can complete it. */
void 
ptlrpc_commit_replies (struct obd_device *obd)
{
        struct list_head   *tmp;
        struct list_head   *nxt;
        unsigned long       flags;
        
        /* Find any replies that have been committed and get their service
         * to attend to complete them. */

        /* CAVEAT EMPTOR: spinlock ordering!!! */
        spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags);

        list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
                struct ptlrpc_reply_state *rs =
                        list_entry (tmp, struct ptlrpc_reply_state, rs_obd_list);
                struct llog_create_locks *lcl = rs->rs_llog_locks;

                /* NOTE(review): rs_llog_locks is cleared for *every* reply on
                 * the list, but lcl is only freed below for committed ones;
                 * for uncommitted replies the pointer appears to be dropped
                 * without being freed — confirm this is intentional. */
                rs->rs_llog_locks = NULL; 
                LASSERT (rs->rs_difficult);

                if (rs->rs_transno <= obd->obd_last_committed) {
                        struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;

                        /* srv_lock nests inside obd_uncommitted_replies_lock */
                        spin_lock (&svc->srv_lock);
                        list_del_init (&rs->rs_obd_list);
                        ptlrpc_schedule_difficult_reply (rs);
                        spin_unlock (&svc->srv_lock);

                        if (lcl != NULL)
                                llog_create_lock_free(lcl);
                }
        }
        
        spin_unlock_irqrestore (&obd->obd_uncommitted_replies_lock, flags);
}
214
/* Return (large - small) in microseconds; assumes the difference fits
 * in a long. */
static long
timeval_sub(struct timeval *large, struct timeval *small)
{
        long sec_diff  = large->tv_sec  - small->tv_sec;
        long usec_diff = large->tv_usec - small->tv_usec;

        return sec_diff * 1000000 + usec_diff;
}
221
/* Take one rqbd off the service's idle list and post it for receive on
 * its interface.  Returns 1 if a buffer was posted, 0 if none were idle,
 * -1 if the post failed (the rqbd is returned to the idle list). */
static int
ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
{
        struct ptlrpc_srv_ni              *srv_ni;
        struct ptlrpc_request_buffer_desc *rqbd;
        unsigned long                      flags;
        int                                rc;

        spin_lock_irqsave(&svc->srv_lock, flags);
        if (list_empty (&svc->srv_idle_rqbds)) {
                spin_unlock_irqrestore(&svc->srv_lock, flags);
                return (0);
        }

        rqbd = list_entry(svc->srv_idle_rqbds.next,
                          struct ptlrpc_request_buffer_desc,
                          rqbd_list);
        list_del (&rqbd->rqbd_list);

        /* assume we will post successfully */
        srv_ni = rqbd->rqbd_srv_ni;
        srv_ni->sni_nrqbd_receiving++;
        list_add (&rqbd->rqbd_list, &srv_ni->sni_active_rqbds);

        spin_unlock_irqrestore(&svc->srv_lock, flags);

        rc = ptlrpc_register_rqbd(rqbd);
        if (rc == 0)
                return (1);

        /* Posting failed: roll back the optimistic accounting above */
        spin_lock_irqsave(&svc->srv_lock, flags);

        srv_ni->sni_nrqbd_receiving--;
        list_del(&rqbd->rqbd_list);
        list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);

        if (srv_ni->sni_nrqbd_receiving == 0) {
                /* This service is off-air on this interface because all
                 * its request buffers are busy.  Portals will have started
                 * dropping incoming requests until more buffers get
                 * posted */
                CERROR("All %s %s request buffers busy\n",
                       svc->srv_name, srv_ni->sni_ni->pni_name);
        }

        spin_unlock_irqrestore (&svc->srv_lock, flags);

        return (-1);
}
271
/* Create a ptlrpc service listening on @req_portal on every configured
 * network interface, with @nbufs request buffers of @bufsize bytes per
 * interface and replies sent on @rep_portal.  Incoming requests are
 * dispatched to @handler.  Returns the new service or NULL on failure
 * (everything initialised so far is torn down). */
struct ptlrpc_service *
ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
                int req_portal, int rep_portal, 
                svc_handler_t handler, char *name,
                struct proc_dir_entry *proc_entry)
{
        int                                i;
        int                                j;
        int                                ssize;
        struct ptlrpc_service             *service;
        struct ptlrpc_srv_ni              *srv_ni;
        struct ptlrpc_request_buffer_desc *rqbd;
        ENTRY;

        LASSERT (ptlrpc_ninterfaces > 0);
        LASSERT (nbufs > 0);
        LASSERT (bufsize >= max_req_size);
        
        /* The service struct ends in a variable-length per-interface array;
         * size the allocation for all configured interfaces. */
        ssize = offsetof (struct ptlrpc_service,
                          srv_interfaces[ptlrpc_ninterfaces]);
        OBD_ALLOC(service, ssize);
        if (service == NULL)
                RETURN(NULL);

        service->srv_name = name;
        spin_lock_init(&service->srv_lock);
        INIT_LIST_HEAD(&service->srv_threads);
        init_waitqueue_head(&service->srv_waitq);

        service->srv_max_req_size = max_req_size;
        service->srv_buf_size = bufsize;
        service->srv_rep_portal = rep_portal;
        service->srv_req_portal = req_portal;
        service->srv_handler = handler;

        INIT_LIST_HEAD(&service->srv_request_queue);
        INIT_LIST_HEAD(&service->srv_idle_rqbds);
        INIT_LIST_HEAD(&service->srv_reply_queue);

        /* First initialise enough for early teardown */
        for (i = 0; i < ptlrpc_ninterfaces; i++) {
                srv_ni = &service->srv_interfaces[i];

                srv_ni->sni_service = service;
                srv_ni->sni_ni = &ptlrpc_interfaces[i];
                INIT_LIST_HEAD(&srv_ni->sni_active_rqbds);
                INIT_LIST_HEAD(&srv_ni->sni_active_replies);
        }

        /* make the service globally visible */
        spin_lock (&ptlrpc_all_services_lock);
        list_add (&service->srv_list, &ptlrpc_all_services);
        spin_unlock (&ptlrpc_all_services_lock);
        
        /* Now allocate the request buffers, assuming all interfaces require
         * the same number. */
        for (i = 0; i < ptlrpc_ninterfaces; i++) {
                srv_ni = &service->srv_interfaces[i];
                CDEBUG (D_NET, "%s: initialising interface %s\n", name,
                        srv_ni->sni_ni->pni_name);

                for (j = 0; j < nbufs; j++) {
                        rqbd = ptlrpc_alloc_rqbd (srv_ni);
                        
                        if (rqbd == NULL) {
                                CERROR ("%s.%d: Can't allocate request %d "
                                        "on %s\n", name, i, j, 
                                        srv_ni->sni_ni->pni_name);
                                GOTO(failed, NULL);
                        }

                        /* We shouldn't be under memory pressure at
                         * startup, so fail if we can't post all our
                         * buffers at this time. */
                        if (ptlrpc_server_post_idle_rqbds(service) <= 0)
                                GOTO(failed, NULL);
                }
        }

        if (proc_entry != NULL)
                ptlrpc_lprocfs_register_service(proc_entry, service);

        CDEBUG(D_NET, "%s: Started on %d interfaces, listening on portal %d\n",
               service->srv_name, ptlrpc_ninterfaces, service->srv_req_portal);

        RETURN(service);
failed:
        /* unregister tears down everything initialised so far */
        ptlrpc_unregister_service(service);
        return NULL;
}
361
362 static void
363 ptlrpc_server_free_request(struct ptlrpc_service *svc, struct ptlrpc_request *req)
364 {
365         unsigned long  flags;
366         int            refcount;
367         
368         spin_lock_irqsave(&svc->srv_lock, flags);
369         svc->srv_n_active_reqs--;
370         refcount = --(req->rq_rqbd->rqbd_refcount);
371         if (refcount == 0) {
372                 /* request buffer is now idle */
373                 list_del(&req->rq_rqbd->rqbd_list);
374                 list_add_tail(&req->rq_rqbd->rqbd_list,
375                               &svc->srv_idle_rqbds);
376         }
377         spin_unlock_irqrestore(&svc->srv_lock, flags);
378
379         ptlrpc_free_server_req(req);
380 }
381
/* Dequeue and process one request from svc->srv_request_queue.
 * Returns 1 if a request was handled (successfully or not), 0 if the
 * queue was empty or this thread must stay free for difficult replies. */
static int 
ptlrpc_server_handle_request (struct ptlrpc_service *svc)
{
        struct obd_export     *export = NULL;
        struct ptlrpc_request *request;
        unsigned long          flags;
        struct timeval         work_start;
        struct timeval         work_end;
        long                   timediff;
        int                    rc;
        char                   str[PTL_NALFMT_SIZE];
        ENTRY;

        spin_lock_irqsave (&svc->srv_lock, flags);
        if (list_empty (&svc->srv_request_queue) ||
            (svc->srv_n_difficult_replies != 0 &&
             svc->srv_n_active_reqs >= (svc->srv_nthreads - 1))) {
                /* If all the other threads are handling requests, I must
                 * remain free to handle any 'difficult' reply that might
                 * block them */
                spin_unlock_irqrestore (&svc->srv_lock, flags);
                RETURN(0);
        }

        request = list_entry (svc->srv_request_queue.next,
                              struct ptlrpc_request, rq_list);
        list_del_init (&request->rq_list);
        svc->srv_n_queued_reqs--;
        svc->srv_n_active_reqs++;

        spin_unlock_irqrestore (&svc->srv_lock, flags);

        /* timediff = time this request spent queued, in microseconds */
        do_gettimeofday(&work_start);
        timediff = timeval_sub(&work_start, &request->rq_arrival_time);
        if (svc->srv_stats != NULL) {
                lprocfs_counter_add(svc->srv_stats, PTLRPC_REQWAIT_CNTR,
                                    timediff);
                lprocfs_counter_add(svc->srv_stats, PTLRPC_REQQDEPTH_CNTR,
                                    svc->srv_n_queued_reqs);
                lprocfs_counter_add(svc->srv_stats, PTLRPC_REQACTIVE_CNTR,
                                    svc->srv_n_active_reqs);
        }

#if SWAB_PARANOIA
        /* Clear request swab mask; this is a new request */
        request->rq_req_swab_mask = 0;
#endif
        rc = lustre_unpack_msg (request->rq_reqmsg, request->rq_reqlen);
        if (rc != 0) {
                CERROR ("error unpacking request: ptl %d from %s"
                        " xid "LPU64"\n", svc->srv_req_portal,
                        ptlrpc_peernid2str(&request->rq_peer, str),
                       request->rq_xid);
                goto out;
        }

        rc = -EINVAL;
        if (request->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
                CERROR("wrong packet type received (type=%u) from %s\n",
                       request->rq_reqmsg->type,
                       ptlrpc_peernid2str(&request->rq_peer, str));
                goto out;
        }

        CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);

        /* Discard requests queued for longer than my timeout.  If the
         * client's timeout is similar to mine, she'll be timing out this
         * REQ anyway (bug 1502) */
        if (timediff / 1000000 > (long)obd_timeout) {
                CERROR("Dropping timed-out request from %s: %ld seconds old\n",
                       ptlrpc_peernid2str(&request->rq_peer, str), 
                       timediff / 1000000);
                goto out;
        }

        request->rq_export = class_conn2export(&request->rq_reqmsg->handle);

        if (request->rq_export) {
                /* Drop requests arriving on a connection that has since
                 * been superseded by a reconnect */
                if (request->rq_reqmsg->conn_cnt <
                    request->rq_export->exp_conn_cnt) {
                        DEBUG_REQ(D_ERROR, request,
                                  "DROPPING req from old connection %d < %d",
                                  request->rq_reqmsg->conn_cnt,
                                  request->rq_export->exp_conn_cnt);
                        goto put_conn;
                }
                
                /* Pin the export for the duration of the RPC */
                export = class_export_rpc_get(request->rq_export);
                request->rq_export->exp_last_request_time =
                        LTIME_S(CURRENT_TIME);
        }

        CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
               "%s:%s+%d:%d:"LPU64":%s:%s:%d\n", current->comm,
               (request->rq_export ?
                (char *)request->rq_export->exp_client_uuid.uuid : "0"),
               (request->rq_export ?
                atomic_read(&request->rq_export->exp_refcount) : -99),
               request->rq_reqmsg->status, request->rq_xid,
               request->rq_peer.peer_ni->pni_name,
               ptlrpc_peernid2str(&request->rq_peer, str),
               request->rq_reqmsg->opc);

        /* Dispatch to the service's handler (e.g. the MDS/OST handler) */
        rc = svc->srv_handler(request);
        CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
               "%s:%s+%d:%d:"LPU64":%s:%s:%d\n", current->comm,
               (request->rq_export ?
                (char *)request->rq_export->exp_client_uuid.uuid : "0"),
               (request->rq_export ?
                atomic_read(&request->rq_export->exp_refcount) : -99),
               request->rq_reqmsg->status, request->rq_xid,
               request->rq_peer.peer_ni->pni_name,
               ptlrpc_peernid2str(&request->rq_peer, str),
               request->rq_reqmsg->opc);

        if (export != NULL)
                class_export_rpc_put(export);

put_conn:
        if (request->rq_export != NULL)
                class_export_put(request->rq_export);

 out:
        do_gettimeofday(&work_end);

        /* timediff now = handling time, in microseconds */
        timediff = timeval_sub(&work_end, &work_start);

        /* handling longer than obd_timeout is logged at error level */
        CDEBUG((timediff / 1000000 > (long)obd_timeout) ? D_ERROR : D_HA,
               "request "LPU64" opc %u from NID %s processed in %ldus "
               "(%ldus total)\n", request->rq_xid, request->rq_reqmsg->opc,
               ptlrpc_peernid2str(&request->rq_peer, str),
               timediff, timeval_sub(&work_end, &request->rq_arrival_time));

        if (svc->srv_stats != NULL) {
                int opc = opcode_offset(request->rq_reqmsg->opc);
                if (opc > 0) {
                        LASSERT(opc < LUSTRE_MAX_OPCODES);
                        lprocfs_counter_add(svc->srv_stats,
                                            opc + PTLRPC_LAST_CNTR,
                                            timediff);
                }
        }

        ptlrpc_server_free_request(svc, request);
        
        RETURN(1);
}
530
/* Process one 'difficult' reply from svc->srv_reply_queue: detach it
 * from the commit/export notifier lists, release any saved DLM/llog
 * locks, and free the reply state once it is off the net.
 * Returns 1 if a reply was processed, 0 if the queue was empty. */
static int
ptlrpc_server_handle_reply (struct ptlrpc_service *svc) 
{
        struct ptlrpc_reply_state *rs;
        unsigned long              flags;
        struct obd_export         *exp;
        struct obd_device         *obd;
        struct llog_create_locks  *lcl;
        int                        nlocks;
        int                        been_handled;
        char                       str[PTL_NALFMT_SIZE];
        ENTRY;

        spin_lock_irqsave (&svc->srv_lock, flags);
        if (list_empty (&svc->srv_reply_queue)) {
                spin_unlock_irqrestore (&svc->srv_lock, flags);
                RETURN(0);
        }
        
        rs = list_entry (svc->srv_reply_queue.next,
                         struct ptlrpc_reply_state, rs_list);

        exp = rs->rs_export;
        obd = exp->exp_obd;

        LASSERT (rs->rs_difficult);
        LASSERT (rs->rs_scheduled);

        list_del_init (&rs->rs_list);

        /* Disengage from notifiers carefully (lock ordering!) */
        /* NB: plain unlock — interrupts stay disabled (state saved by the
         * irqsave above) while the other two locks are taken */
        spin_unlock(&svc->srv_lock);

        spin_lock (&obd->obd_uncommitted_replies_lock);
        /* Noop if removed already */
        list_del_init (&rs->rs_obd_list);
        spin_unlock (&obd->obd_uncommitted_replies_lock);

        spin_lock (&exp->exp_lock);
        /* Noop if removed already */
        list_del_init (&rs->rs_exp_list);
        spin_unlock (&exp->exp_lock);

        spin_lock(&svc->srv_lock);

        been_handled = rs->rs_handled;
        rs->rs_handled = 1;
        
        nlocks = rs->rs_nlocks;                 /* atomic "steal", but */
        rs->rs_nlocks = 0;                      /* locks still on rs_locks! */

        lcl = rs->rs_llog_locks;
        rs->rs_llog_locks = NULL;

        if (nlocks == 0 && !been_handled) {
                /* If we see this, we should already have seen the warning
                 * in mds_steal_ack_locks()  */
#if 0   
                /* CMD may ask to save request with no DLM locks -bzzz */
                CWARN("All locks stolen from rs %p x"LPD64".t"LPD64
                      " o%d NID %s\n",
                      rs, 
                      rs->rs_xid, rs->rs_transno,
                      rs->rs_msg.opc, 
                      ptlrpc_peernid2str(&exp->exp_connection->c_peer, str));
#endif
        }

        if ((!been_handled && rs->rs_on_net) || 
            nlocks > 0 || lcl != NULL) {
                /* drop srv_lock while unlinking the MD and releasing locks */
                spin_unlock_irqrestore(&svc->srv_lock, flags);
                
                if (!been_handled && rs->rs_on_net) {
                        PtlMDUnlink(rs->rs_md_h);
                        /* Ignore return code; we're racing with
                         * completion... */
                }

                while (nlocks-- > 0)
                        ldlm_lock_decref(&rs->rs_locks[nlocks], 
                                         rs->rs_modes[nlocks]);

                if (lcl != NULL)
                        llog_create_lock_free(lcl);         

                spin_lock_irqsave(&svc->srv_lock, flags);
        }

        rs->rs_scheduled = 0;

        if (!rs->rs_on_net) {
                /* Off the net */
                svc->srv_n_difficult_replies--;
                spin_unlock_irqrestore(&svc->srv_lock, flags);
                
                class_export_put (exp);
                rs->rs_export = NULL;
                lustre_free_reply_state (rs);
                atomic_dec (&svc->srv_outstanding_replies);
                RETURN(1);
        }
        
        /* still on the net; callback will schedule */
        spin_unlock_irqrestore (&svc->srv_lock, flags);
        RETURN(1);
}
637
638 #ifndef __KERNEL__
639 /* FIXME make use of timeout later */
640 int
641 liblustre_check_services (void *arg) 
642 {
643         int  did_something = 0;
644         int  rc;
645         struct list_head *tmp, *nxt;
646         ENTRY;
647         
648         /* I'm relying on being single threaded, not to have to lock
649          * ptlrpc_all_services etc */
650         list_for_each_safe (tmp, nxt, &ptlrpc_all_services) {
651                 struct ptlrpc_service *svc =
652                         list_entry (tmp, struct ptlrpc_service, srv_list);
653                 
654                 if (svc->srv_nthreads != 0)     /* I've recursed */
655                         continue;
656
657                 /* service threads can block for bulk, so this limits us
658                  * (arbitrarily) to recursing 1 stack frame per service.
659                  * Note that the problem with recursion is that we have to
660                  * unwind completely before our caller can resume. */
661                 
662                 svc->srv_nthreads++;
663                 
664                 do {
665                         rc = ptlrpc_server_handle_reply(svc);
666                         rc |= ptlrpc_server_handle_request(svc);
667                         rc |= (ptlrpc_server_post_idle_rqbds(svc) > 0);
668                         did_something |= rc;
669                 } while (rc);
670                 
671                 svc->srv_nthreads--;
672         }
673
674         RETURN(did_something);
675 }
676
677 #else /* __KERNEL__ */
678
/* Don't use daemonize, it removes fs struct from new thread (bug 418) */
void ptlrpc_daemonize(void)
{
        exit_mm(current);               /* drop the forked address space */
        lustre_daemonize_helper();      /* lustre-specific daemon setup */
        exit_files(current);            /* drop inherited file descriptors */
        reparent_to_init();             /* detach from the starting process */
}
687
688 static int
689 ptlrpc_retry_rqbds(void *arg)
690 {
691         struct ptlrpc_service *svc = (struct ptlrpc_service *)arg;
692         
693         svc->srv_rqbd_timeout = 0;
694         return (-ETIMEDOUT);
695 }
696
/* Body of a ptlrpc service thread: daemonize, then loop handling replies,
 * requests and idle-rqbd reposts until asked to stop and no difficult
 * replies remain outstanding. */
static int ptlrpc_main(void *arg)
{
        struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
        struct ptlrpc_service  *svc = data->svc;
        struct ptlrpc_thread   *thread = data->thread;
        unsigned long           flags;
        ENTRY;

        lock_kernel();
        ptlrpc_daemonize();

        /* block all signals for the lifetime of this thread */
        SIGNAL_MASK_LOCK(current, flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, flags);

        LASSERTF(strlen(data->name) < sizeof(current->comm),
                 "name %d > len %d\n",
                 (int)strlen(data->name), (int)sizeof(current->comm));
        THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);
        
        unlock_kernel();

        /* Record that the thread is running; ptlrpc_start_thread() is
         * waiting on t_ctl_waitq for this.  NB: 'data' lives on the
         * starter's stack, so it must not be referenced after this wakeup. */
        thread->t_flags = SVC_RUNNING;
        wake_up(&thread->t_ctl_waitq);

        spin_lock_irqsave(&svc->srv_lock, flags);
        svc->srv_nthreads++;
        spin_unlock_irqrestore(&svc->srv_lock, flags);

        /* XXX maintain a list of all managed devices: insert here */

        while ((thread->t_flags & SVC_STOPPING) == 0 ||
               svc->srv_n_difficult_replies != 0) {
                /* Don't exit while there are replies to be handled */
                struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                     ptlrpc_retry_rqbds, svc);

                /* Sleep until there is something to do: stop requested (and
                 * drained), an idle rqbd to repost, a reply to handle, or a
                 * request we are allowed to take (see comment below). */
                l_wait_event_exclusive (svc->srv_waitq,
                              ((thread->t_flags & SVC_STOPPING) != 0 &&
                               svc->srv_n_difficult_replies == 0) ||
                              (!list_empty(&svc->srv_idle_rqbds) &&
                               svc->srv_rqbd_timeout == 0) ||
                              !list_empty (&svc->srv_reply_queue) ||
                              (!list_empty (&svc->srv_request_queue) &&
                               (svc->srv_n_difficult_replies == 0 ||
                                svc->srv_n_active_reqs <
                                (svc->srv_nthreads - 1))),
                              &lwi);

                if (!list_empty (&svc->srv_reply_queue))
                        ptlrpc_server_handle_reply (svc);

                /* only handle requests if there are no difficult replies
                 * outstanding, or I'm not the last thread handling
                 * requests */
                if (!list_empty (&svc->srv_request_queue) &&
                    (svc->srv_n_difficult_replies == 0 ||
                     svc->srv_n_active_reqs < (svc->srv_nthreads - 1)))
                        ptlrpc_server_handle_request (svc);

                if (!list_empty(&svc->srv_idle_rqbds) &&
                    ptlrpc_server_post_idle_rqbds(svc) < 0) {
                        /* I just failed to repost request buffers.  Wait
                         * for a timeout (unless something else happens)
                         * before I try again */
                        svc->srv_rqbd_timeout = HZ/10;
                }
        }

        spin_lock_irqsave(&svc->srv_lock, flags);

        svc->srv_nthreads--;                    /* must know immediately */
        thread->t_flags = SVC_STOPPED;
        wake_up(&thread->t_ctl_waitq);

        spin_unlock_irqrestore(&svc->srv_lock, flags);

        CDEBUG(D_NET, "service thread exiting, process %d\n", current->pid);
        return 0;
}
779
/* Ask one service thread to stop, wait for it to exit, then unlink and
 * free its control structure. */
static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
                               struct ptlrpc_thread *thread)
{
        struct l_wait_info lwi = { 0 };
        unsigned long      flags;

        /* flag the thread, then wake everyone so it notices */
        spin_lock_irqsave(&svc->srv_lock, flags);
        thread->t_flags = SVC_STOPPING;
        spin_unlock_irqrestore(&svc->srv_lock, flags);

        wake_up_all(&svc->srv_waitq);
        /* the thread sets SVC_STOPPED just before exiting its main loop */
        l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
                     &lwi);

        spin_lock_irqsave(&svc->srv_lock, flags);
        list_del(&thread->t_link);
        spin_unlock_irqrestore(&svc->srv_lock, flags);

        OBD_FREE(thread, sizeof(*thread));
}
800
/* Stop every thread on @svc.  The lock is dropped around each
 * ptlrpc_stop_thread() call because stopping blocks; the list is
 * re-checked from the head each iteration. */
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
        unsigned long flags;
        struct ptlrpc_thread *thread;

        spin_lock_irqsave(&svc->srv_lock, flags);
        while (!list_empty(&svc->srv_threads)) {
                thread = list_entry(svc->srv_threads.next,
                                    struct ptlrpc_thread, t_link);

                spin_unlock_irqrestore(&svc->srv_lock, flags);
                ptlrpc_stop_thread(svc, thread);
                spin_lock_irqsave(&svc->srv_lock, flags);
        }

        spin_unlock_irqrestore(&svc->srv_lock, flags);
}
818
819 /* @base_name should be 12 characters or less - 3 will be added on */
820 int ptlrpc_start_n_threads(struct obd_device *dev, struct ptlrpc_service *svc,
821                            int num_threads, char *base_name)
822 {
823         int i, rc = 0;
824         ENTRY;
825
826         for (i = 0; i < num_threads; i++) {
827                 char name[32];
828                 sprintf(name, "%s_%02d", base_name, i);
829                 rc = ptlrpc_start_thread(dev, svc, name);
830                 if (rc) {
831                         CERROR("cannot start %s thread #%d: rc %d\n", base_name,
832                                i, rc);
833                         ptlrpc_stop_all_threads(svc);
834                 }
835         }
836         RETURN(rc);
837 }
838
839 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
840                         char *name)
841 {
842         struct l_wait_info lwi = { 0 };
843         struct ptlrpc_svc_data d;
844         struct ptlrpc_thread *thread;
845         unsigned long flags;
846         int rc;
847         ENTRY;
848
849         OBD_ALLOC(thread, sizeof(*thread));
850         if (thread == NULL)
851                 RETURN(-ENOMEM);
852         init_waitqueue_head(&thread->t_ctl_waitq);
853         
854         d.dev = dev;
855         d.svc = svc;
856         d.name = name;
857         d.thread = thread;
858
859         spin_lock_irqsave(&svc->srv_lock, flags);
860         list_add(&thread->t_link, &svc->srv_threads);
861         spin_unlock_irqrestore(&svc->srv_lock, flags);
862
863         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
864          * just drop the VM and FILES in ptlrpc_daemonize() right away.
865          */
866         rc = kernel_thread(ptlrpc_main, &d, CLONE_VM | CLONE_FILES);
867         if (rc < 0) {
868                 CERROR("cannot start thread: %d\n", rc);
869                 OBD_FREE(thread, sizeof(*thread));
870                 RETURN(rc);
871         }
872         l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
873
874         RETURN(0);
875 }
876 #endif
877
878 int ptlrpc_unregister_service(struct ptlrpc_service *service)
879 {
880         int                   i;
881         int                   rc;
882         unsigned long         flags;
883         struct ptlrpc_srv_ni *srv_ni;
884         struct l_wait_info    lwi;
885         struct list_head     *tmp;
886
887         LASSERT(list_empty(&service->srv_threads));
888
889         spin_lock (&ptlrpc_all_services_lock);
890         list_del_init (&service->srv_list);
891         spin_unlock (&ptlrpc_all_services_lock);
892
893         ptlrpc_lprocfs_unregister_service(service);
894
895         for (i = 0; i < ptlrpc_ninterfaces; i++) {
896                 srv_ni = &service->srv_interfaces[i];
897                 CDEBUG(D_NET, "%s: tearing down interface %s\n",
898                        service->srv_name, srv_ni->sni_ni->pni_name);
899
900                 /* Unlink all the request buffers.  This forces a 'final'
901                  * event with its 'unlink' flag set for each posted rqbd */
902                 list_for_each(tmp, &srv_ni->sni_active_rqbds) {
903                         struct ptlrpc_request_buffer_desc *rqbd =
904                                 list_entry(tmp, struct ptlrpc_request_buffer_desc, 
905                                            rqbd_list);
906
907                         rc = PtlMDUnlink(rqbd->rqbd_md_h);
908                         LASSERT (rc == PTL_OK || rc == PTL_MD_INVALID);
909                 }
910
911                 /* Wait for the network to release any buffers it's
912                  * currently filling */
913                 for (;;) {
914                         spin_lock_irqsave(&service->srv_lock, flags);
915                         rc = srv_ni->sni_nrqbd_receiving;
916                         spin_unlock_irqrestore(&service->srv_lock, flags);
917
918                         if (rc == 0)
919                                 break;
920                         
921                         /* Network access will complete in finite time but
922                          * the HUGE timeout lets us CWARN for visibility of
923                          * sluggish NALs */
924                         lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
925                         rc = l_wait_event(service->srv_waitq,
926                                           srv_ni->sni_nrqbd_receiving == 0,
927                                           &lwi);
928                         if (rc == -ETIMEDOUT)
929                                 CWARN("Waiting for request buffers on "
930                                       "service %s on interface %s ",
931                                       service->srv_name, srv_ni->sni_ni->pni_name);
932                 }
933
934                 /* schedule all outstanding replies to terminate them */
935                 spin_lock_irqsave(&service->srv_lock, flags);
936                 while (!list_empty(&srv_ni->sni_active_replies)) {
937                         struct ptlrpc_reply_state *rs =
938                                 list_entry(srv_ni->sni_active_replies.next,
939                                            struct ptlrpc_reply_state,
940                                            rs_list);
941                         ptlrpc_schedule_difficult_reply(rs);
942                 }
943                 spin_unlock_irqrestore(&service->srv_lock, flags);
944         }
945
946         /* purge the request queue.  NB No new replies (rqbds all unlinked)
947          * and no service threads, so I'm the only thread noodling the
948          * request queue now */
949         while (!list_empty(&service->srv_request_queue)) {
950                 struct ptlrpc_request *req =
951                         list_entry(service->srv_request_queue.next,
952                                    struct ptlrpc_request,
953                                    rq_list);
954                 
955                 list_del(&req->rq_list);
956                 service->srv_n_queued_reqs--;
957                 service->srv_n_active_reqs++;
958
959                 ptlrpc_server_free_request(service, req);
960         }
961         LASSERT(service->srv_n_queued_reqs == 0);
962         LASSERT(service->srv_n_active_reqs == 0);
963
964         for (i = 0; i < ptlrpc_ninterfaces; i++) {
965                 srv_ni = &service->srv_interfaces[i];
966                 LASSERT(list_empty(&srv_ni->sni_active_rqbds));
967         }
968
969         /* Now free all the request buffers since nothing references them
970          * any more... */
971         while (!list_empty(&service->srv_idle_rqbds)) {
972                 struct ptlrpc_request_buffer_desc *rqbd =
973                         list_entry(service->srv_idle_rqbds.next,
974                                    struct ptlrpc_request_buffer_desc, 
975                                    rqbd_list);
976
977                 ptlrpc_free_rqbd(rqbd);
978         }
979
980         /* wait for all outstanding replies to complete (they were
981          * scheduled having been flagged to abort above) */
982         while (atomic_read(&service->srv_outstanding_replies) != 0) {
983                 struct l_wait_info lwi = LWI_TIMEOUT(10 * HZ, NULL, NULL);
984
985                 rc = l_wait_event(service->srv_waitq,
986                                   !list_empty(&service->srv_reply_queue), &lwi);
987                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
988
989                 if (rc == 0) {
990                         ptlrpc_server_handle_reply(service);
991                         continue;
992                 }
993                 CWARN("Unexpectedly long timeout %p\n", service);
994         }
995
996         OBD_FREE(service,
997                  offsetof(struct ptlrpc_service,
998                           srv_interfaces[ptlrpc_ninterfaces]));
999         return 0;
1000 }