lustre/ptlrpc/service.c (fs/lustre-release.git, branch b1_8)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_RPC
38 #ifndef __KERNEL__
39 #include <liblustre.h>
40 #include <libcfs/kp30.h>
41 #endif
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <lustre_net.h>
45 #include <lnet/types.h>
46 #include "ptlrpc_internal.h"
47
48 /* The following are visible and mutable through /sys/module/ptlrpc */
49 int test_req_buffer_pressure = 0;
50 CFS_MODULE_PARM(test_req_buffer_pressure, "i", int, 0444,
51                 "set non-zero to put pressure on request buffer pools");
52
53 CFS_MODULE_PARM(at_min, "i", int, 0644,
54                 "Adaptive timeout minimum (sec)");
55 CFS_MODULE_PARM(at_max, "i", int, 0644,
56                 "Adaptive timeout maximum (sec)");
57 CFS_MODULE_PARM(at_history, "i", int, 0644,
58                 "Adaptive timeouts remember the slowest event that took place "
59                 "within this period (sec)");
60 CFS_MODULE_PARM(at_early_margin, "i", int, 0644,
61                 "How soon before an RPC deadline to send an early reply");
62 CFS_MODULE_PARM(at_extra, "i", int, 0644,
63                 "How much extra time to give with each early reply");
64
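/*
 * The at_* parameters above are declared writable (0644), so they can be
 * tuned at module load time or at runtime.  A minimal tuning sketch,
 * assuming the standard module_param sysfs layout that CFS_MODULE_PARM
 * maps onto:
 *
 *     options ptlrpc at_max=600 at_min=0           (modprobe configuration)
 *     echo 5 > /sys/module/ptlrpc/parameters/at_early_margin
 *
 * test_req_buffer_pressure is read-only through sysfs (0444), so it has to
 * be set at module load time.
 */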
65 /* forward ref */
66 static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc);
67
68 static CFS_LIST_HEAD (ptlrpc_all_services);
69 spinlock_t ptlrpc_all_services_lock;
70
71 static char *
72 ptlrpc_alloc_request_buffer (int size)
73 {
74         char *ptr;
75
76         if (size > SVC_BUF_VMALLOC_THRESHOLD)
77                 OBD_VMALLOC(ptr, size);
78         else
79                 OBD_ALLOC(ptr, size);
80
81         return (ptr);
82 }
83
84 static void
85 ptlrpc_free_request_buffer (char *ptr, int size)
86 {
87         if (size > SVC_BUF_VMALLOC_THRESHOLD)
88                 OBD_VFREE(ptr, size);
89         else
90                 OBD_FREE(ptr, size);
91 }
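/*
 * Request buffers above SVC_BUF_VMALLOC_THRESHOLD come from vmalloc()
 * because large physically contiguous allocations become unreliable once
 * memory is fragmented.  The same size-based fallback in plain kernel C
 * (hypothetical helper, shown only to illustrate the pattern):
 *
 *     void *buf = (size > THRESHOLD) ? vmalloc(size)
 *                                    : kmalloc(size, GFP_NOFS);
 *
 * The OBD_* wrappers additionally account the allocation for leak tracking,
 * which is why the matching free routine must be passed the same size.
 */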
92
93 struct ptlrpc_request_buffer_desc *
94 ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
95 {
96         struct ptlrpc_request_buffer_desc *rqbd;
97
98         OBD_ALLOC(rqbd, sizeof (*rqbd));
99         if (rqbd == NULL)
100                 return (NULL);
101
102         rqbd->rqbd_service = svc;
103         rqbd->rqbd_refcount = 0;
104         rqbd->rqbd_cbid.cbid_fn = request_in_callback;
105         rqbd->rqbd_cbid.cbid_arg = rqbd;
106         CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs);
107         rqbd->rqbd_buffer = ptlrpc_alloc_request_buffer(svc->srv_buf_size);
108
109         if (rqbd->rqbd_buffer == NULL) {
110                 OBD_FREE(rqbd, sizeof (*rqbd));
111                 return (NULL);
112         }
113
114         spin_lock(&svc->srv_lock);
115         list_add(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
116         svc->srv_nbufs++;
117         spin_unlock(&svc->srv_lock);
118
119         return (rqbd);
120 }
121
122 void
123 ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
124 {
125         struct ptlrpc_service *svc = rqbd->rqbd_service;
126
127         LASSERT (rqbd->rqbd_refcount == 0);
128         LASSERT (list_empty(&rqbd->rqbd_reqs));
129
130         spin_lock(&svc->srv_lock);
131         list_del(&rqbd->rqbd_list);
132         svc->srv_nbufs--;
133         spin_unlock(&svc->srv_lock);
134
135         ptlrpc_free_request_buffer (rqbd->rqbd_buffer, svc->srv_buf_size);
136         OBD_FREE (rqbd, sizeof (*rqbd));
137 }
138
139 int
140 ptlrpc_grow_req_bufs(struct ptlrpc_service *svc)
141 {
142         struct ptlrpc_request_buffer_desc *rqbd;
143         int                                i;
144
145         CDEBUG(D_RPCTRACE, "%s: allocate %d new %d-byte reqbufs (%d/%d left)\n",
146                svc->srv_name, svc->srv_nbuf_per_group, svc->srv_buf_size,
147                svc->srv_nrqbd_receiving, svc->srv_nbufs);
148         for (i = 0; i < svc->srv_nbuf_per_group; i++) {
149                 rqbd = ptlrpc_alloc_rqbd(svc);
150
151                 if (rqbd == NULL) {
152                         CERROR ("%s: Can't allocate request buffer\n",
153                                 svc->srv_name);
154                         return (-ENOMEM);
155                 }
156
157                 if (ptlrpc_server_post_idle_rqbds(svc) < 0)
158                         return (-EAGAIN);
159         }
160
161         return (0);
162 }
163
164 void
165 ptlrpc_save_lock (struct ptlrpc_request *req,
166                   struct lustre_handle *lock, int mode)
167 {
168         struct ptlrpc_reply_state *rs = req->rq_reply_state;
169         int                        idx;
170
171         LASSERT(rs != NULL);
172         LASSERT(rs->rs_nlocks < RS_MAX_LOCKS);
173
174         idx = rs->rs_nlocks++;
175         rs->rs_locks[idx] = *lock;
176         rs->rs_modes[idx] = mode;
177         rs->rs_difficult = 1;
178 }
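/*
 * A server-side handler that must keep a lock pinned until its reply has
 * safely reached the client can stash the lock handle in the reply state
 * instead of dropping it right away.  Minimal usage sketch (the handler,
 * lock acquisition and handle name are assumptions, not code from this
 * tree):
 *
 *     struct lustre_handle lockh;
 *     rc = take_some_ldlm_lock(..., &lockh);      // hypothetical
 *     ptlrpc_save_lock(req, &lockh, LCK_PW);
 *
 * Saving a lock marks the reply "difficult", so it is only completed
 * (and the saved locks released) by the reply handling path further below.
 */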
179
180 void
181 ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
182 {
183         struct ptlrpc_service *svc = rs->rs_service;
184
185 #ifdef CONFIG_SMP
186         LASSERT (spin_is_locked (&svc->srv_lock));
187 #endif
188         LASSERT (rs->rs_difficult);
189         rs->rs_scheduled_ever = 1;              /* flag any notification attempt */
190
191         if (rs->rs_scheduled)                   /* being set up or already notified */
192                 return;
193
194         rs->rs_scheduled = 1;
195         list_del (&rs->rs_list);
196         list_add (&rs->rs_list, &svc->srv_reply_queue);
197         cfs_waitq_signal (&svc->srv_waitq);
198 }
199
200 void
201 ptlrpc_commit_replies (struct obd_export *exp)
202 {
203         struct list_head   *tmp;
204         struct list_head   *nxt;
205
206         /* Find any replies that have been committed and get their service
207          * to attend to complete them. */
208
209         /* CAVEAT EMPTOR: spinlock ordering!!! */
210         spin_lock(&exp->exp_uncommitted_replies_lock);
211
212         list_for_each_safe(tmp, nxt, &exp->exp_uncommitted_replies) {
213                 struct ptlrpc_reply_state *rs =
214                         list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
215
216                 LASSERT(rs->rs_difficult);
217                 /* VBR: per-export last_committed */
218                 LASSERT(rs->rs_export);
219                 if (rs->rs_transno <= rs->rs_export->exp_last_committed) {
220                         struct ptlrpc_service *svc = rs->rs_service;
221
222                         spin_lock (&svc->srv_lock);
223                         list_del_init (&rs->rs_obd_list);
224                         ptlrpc_schedule_difficult_reply (rs);
225                         spin_unlock (&svc->srv_lock);
226                 }
227         }
228
229         spin_unlock(&exp->exp_uncommitted_replies_lock);
230 }
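/*
 * Worked example of the rule above: a difficult reply created with
 * rs_transno == 42 stays on exp_uncommitted_replies until this export's
 * exp_last_committed reaches 42 (e.g. once the backend transaction has
 * committed); at that point it is moved to the service reply queue and a
 * service thread completes it via ptlrpc_server_handle_reply() below.
 */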
231
232 static int
233 ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
234 {
235         struct ptlrpc_request_buffer_desc *rqbd;
236         int                                rc;
237         int                                posted = 0;
238
239         for (;;) {
240                 spin_lock(&svc->srv_lock);
241
242                 if (list_empty (&svc->srv_idle_rqbds)) {
243                         spin_unlock(&svc->srv_lock);
244                         return (posted);
245                 }
246
247                 rqbd = list_entry(svc->srv_idle_rqbds.next,
248                                   struct ptlrpc_request_buffer_desc,
249                                   rqbd_list);
250                 list_del (&rqbd->rqbd_list);
251
252                 /* assume we will post successfully */
253                 svc->srv_nrqbd_receiving++;
254                 list_add (&rqbd->rqbd_list, &svc->srv_active_rqbds);
255
256                 spin_unlock(&svc->srv_lock);
257
258                 rc = ptlrpc_register_rqbd(rqbd);
259                 if (rc != 0)
260                         break;
261
262                 posted = 1;
263         }
264
265         spin_lock(&svc->srv_lock);
266
267         svc->srv_nrqbd_receiving--;
268         list_del(&rqbd->rqbd_list);
269         list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
270
271         /* Don't complain if no request buffers are posted right now; LNET
272          * won't drop requests because we set the portal lazy! */
273
274         spin_unlock(&svc->srv_lock);
275
276         return (-1);
277 }
278
279 static void ptlrpc_at_timer(unsigned long castmeharder)
280 {
281         struct ptlrpc_service *svc = (struct ptlrpc_service *)castmeharder;
282         svc->srv_at_check = 1;
283         svc->srv_at_checktime = cfs_time_current();
284         cfs_waitq_signal(&svc->srv_waitq);
285 }
286
287 /* @threadname should be 11 characters or less - 3 will be added on */
288 struct ptlrpc_service *
289 ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
290                 int req_portal, int rep_portal, int watchdog_factor,
291                 svc_handler_t handler, char *name,
292                 cfs_proc_dir_entry_t *proc_entry,
293                 svcreq_printfn_t svcreq_printfn,
294                 int min_threads, int max_threads, char *threadname,
295                 svc_hpreq_handler_t hp_handler)
296 {
297         int                     rc;
298         struct ptlrpc_at_array *array;
299         struct ptlrpc_service  *service;
300         unsigned int            size, index;
301         ENTRY;
302
303         LASSERT (nbufs > 0);
304         LASSERT (bufsize >= max_req_size);
305
306         OBD_ALLOC(service, sizeof(*service));
307         if (service == NULL)
308                 RETURN(NULL);
309
310         /* First initialise enough for early teardown */
311
312         service->srv_name = name;
313         spin_lock_init(&service->srv_lock);
314         CFS_INIT_LIST_HEAD(&service->srv_threads);
315         cfs_waitq_init(&service->srv_waitq);
316
317         service->srv_nbuf_per_group = test_req_buffer_pressure ? 1 : nbufs;
318         service->srv_max_req_size = max_req_size;
319         service->srv_buf_size = bufsize;
320         service->srv_rep_portal = rep_portal;
321         service->srv_req_portal = req_portal;
322         service->srv_watchdog_factor = watchdog_factor;
323         service->srv_handler = handler;
324         service->srv_request_history_print_fn = svcreq_printfn;
325         service->srv_request_seq = 1;           /* valid seq #s start at 1 */
326         service->srv_request_max_cull_seq = 0;
327         service->srv_threads_min = min_threads;
328         service->srv_threads_max = max_threads;
329         service->srv_thread_name = threadname;
330         service->srv_hpreq_handler = hp_handler;
331         service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
332         service->srv_hpreq_count = 0;
333         service->srv_n_hpreq = 0;
334
335         rc = LNetSetLazyPortal(service->srv_req_portal);
336         LASSERT (rc == 0);
337
338         CFS_INIT_LIST_HEAD(&service->srv_request_queue);
339         CFS_INIT_LIST_HEAD(&service->srv_request_hpq);
340         CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds);
341         CFS_INIT_LIST_HEAD(&service->srv_active_rqbds);
342         CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
343         CFS_INIT_LIST_HEAD(&service->srv_request_history);
344         CFS_INIT_LIST_HEAD(&service->srv_active_replies);
345         CFS_INIT_LIST_HEAD(&service->srv_reply_queue);
346         CFS_INIT_LIST_HEAD(&service->srv_free_rs_list);
347         cfs_waitq_init(&service->srv_free_rs_waitq);
348
349         spin_lock_init(&service->srv_at_lock);
350         CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
351
352         array = &service->srv_at_array;
353         size = at_est2timeout(at_max);
354         array->paa_size = size;
355         array->paa_count = 0;
356         array->paa_deadline = -1;
357
358         /* allocate memory for srv_at_array (ptlrpc_at_array) */
359         OBD_ALLOC(array->paa_reqs_array, sizeof(struct list_head) * size);
360         if (array->paa_reqs_array == NULL)
361                 GOTO(failed, NULL);
362
363         for (index = 0; index < size; index++)
364                 CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]);
365
366         OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size);
367         if (array->paa_reqs_count == NULL)
368                 GOTO(failed, NULL);
369
370         cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service);
371         /* At SOW, service time should be quick; 10s seems generous. If client
372            timeout is less than this, we'll be sending an early reply. */
373         at_init(&service->srv_at_estimate, 10, 0);
374
375         spin_lock (&ptlrpc_all_services_lock);
376         list_add (&service->srv_list, &ptlrpc_all_services);
377         spin_unlock (&ptlrpc_all_services_lock);
378
379         /* Now allocate the request buffers */
380         rc = ptlrpc_grow_req_bufs(service);
381         /* We shouldn't be under memory pressure at startup, so
382          * fail if we can't post all our buffers at this time. */
383         if (rc != 0)
384                 GOTO(failed, NULL);
385
386         /* Now allocate pool of reply buffers */
387         /* Increase max reply size to next power of two */
388         service->srv_max_reply_size = 1;
389         while (service->srv_max_reply_size < max_reply_size)
390                 service->srv_max_reply_size <<= 1;
391
392         if (proc_entry != NULL)
393                 ptlrpc_lprocfs_register_service(proc_entry, service);
394
395         CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
396                service->srv_name, service->srv_req_portal);
397
398         RETURN(service);
399 failed:
400         ptlrpc_unregister_service(service);
401         return NULL;
402 }
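/*
 * Minimal caller sketch for ptlrpc_init_svc(); the portal numbers, buffer
 * sizes and thread counts below are illustrative placeholders, not the
 * values any real Lustre service uses:
 *
 *     svc = ptlrpc_init_svc(64, 8192, 4096, 4096,
 *                           MY_REQUEST_PORTAL, MY_REPLY_PORTAL, 2,
 *                           my_handler, "my_svc", obd->obd_proc_entry,
 *                           NULL, 2, 8, "my_svc", NULL);
 *     if (svc == NULL)
 *             GOTO(err, rc = -ENOMEM);
 *     // service threads are started separately, e.g. ptlrpc_start_threads()
 *
 * Note that bufsize must be >= max_req_size (LASSERTed above) and that
 * threadname should be 11 characters or less, since a per-thread suffix is
 * appended.
 */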
403
404 /**
405  * Actually free the request; must be called without holding svc_lock.
406  * Note that it is the caller's responsibility to unlink req->rq_list.
407  */
408 static void ptlrpc_server_free_request(struct ptlrpc_request *req)
409 {
410         LASSERT(atomic_read(&req->rq_refcount) == 0);
411         LASSERT(list_empty(&req->rq_timed_list));
412
413         /* DEBUG_REQ() assumes the reply state of a request with a valid
414          * ref will not be destroyed until that reference is dropped. */
415         ptlrpc_req_drop_rs(req);
416
417         if (req != &req->rq_rqbd->rqbd_req) {
418                 /* NB request buffers use an embedded
419                  * req if the incoming req unlinked the
420                  * MD; this isn't one of them! */
421                 OBD_FREE(req, sizeof(*req));
422         }
423 }
424
425 /**
426  * Drop a reference to the request. If the refcount reaches 0, the request
427  * is either put on the history list or freed immediately.
428  */
429 static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
430 {
431         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
432         struct ptlrpc_service             *svc = rqbd->rqbd_service;
433         int                                refcount;
434         struct list_head                  *tmp;
435         struct list_head                  *nxt;
436
437         if (!atomic_dec_and_test(&req->rq_refcount))
438                 return;
439
440         spin_lock(&svc->srv_lock);
441
442         svc->srv_n_active_reqs--;
443         list_add(&req->rq_list, &rqbd->rqbd_reqs);
444
445         refcount = --(rqbd->rqbd_refcount);
446         if (refcount == 0) {
447                 /* request buffer is now idle: add to history */
448                 list_del(&rqbd->rqbd_list);
449                 list_add_tail(&rqbd->rqbd_list, &svc->srv_history_rqbds);
450                 svc->srv_n_history_rqbds++;
451
452                 /* cull some history?
453                  * I expect only about 1 or 2 rqbds need to be recycled here */
454                 while (svc->srv_n_history_rqbds > svc->srv_max_history_rqbds) {
455                         rqbd = list_entry(svc->srv_history_rqbds.next,
456                                           struct ptlrpc_request_buffer_desc,
457                                           rqbd_list);
458
459                         list_del(&rqbd->rqbd_list);
460                         svc->srv_n_history_rqbds--;
461
462                         /* remove rqbd's reqs from svc's req history while
463                          * I've got the service lock */
464                         list_for_each(tmp, &rqbd->rqbd_reqs) {
465                                 req = list_entry(tmp, struct ptlrpc_request,
466                                                  rq_list);
467                                 /* Track the highest culled req seq */
468                                 if (req->rq_history_seq >
469                                     svc->srv_request_max_cull_seq)
470                                         svc->srv_request_max_cull_seq =
471                                                 req->rq_history_seq;
472                                 list_del(&req->rq_history_list);
473                         }
474
475                         spin_unlock(&svc->srv_lock);
476
477                         list_for_each_safe(tmp, nxt, &rqbd->rqbd_reqs) {
478                                 req = list_entry(rqbd->rqbd_reqs.next,
479                                                  struct ptlrpc_request,
480                                                  rq_list);
481                                 list_del(&req->rq_list);
482                                 ptlrpc_server_free_request(req);
483                         }
484
485                         spin_lock(&svc->srv_lock);
486                         /*
487                          * now all reqs, including the embedded req, have been
488                          * disposed of; schedule the request buffer for re-use.
489                          */
490                         LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) == 0);
491                         list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
492                 }
493
494                 spin_unlock(&svc->srv_lock);
495         } else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
496                  /* If we are low on memory, we are not interested in history */
497                 list_del(&req->rq_list);
498                 list_del_init(&req->rq_history_list);
499                 spin_unlock(&svc->srv_lock);
500
501                 ptlrpc_server_free_request(req);
502         } else {
503                 spin_unlock(&svc->srv_lock);
504         }
505 }
506
507 /**
508  * Finish a request: stop sending more early replies and release the
509  * request. Should be called after we have finished handling the request.
510  */
511 static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
512 {
513         struct ptlrpc_service  *svc = req->rq_rqbd->rqbd_service;
514
515         if (req->rq_export) {
516                 class_export_put(req->rq_export);
517                 req->rq_export = NULL;
518         }
519
520         if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
521                 DEBUG_REQ(D_INFO, req, "free req");
522
523         spin_lock(&svc->srv_at_lock);
524         req->rq_sent_final = 1;
525         list_del_init(&req->rq_timed_list);
526         if (req->rq_at_linked) {
527                 struct ptlrpc_at_array *array = &svc->srv_at_array;
528                 __u32 index = req->rq_at_index;
529
530                 req->rq_at_linked = 0;
531                 array->paa_reqs_count[index]--;
532                 array->paa_count--;
533         }
534         spin_unlock(&svc->srv_at_lock);
535
536         ptlrpc_server_drop_request(req);
537 }
538
539 /* This function makes sure dead exports are evicted in a timely manner.
540    It is only called when some export receives a message (i.e., the
541    network is up). */
542 static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
543 {
544         struct obd_export *oldest_exp;
545         time_t oldest_time, new_time;
546
547         ENTRY;
548
549         LASSERT(exp);
550
551         /* Compensate for slow machines, etc, by faking our request time
552            into the future.  Although this can break the strict time-ordering
553            of the list, we can be really lazy here - we don't have to evict
554            at the exact right moment.  Eventually, all silent exports
555            will make it to the top of the list. */
556
557         /* Ignore renewals of one second or less. */
558         new_time = cfs_time_current_sec() + extra_delay;
559         if (exp->exp_last_request_time + 1 /*second */ >= new_time)
560                 RETURN_EXIT;
561
562         exp->exp_last_request_time = new_time;
563         CDEBUG(D_INFO, "updating export %s at %ld\n",
564                exp->exp_client_uuid.uuid,
565                exp->exp_last_request_time);
566
567         /* exports may get disconnected from the chain even though the
568            export has references, so we must keep the spin lock while
569            manipulating the lists */
570         spin_lock(&exp->exp_obd->obd_dev_lock);
571
572         if (list_empty(&exp->exp_obd_chain_timed)) {
573                 /* this one is not timed */
574                 spin_unlock(&exp->exp_obd->obd_dev_lock);
575                 RETURN_EXIT;
576         }
577
578         list_move_tail(&exp->exp_obd_chain_timed,
579                        &exp->exp_obd->obd_exports_timed);
580
581         oldest_exp = list_entry(exp->exp_obd->obd_exports_timed.next,
582                                 struct obd_export, exp_obd_chain_timed);
583         oldest_time = oldest_exp->exp_last_request_time;
584         spin_unlock(&exp->exp_obd->obd_dev_lock);
585
586         if (exp->exp_obd->obd_recovering) {
587                 /* be nice to everyone during recovery */
588                 EXIT;
589                 return;
590         }
591
592         /* Note - racing to start/reset the obd_eviction timer is safe */
593         if (exp->exp_obd->obd_eviction_timer == 0) {
594                 /* Check if the oldest entry is expired. */
595                 if (cfs_time_current_sec() > (oldest_time + PING_EVICT_TIMEOUT +
596                                               extra_delay)) {
597                         /* We need a second timer, in case the net was down and
598                          * it just came back. Since the pinger may skip every
599                          * other PING_INTERVAL (see note in ptlrpc_pinger_main),
600                          * we better wait for 3. */
601                         exp->exp_obd->obd_eviction_timer =
602                                 cfs_time_current_sec() + 3 * PING_INTERVAL;
603                         CDEBUG(D_HA, "%s: Think about evicting %s from %ld\n",
604                                exp->exp_obd->obd_name, obd_export_nid2str(exp),
605                                oldest_time);
606                 }
607         } else {
608                 if (cfs_time_current_sec() >
609                     (exp->exp_obd->obd_eviction_timer + extra_delay)) {
610                         /* The evictor won't evict anyone who we've heard from
611                          * recently, so we don't have to check before we start
612                          * it. */
613                         if (!ping_evictor_wake(exp))
614                                 exp->exp_obd->obd_eviction_timer = 0;
615                 }
616         }
617
618         EXIT;
619 }
620
621 static int ptlrpc_check_req(struct ptlrpc_request *req)
622 {
623         if (lustre_msg_get_conn_cnt(req->rq_reqmsg) <
624             req->rq_export->exp_conn_cnt) {
625                 DEBUG_REQ(D_ERROR, req,
626                           "DROPPING req from old connection %d < %d",
627                           lustre_msg_get_conn_cnt(req->rq_reqmsg),
628                           req->rq_export->exp_conn_cnt);
629                 return -EEXIST;
630         }
631         if (req->rq_export->exp_obd && req->rq_export->exp_obd->obd_fail) {
632              /* Failing over: don't handle any more reqs, send an
633                 error response instead. */
634                 CDEBUG(D_RPCTRACE, "Dropping req %p for failed obd %s\n",
635                        req, req->rq_export->exp_obd->obd_name);
636                 req->rq_status = -ENODEV;
637                 ptlrpc_error(req);
638                 return -ENODEV;
639         }
640         return 0;
641 }
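/*
 * Worked example of the connection-count check above: if a client
 * reconnects, exp_conn_cnt is bumped (say from 3 to 4); a request still in
 * flight that was stamped with conn_cnt 3 is then recognized as belonging
 * to the stale connection and dropped with -EEXIST, so work issued before
 * the reconnect is never executed against the new connection.
 */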
642
643 static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
644 {
645         struct ptlrpc_at_array *array = &svc->srv_at_array;
646         __s32 next;
647
648         spin_lock(&svc->srv_at_lock);
649         if (array->paa_count == 0) {
650                 cfs_timer_disarm(&svc->srv_at_timer);
651                 spin_unlock(&svc->srv_at_lock);
652                 return;
653         }
654
655         /* Set timer for closest deadline */
656         next = (__s32)(array->paa_deadline - cfs_time_current_sec() -
657                        at_early_margin);
658         if (next <= 0)
659                 ptlrpc_at_timer((unsigned long)svc);
660         else
661                 cfs_timer_arm(&svc->srv_at_timer, cfs_time_shift(next));
662         spin_unlock(&svc->srv_at_lock);
663         CDEBUG(D_INFO, "armed %s at %+ds\n", svc->srv_name, next);
664 }
665
666 /* Add rpc to early reply check list */
667 static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
668 {
669         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
670         struct ptlrpc_request *rq = NULL;
671         struct ptlrpc_at_array *array = &svc->srv_at_array;
672         __u32 index;
673         int found = 0;
674
675         if (AT_OFF)
676                 return(0);
677
678         if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
679                 return(-ENOSYS);
680
681         spin_lock(&svc->srv_at_lock);
682
683         if (unlikely(req->rq_sent_final)) {
684                 spin_unlock(&svc->srv_at_lock);
685                 return 0;
686         }
687
688         LASSERT(list_empty(&req->rq_timed_list));
689
690         index = req->rq_deadline % array->paa_size;
691         if (array->paa_reqs_count[index] > 0) {
692                 /* latest rpcs will have the latest deadlines in the list,
693                  * so search backward. */
694                 list_for_each_entry_reverse(rq, &array->paa_reqs_array[index],
695                                             rq_timed_list) {
696                         if (req->rq_deadline >= rq->rq_deadline) {
697                                 list_add(&req->rq_timed_list,
698                                          &rq->rq_timed_list);
699                                 break;
700                         }
701                 }
702         }
703
704         /* Add the request at the head of the list */
705         if (list_empty(&req->rq_timed_list))
706                 list_add(&req->rq_timed_list, &array->paa_reqs_array[index]);
707
708         req->rq_at_linked = 1;
709         req->rq_at_index = index;
710         array->paa_reqs_count[index]++;
711         array->paa_count++;
712         if (array->paa_count == 1 || array->paa_deadline > req->rq_deadline) {
713                 array->paa_deadline = req->rq_deadline;
714                 found = 1;
715         }
716         spin_unlock(&svc->srv_at_lock);
717
718         if (found)
719                 ptlrpc_at_set_timer(svc);
720
721         return 0;
722 }
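/*
 * Worked example of the bucketing above, with an illustrative paa_size of
 * 625 (the real size is derived from at_max in ptlrpc_init_svc()): a
 * request with rq_deadline 10000 lands in bucket 10000 % 625 == 0, one
 * with deadline 10003 lands in bucket 3.  Within a bucket later arrivals
 * usually carry later deadlines, so the reverse search above typically
 * stops after a single comparison, and paa_deadline always tracks the
 * globally earliest deadline so that ptlrpc_at_set_timer() only ever needs
 * to arm one timer.
 */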
723
724 static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
725                                       int extra_time)
726 {
727         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
728         struct ptlrpc_request *reqcopy;
729         struct lustre_msg *reqmsg;
730         long olddl = req->rq_deadline - cfs_time_current_sec();
731         time_t newdl;
732         int rc;
733         ENTRY;
734
735         /* deadline is when the client expects us to reply, margin is the
736            difference between clients' and servers' expectations */
737         DEBUG_REQ(D_ADAPTTO, req,
738                   "%ssending early reply (deadline %+lds, margin %+lds) for "
739                   "%d+%d", AT_OFF ? "AT off - not " : "",
740                   olddl, olddl - at_get(&svc->srv_at_estimate),
741                   at_get(&svc->srv_at_estimate), extra_time);
742
743         if (AT_OFF)
744                 RETURN(0);
745
746         if (olddl < 0) {
747                 DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), "
748                           "not sending early reply. Consider increasing "
749                           "at_early_margin (%d)?", olddl, at_early_margin);
750
751                 /* Return an error so we're not re-added to the timed list. */
752                 RETURN(-ETIMEDOUT);
753         }
754
755         if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0){
756                 DEBUG_REQ(D_INFO, req, "Wanted to ask client for more time, "
757                           "but no AT support");
758                 RETURN(-ENOSYS);
759         }
760
761         if (req->rq_export && req->rq_export->exp_in_recovery) {
762                 /* don't increase server estimates during recovery, and give
763                    clients the full recovery time. */
764                 newdl = cfs_time_current_sec() +
765                         req->rq_export->exp_obd->obd_recovery_timeout;
766         } else {
767                 if (extra_time) {
768                         /* Fake our processing time into the future to ask the
769                            clients for some extra amount of time */
770                         extra_time += cfs_time_current_sec() -
771                                       req->rq_arrival_time.tv_sec;
772                         at_add(&svc->srv_at_estimate, extra_time);
773                 }
774                 newdl = req->rq_arrival_time.tv_sec +
775                         at_get(&svc->srv_at_estimate);
776         }
777         if (req->rq_deadline >= newdl) {
778                 /* We're not adding any time, no need to send an early reply
779                    (e.g. maybe at adaptive_max) */
780                 DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
781                           "(%ld/%ld), not sending early reply\n",
782                           olddl, newdl - cfs_time_current_sec());
783                 RETURN(-ETIMEDOUT);
784         }
785
786         OBD_ALLOC(reqcopy, sizeof *reqcopy);
787         if (reqcopy == NULL)
788                 RETURN(-ENOMEM);
789         OBD_ALLOC(reqmsg, req->rq_reqlen);
790         if (!reqmsg) {
791                 OBD_FREE(reqcopy, sizeof *reqcopy);
792                 RETURN(-ENOMEM);
793         }
794
795         *reqcopy = *req;
796         reqcopy->rq_reply_state = NULL;
797         reqcopy->rq_rep_swab_mask = 0;
798         /* We only need the reqmsg for the magic */
799         reqcopy->rq_reqmsg = reqmsg;
800         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
801
802         if (req->rq_sent_final) {
803                 DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
804                           "abort sending early reply\n");
805                 GOTO(out, rc = 0);
806         }
807
808         /* Connection ref */
809         reqcopy->rq_export = class_conn2export(
810                                      lustre_msg_get_handle(reqcopy->rq_reqmsg));
811         if (reqcopy->rq_export == NULL)
812                 GOTO(out, rc = -ENODEV);
813
814         /* RPC ref */
815         class_export_rpc_get(reqcopy->rq_export);
816         if (reqcopy->rq_export->exp_obd &&
817             reqcopy->rq_export->exp_obd->obd_fail)
818                 GOTO(out_put, rc = -ENODEV);
819
820         rc = lustre_pack_reply_flags(reqcopy, 1, NULL, NULL, LPRFL_EARLY_REPLY);
821         if (rc)
822                 GOTO(out_put, rc);
823
824         rc = ptlrpc_send_reply(reqcopy, PTLRPC_REPLY_EARLY);
825
826         if (!rc) {
827                 /* Adjust our own deadline to what we told the client */
828                 req->rq_deadline = newdl;
829                 req->rq_early_count++; /* number sent, server side */
830         } else {
831                 DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
832         }
833
834         /* Free the (early) reply state from lustre_pack_reply.
835            (ptlrpc_send_reply takes its own rs ref, so this is safe here) */
836         ptlrpc_req_drop_rs(reqcopy);
837
838 out_put:
839         class_export_rpc_put(reqcopy->rq_export);
840         class_export_put(reqcopy->rq_export);
841 out:
842         OBD_FREE(reqmsg, req->rq_reqlen);
843         OBD_FREE(reqcopy, sizeof *reqcopy);
844         RETURN(rc);
845 }
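/*
 * Numeric sketch of the deadline math above (all values illustrative): a
 * request arrives at t=1000 with a 100s client-advertised timeout, so
 * rq_deadline starts at 1100.  If the early-reply check runs at t=1095
 * with at_extra == 30, extra_time becomes 30 + (1095 - 1000) == 125, the
 * service estimate is bumped to at least 125, and the early reply
 * advertises newdl = 1000 + 125 = 1125; the server then moves its own
 * rq_deadline to 1125 as well.  If newdl would not exceed the current
 * deadline, -ETIMEDOUT is returned and no early reply goes out.
 */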
846
847 /* Send early replies to everybody expiring within at_early_margin
848    asking for at_extra time */
849 static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
850 {
851         struct ptlrpc_request *rq, *n;
852         struct list_head work_list;
853         struct ptlrpc_at_array *array = &svc->srv_at_array;
854         __u32  index, count;
855         time_t deadline;
856         time_t now = cfs_time_current_sec();
857         cfs_duration_t delay;
858         int first, counter = 0;
859         ENTRY;
860
861         spin_lock(&svc->srv_at_lock);
862         if (svc->srv_at_check == 0) {
863                 spin_unlock(&svc->srv_at_lock);
864                 RETURN(0);
865         }
866         delay = cfs_time_sub(cfs_time_current(), svc->srv_at_checktime);
867         svc->srv_at_check = 0;
868
869         if (array->paa_count == 0) {
870                 spin_unlock(&svc->srv_at_lock);
871                 RETURN(0);
872         }
873
874         /* The timer went off, but maybe the nearest rpc already completed. */
875         first = array->paa_deadline - now;
876         if (first > at_early_margin) {
877                 /* We've still got plenty of time.  Reset the timer. */
878                 spin_unlock(&svc->srv_at_lock);
879                 ptlrpc_at_set_timer(svc);
880                 RETURN(0);
881         }
882
883         /* We're close to a timeout, and we don't know how much longer the
884            server will take. Send early replies to everyone expiring soon. */
885         CFS_INIT_LIST_HEAD(&work_list);
886         deadline = -1;
887         index = array->paa_deadline % array->paa_size;
888         count = array->paa_count;
889         while (count > 0) {
890                 count -= array->paa_reqs_count[index];
891                 list_for_each_entry_safe(rq, n, &array->paa_reqs_array[index],
892                                          rq_timed_list) {
893                         if (rq->rq_deadline <= now + at_early_margin) {
894                                 list_move(&rq->rq_timed_list, &work_list);
895                                 counter++;
896                                 array->paa_reqs_count[index]--;
897                                 array->paa_count--;
898                                 rq->rq_at_linked = 0;
899                                 continue;
900                         }
901
902                         /* update the earliest deadline */
903                         if (deadline == -1 || rq->rq_deadline < deadline)
904                                 deadline = rq->rq_deadline;
905
906                         break;
907                 }
908
909                 if (++index >= array->paa_size)
910                         index = 0;
911         }
912         array->paa_deadline = deadline;
913         spin_unlock(&svc->srv_at_lock);
914
915         /* we have a new earliest deadline, restart the timer */
916         ptlrpc_at_set_timer(svc);
917
918         CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early "
919                "replies\n", first, at_extra, counter);
920
921         if (first < 0) {
922                 /* We're already past request deadlines before we even get a
923                    chance to send early replies */
924                 LCONSOLE_WARN("%s: This server is not able to keep up with "
925                               "request traffic (cpu-bound).\n",  svc->srv_name);
926                 CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, "
927                       "delay="CFS_DURATION_T"(jiff)\n",
928                       counter, svc->srv_n_queued_reqs, svc->srv_n_active_reqs,
929                       at_get(&svc->srv_at_estimate), delay);
930         }
931
932         /* ptlrpc_server_finish_request may delete an entry out of
933          * the work list */
934         spin_lock(&svc->srv_at_lock);
935         while (!list_empty(&work_list)) {
936                 rq = list_entry(work_list.next, struct ptlrpc_request,
937                                 rq_timed_list);
938                 list_del_init(&rq->rq_timed_list);
939                 /* if the entry is still in the worklist, it hasn't been
940                    deleted, and is safe to take a ref to keep the req around */
941                 atomic_inc(&rq->rq_refcount);
942                 spin_unlock(&svc->srv_at_lock);
943
944                 if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
945                         ptlrpc_at_add_timed(rq);
946
947                 ptlrpc_server_drop_request(rq);
948                 spin_lock(&svc->srv_at_lock);
949         }
950         spin_unlock(&svc->srv_at_lock);
951
952         RETURN(0);
953 }
954
955 /**
956  * Put the request on the export list if it may become
957  * a high priority one.
958  */
959 static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
960                              struct ptlrpc_request *req)
961 {
962         int rc;
963         ENTRY;
964
965         if (svc->srv_hpreq_handler) {
966                 rc = svc->srv_hpreq_handler(req);
967                 if (rc)
968                         RETURN(rc);
969         }
970         if (req->rq_export && req->rq_ops) {
971                 spin_lock(&req->rq_export->exp_lock);
972                 list_add(&req->rq_exp_list, &req->rq_export->exp_queued_rpc);
973                 spin_unlock(&req->rq_export->exp_lock);
974         }
975
976         RETURN(0);
977 }
978
979 /** Remove the request from the export list. */
980 static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
981 {
982         ENTRY;
983         if (req->rq_export && req->rq_ops) {
984                 spin_lock(&req->rq_export->exp_lock);
985                 list_del_init(&req->rq_exp_list);
986                 spin_unlock(&req->rq_export->exp_lock);
987         }
988         EXIT;
989 }
990
991 /**
992  * Make the request a high priority one.
993  *
994  * All the high priority requests are queued in a separate FIFO
995  * ptlrpc_service::srv_request_hpq list which is parallel to
996  * ptlrpc_service::srv_request_queue list but has a higher priority
997  * for handling.
998  *
999  * \see ptlrpc_server_handle_request().
1000  */
1001 static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc,
1002                                         struct ptlrpc_request *req)
1003 {
1004         ENTRY;
1005         LASSERT(svc != NULL);
1006         spin_lock(&req->rq_lock);
1007         if (req->rq_hp == 0) {
1008                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
1009
1010                 /* Add to the high priority queue. */
1011                 list_move_tail(&req->rq_list, &svc->srv_request_hpq);
1012                 req->rq_hp = 1;
1013                 if (opc != OBD_PING)
1014                         DEBUG_REQ(D_NET, req, "high priority req");
1015         }
1016         spin_unlock(&req->rq_lock);
1017         EXIT;
1018 }
1019
1020 void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
1021 {
1022         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
1023         ENTRY;
1024
1025         spin_lock(&svc->srv_lock);
1026         /* It may happen that the request has already been taken for processing
1027          * but is still in the export list; do not re-add it to the HP list. */
1028         if (req->rq_phase == RQ_PHASE_NEW)
1029                 ptlrpc_hpreq_reorder_nolock(svc, req);
1030         spin_unlock(&svc->srv_lock);
1031         EXIT;
1032 }
1033
1034 /** Check if the request is a high priority one. */
1035 static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req)
1036 {
1037         int opc, rc = 0;
1038         ENTRY;
1039
1040         /* Check by request opc. */
1041         opc = lustre_msg_get_opc(req->rq_reqmsg);
1042         if (opc == OBD_PING)
1043                 RETURN(1);
1044
1045         /* Perform request specific check. */
1046         if (req->rq_ops && req->rq_ops->hpreq_check)
1047                 rc = req->rq_ops->hpreq_check(req);
1048         RETURN(rc);
1049 }
1050
1051 /** Add a request to the service queue, routing it to the high priority queue if needed. */
1052 static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
1053                                      struct ptlrpc_request *req)
1054 {
1055         int rc;
1056         ENTRY;
1057
1058         rc = ptlrpc_server_hpreq_check(req);
1059         if (rc < 0)
1060                 RETURN(rc);
1061
1062         spin_lock(&svc->srv_lock);
1063         /* Before inserting the request into the queue, check if it is not
1064          * inserted yet, or even already handled -- it may happen due to
1065          * a racing ldlm_server_blocking_ast(). */
1066         if (req->rq_phase == RQ_PHASE_NEW && list_empty(&req->rq_list)) {
1067                 if (rc)
1068                         ptlrpc_hpreq_reorder_nolock(svc, req);
1069                 else
1070                         list_add_tail(&req->rq_list, &svc->srv_request_queue);
1071         }
1072         spin_unlock(&svc->srv_lock);
1073
1074         RETURN(0);
1075 }
1076
1077 /* Only allow normal priority requests on a service that has a high-priority
1078  * queue if forced (i.e. cleanup), if there are other high priority requests
1079  * already being processed (i.e. those threads can service more high-priority
1080  * requests), or if there are enough idle threads that a later thread can do
1081  * a high priority request. */
1082 static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
1083 {
1084         return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
1085                svc->srv_n_active_reqs < svc->srv_threads_running - 2;
1086 }
1087
1088 static struct ptlrpc_request *
1089 ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
1090 {
1091         struct ptlrpc_request *req = NULL;
1092         ENTRY;
1093
1094         if (ptlrpc_server_allow_normal(svc, force) &&
1095             !list_empty(&svc->srv_request_queue) &&
1096             (list_empty(&svc->srv_request_hpq) ||
1097              svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
1098                 req = list_entry(svc->srv_request_queue.next,
1099                                  struct ptlrpc_request, rq_list);
1100                 svc->srv_hpreq_count = 0;
1101         } else if (!list_empty(&svc->srv_request_hpq)) {
1102                 req = list_entry(svc->srv_request_hpq.next,
1103                                  struct ptlrpc_request, rq_list);
1104                 svc->srv_hpreq_count++;
1105         }
1106         RETURN(req);
1107 }
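/*
 * Worked example of the selection above: with srv_hpreq_ratio == 2 and
 * both queues continuously non-empty, a thread dequeues two high-priority
 * requests (srv_hpreq_count climbing to 2), then one normal request
 * (resetting the count), then two more HP requests, and so on, so normal
 * requests cannot be starved indefinitely by HP traffic.  The value 2 is
 * illustrative; the actual ratio defaults to PTLRPC_SVC_HP_RATIO, set in
 * ptlrpc_init_svc() above.
 */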
1108
1109 static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
1110 {
1111         return ((ptlrpc_server_allow_normal(svc, force) &&
1112                  !list_empty(&svc->srv_request_queue)) ||
1113                 !list_empty(&svc->srv_request_hpq));
1114 }
1115
1116 /* Handle freshly incoming reqs, add to timed early reply list,
1117    pass on to regular request queue */
1118 static int
1119 ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
1120 {
1121         struct ptlrpc_request *req;
1122         __u32                  deadline;
1123         int                    rc;
1124         ENTRY;
1125
1126         LASSERT(svc);
1127
1128         spin_lock(&svc->srv_lock);
1129         if (list_empty(&svc->srv_req_in_queue)) {
1130                 spin_unlock(&svc->srv_lock);
1131                 RETURN(0);
1132         }
1133
1134         req = list_entry(svc->srv_req_in_queue.next,
1135                          struct ptlrpc_request, rq_list);
1136         list_del_init (&req->rq_list);
1137         /* Consider this still a "queued" request as far as stats are
1138            concerned */
1139         spin_unlock(&svc->srv_lock);
1140
1141         /* Clear request swab mask; this is a new request */
1142         req->rq_req_swab_mask = 0;
1143
1144         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
1145         if (rc < 0) {
1146                 CERROR ("error unpacking request: ptl %d from %s"
1147                         " xid "LPU64"\n", svc->srv_req_portal,
1148                         libcfs_id2str(req->rq_peer), req->rq_xid);
1149                 goto err_req;
1150         }
1151
1152         if (rc > 0)
1153                 lustre_set_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
1154
1155         rc = lustre_unpack_req_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
1156         if (rc) {
1157                 CERROR ("error unpacking ptlrpc body: ptl %d from %s"
1158                         " xid "LPU64"\n", svc->srv_req_portal,
1159                         libcfs_id2str(req->rq_peer), req->rq_xid);
1160                 goto err_req;
1161         }
1162
1163         rc = -EINVAL;
1164         if (lustre_msg_get_type(req->rq_reqmsg) != PTL_RPC_MSG_REQUEST) {
1165                 CERROR("wrong packet type received (type=%u) from %s\n",
1166                        lustre_msg_get_type(req->rq_reqmsg),
1167                        libcfs_id2str(req->rq_peer));
1168                 goto err_req;
1169         }
1170
1171         CDEBUG(D_NET, "got req "LPU64"\n", req->rq_xid);
1172
1173         req->rq_export = class_conn2export(
1174                 lustre_msg_get_handle(req->rq_reqmsg));
1175         if (req->rq_export) {
1176                 rc = ptlrpc_check_req(req);
1177                 if (rc)
1178                         goto err_req;
1179                 ptlrpc_update_export_timer(req->rq_export, 0);
1180         }
1181
1182         /* req_in handling should/must be fast */
1183         if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
1184                 DEBUG_REQ(D_WARNING, req, "Slow req_in handling %lus",
1185                           cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
1186
1187         /* Set rpc server deadline and add it to the timed list */
1188         deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
1189                     MSGHDR_AT_SUPPORT) ?
1190                    /* The max time the client expects us to take */
1191                    lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
1192         req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
1193         if (unlikely(deadline == 0)) {
1194                 DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
1195                 goto err_req;
1196         }
1197
1198         ptlrpc_at_add_timed(req);
1199         rc = ptlrpc_hpreq_init(svc, req);
1200         if (rc)
1201                 GOTO(err_req, rc);
1202
1203         /* Move it over to the request processing queue */
1204         rc = ptlrpc_server_request_add(svc, req);
1205         if (rc)
1206                 GOTO(err_req, rc);
1207         cfs_waitq_signal(&svc->srv_waitq);
1208         RETURN(1);
1209
1210 err_req:
1211         spin_lock(&svc->srv_lock);
1212         svc->srv_n_queued_reqs--;
1213         svc->srv_n_active_reqs++;
1214         spin_unlock(&svc->srv_lock);
1215         ptlrpc_server_finish_request(req);
1216
1217         RETURN(1);
1218 }
1219
1220 static int
1221 ptlrpc_server_handle_request(struct ptlrpc_service *svc,
1222                              struct ptlrpc_thread *thread)
1223 {
1224         struct obd_export     *export = NULL;
1225         struct ptlrpc_request *request;
1226         struct timeval         work_start;
1227         struct timeval         work_end;
1228         long                   timediff;
1229         int                    opc, rc;
1230         int                    fail_opc = 0;
1231         ENTRY;
1232
1233         LASSERT(svc);
1234
1235         spin_lock(&svc->srv_lock);
1236         if (!ptlrpc_server_request_pending(svc, 0) ||
1237             (
1238 #ifndef __KERNEL__
1239              /* !@%$# liblustre only has 1 thread */
1240              svc->srv_n_difficult_replies != 0 &&
1241 #endif
1242              svc->srv_n_active_reqs >= (svc->srv_threads_running - 1))) {
1243                 /* Don't handle regular requests in the last thread, in order
1244                  * to handle difficult replies (which might block other threads)
1245                  * as well as handle any incoming reqs, early replies, etc.
1246                  * That means we always need at least 2 service threads. */
1247                 spin_unlock(&svc->srv_lock);
1248                 RETURN(0);
1249         }
1250
1251         request = ptlrpc_server_request_get(svc, 0);
1252         if  (request == NULL) {
1253                 spin_unlock(&svc->srv_lock);
1254                 RETURN(0);
1255         }
1256
1257         opc = lustre_msg_get_opc(request->rq_reqmsg);
1258         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
1259                 fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
1260         else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
1261                 fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;
1262
1263         if (unlikely(fail_opc)) {
1264                 if (request->rq_export && request->rq_ops) {
1265                         spin_unlock(&svc->srv_lock);
1266                         OBD_FAIL_TIMEOUT(fail_opc, 4);
1267                         spin_lock(&svc->srv_lock);
1268                         request = ptlrpc_server_request_get(svc, 0);
1269                         if  (request == NULL) {
1270                                 spin_unlock(&svc->srv_lock);
1271                                 RETURN(0);
1272                         }
1273                         LASSERT(ptlrpc_server_request_pending(svc, 0));
1274                 }
1275         }
1276
1277         list_del_init(&request->rq_list);
1278         svc->srv_n_queued_reqs--;
1279         svc->srv_n_active_reqs++;
1280
1281         if (request->rq_hp)
1282                 svc->srv_n_hpreq++;
1283
1284         /* The phase is changed under the lock here because we need to know
1285          * the request is under processing (see ptlrpc_hpreq_reorder()). */
1286         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
1287         spin_unlock(&svc->srv_lock);
1288
1289         ptlrpc_hpreq_fini(request);
1290
1291         if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
1292                 libcfs_debug_dumplog();
1293
1294         do_gettimeofday(&work_start);
1295         timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
1296         if (svc->srv_stats != NULL) {
1297                 lprocfs_counter_add(svc->srv_stats, PTLRPC_REQWAIT_CNTR,
1298                                     timediff);
1299                 lprocfs_counter_add(svc->srv_stats, PTLRPC_REQQDEPTH_CNTR,
1300                                     svc->srv_n_queued_reqs);
1301                 lprocfs_counter_add(svc->srv_stats, PTLRPC_REQACTIVE_CNTR,
1302                                     svc->srv_n_active_reqs);
1303                 lprocfs_counter_add(svc->srv_stats, PTLRPC_TIMEOUT,
1304                                     at_get(&svc->srv_at_estimate));
1305         }
1306
1307         CDEBUG(D_NET, "got req "LPU64"\n", request->rq_xid);
1308
1309         request->rq_svc_thread = thread;
1310         if (request->rq_export) {
1311                 if (ptlrpc_check_req(request))
1312                         goto put_conn;
1313                 ptlrpc_update_export_timer(request->rq_export, timediff >> 19);
1314                 export = class_export_rpc_get(request->rq_export);
1315         }
1316
1317         /* Discard requests queued for longer than the deadline.
1318            The deadline is increased if we send an early reply. */
1319         if (cfs_time_current_sec() > request->rq_deadline) {
1320                 DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s"
1321                           ": deadline %ld%+lds ago\n",
1322                           libcfs_id2str(request->rq_peer),
1323                           request->rq_deadline -
1324                           request->rq_arrival_time.tv_sec,
1325                           cfs_time_current_sec() - request->rq_deadline);
1326                 goto put_rpc_export;
1327         }
1328
1329         CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
1330                "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
1331                (request->rq_export ?
1332                 (char *)request->rq_export->exp_client_uuid.uuid : "0"),
1333                (request->rq_export ?
1334                 atomic_read(&request->rq_export->exp_refcount) : -99),
1335                lustre_msg_get_status(request->rq_reqmsg), request->rq_xid,
1336                libcfs_id2str(request->rq_peer),
1337                lustre_msg_get_opc(request->rq_reqmsg));
1338
1339         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
1340
1341         rc = svc->srv_handler(request);
1342
1343         ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
1344
1345         CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
1346                "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
1347                (request->rq_export ?
1348                 (char *)request->rq_export->exp_client_uuid.uuid : "0"),
1349                (request->rq_export ?
1350                 atomic_read(&request->rq_export->exp_refcount) : -99),
1351                lustre_msg_get_status(request->rq_reqmsg), request->rq_xid,
1352                libcfs_id2str(request->rq_peer),
1353                lustre_msg_get_opc(request->rq_reqmsg));
1354
1355 put_rpc_export:
1356         if (export != NULL)
1357                 class_export_rpc_put(export);
1358
1359 put_conn:
1360         if (cfs_time_current_sec() > request->rq_deadline) {
1361                 DEBUG_REQ(D_WARNING, request, "Request x"LPU64" took longer "
1362                           "than estimated (%ld%+lds); client may timeout.",
1363                           request->rq_xid, request->rq_deadline -
1364                           request->rq_arrival_time.tv_sec,
1365                           cfs_time_current_sec() - request->rq_deadline);
1366         }
1367
1368         do_gettimeofday(&work_end);
1369         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
1370         CDEBUG(D_RPCTRACE, "request x"LPU64" opc %u from %s processed in "
1371                "%ldus (%ldus total) trans "LPU64" rc %d/%d\n",
1372                request->rq_xid, lustre_msg_get_opc(request->rq_reqmsg),
1373                libcfs_id2str(request->rq_peer), timediff,
1374                cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
1375                request->rq_repmsg ? lustre_msg_get_transno(request->rq_repmsg) :
1376                request->rq_transno, request->rq_status,
1377                request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg):
1378                -999);
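        /* Record per-opcode processing time in the service stats; LDLM_ENQUEUE
         * and MDS_REINT are skipped here since they are accounted separately. */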
1379         if (svc->srv_stats != NULL) {
1380                 __u32 op = lustre_msg_get_opc(request->rq_reqmsg);
1381                 int opc = opcode_offset(op);
1382                 if (opc > 0 && !(op == LDLM_ENQUEUE || op == MDS_REINT)) {
1383                         LASSERT(opc < LUSTRE_MAX_OPCODES);
1384                         lprocfs_counter_add(svc->srv_stats,
1385                                             opc + EXTRA_MAX_OPCODES,
1386                                             timediff);
1387                 }
1388         }
1389         if (request->rq_early_count) {
1390                 DEBUG_REQ(D_ADAPTTO, request,
1391                           "sent %d early replies before finishing in %lds",
1392                           request->rq_early_count,
1393                           work_end.tv_sec - request->rq_arrival_time.tv_sec);
1394         }
1395
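        /* This request no longer counts against the high-priority limit. */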
1396         spin_lock(&svc->srv_lock);
1397         if (request->rq_hp)
1398                 svc->srv_n_hpreq--;
1399         spin_unlock(&svc->srv_lock);
1400         ptlrpc_server_finish_request(request);
1401
1402         RETURN(1);
1403 }
1404
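/* Take one "difficult" reply off the service's reply queue: detach it from the
 * per-export lists, release any ACK locks it still holds and, once the network
 * has finished with it, drop the export reference and free the reply state.
 * Returns 1 if a reply was processed, 0 if the queue was empty. */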
1405 static int
1406 ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
1407 {
1408         struct ptlrpc_reply_state *rs;
1409         struct obd_export         *exp;
1410         struct obd_device         *obd;
1411         int                        nlocks;
1412         int                        been_handled;
1413         ENTRY;
1414
1415         spin_lock(&svc->srv_lock);
1416         if (list_empty (&svc->srv_reply_queue)) {
1417                 spin_unlock(&svc->srv_lock);
1418                 RETURN(0);
1419         }
1420
1421         rs = list_entry (svc->srv_reply_queue.next,
1422                          struct ptlrpc_reply_state, rs_list);
1423
1424         exp = rs->rs_export;
1425         obd = exp->exp_obd;
1426
1427         LASSERT (rs->rs_difficult);
1428         LASSERT (rs->rs_scheduled);
1429
1430         list_del_init (&rs->rs_list);
1431
1432         /* Disengage from notifiers carefully (lock order - irqrestore below!) */
1433         spin_unlock(&svc->srv_lock);
1434
1435         spin_lock (&exp->exp_uncommitted_replies_lock);
1436         /* Noop if removed already */
1437         list_del_init (&rs->rs_obd_list);
1438         spin_unlock (&exp->exp_uncommitted_replies_lock);
1439
1440         spin_lock (&exp->exp_lock);
1441         /* Noop if removed already */
1442         list_del_init (&rs->rs_exp_list);
1443         spin_unlock (&exp->exp_lock);
1444
1445         spin_lock(&svc->srv_lock);
1446
1447         been_handled = rs->rs_handled;
1448         rs->rs_handled = 1;
1449
1450         nlocks = rs->rs_nlocks;                 /* atomic "steal", but */
1451         rs->rs_nlocks = 0;                      /* locks still on rs_locks! */
1452
1453         if (nlocks == 0 && !been_handled) {
1454                 /* If we see this, we should already have seen the warning
1455                  * in mds_steal_ack_locks()  */
1456                 CWARN("All locks stolen from rs %p x"LPD64".t"LPD64
1457                       " o%d NID %s\n", rs, rs->rs_xid, rs->rs_transno,
1458                       lustre_msg_get_opc(rs->rs_msg),
1459                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
1460         }
1461
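        /* Drop srv_lock while unlinking the reply MD and releasing any ACK
         * locks still held on behalf of this reply. */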
1462         if ((!been_handled && rs->rs_on_net) || nlocks > 0) {
1463                 spin_unlock(&svc->srv_lock);
1464
1465                 if (!been_handled && rs->rs_on_net) {
1466                         LNetMDUnlink(rs->rs_md_h);
1467                         /* Ignore return code; we're racing with
1468                          * completion... */
1469                 }
1470
1471                 while (nlocks-- > 0)
1472                         ldlm_lock_decref(&rs->rs_locks[nlocks],
1473                                          rs->rs_modes[nlocks]);
1474
1475                 spin_lock(&svc->srv_lock);
1476         }
1477
1478         rs->rs_scheduled = 0;
1479
1480         if (!rs->rs_on_net) {
1481                 /* Off the net */
1482                 svc->srv_n_difficult_replies--;
1483                 spin_unlock(&svc->srv_lock);
1484
1485                 class_export_put (exp);
1486                 rs->rs_export = NULL;
1487                 ptlrpc_rs_decref (rs);
1488                 atomic_dec (&svc->srv_outstanding_replies);
1489                 RETURN(1);
1490         }
1491
1492         /* still on the net; callback will schedule */
1493         spin_unlock(&svc->srv_lock);
1494         RETURN(1);
1495 }
1496
1497 #ifndef __KERNEL__
1498 /* FIXME make use of timeout later */
1499 int
1500 liblustre_check_services (void *arg)
1501 {
1502         int  did_something = 0;
1503         int  rc;
1504         struct list_head *tmp, *nxt;
1505         ENTRY;
1506
1507         /* I'm relying on being single-threaded, so I don't have to lock
1508          * ptlrpc_all_services etc. */
1509         list_for_each_safe (tmp, nxt, &ptlrpc_all_services) {
1510                 struct ptlrpc_service *svc =
1511                         list_entry (tmp, struct ptlrpc_service, srv_list);
1512
1513                 if (svc->srv_threads_running != 0)     /* I've recursed */
1514                         continue;
1515
1516                 /* service threads can block for bulk, so this limits us
1517                  * (arbitrarily) to recursing 1 stack frame per service.
1518                  * Note that the problem with recursion is that we have to
1519                  * unwind completely before our caller can resume. */
1520
1521                 svc->srv_threads_running++;
1522
1523                 do {
1524                         rc = ptlrpc_server_handle_req_in(svc);
1525                         rc |= ptlrpc_server_handle_reply(svc);
1526                         rc |= ptlrpc_at_check_timed(svc);
1527                         rc |= ptlrpc_server_handle_request(svc, NULL);
1528                         rc |= (ptlrpc_server_post_idle_rqbds(svc) > 0);
1529                         did_something |= rc;
1530                 } while (rc);
1531
1532                 svc->srv_threads_running--;
1533         }
1534
1535         RETURN(did_something);
1536 }
1537 #define ptlrpc_stop_all_threads(s) do {} while (0)
1538
1539 #else /* __KERNEL__ */
1540
1541 /* Don't use daemonize(); it removes the new thread's fs struct (bug 418) */
1542 void ptlrpc_daemonize(char *name)
1543 {
1544         struct fs_struct *fs = current->fs;
1545
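        /* cfs_daemonize() would drop our fs struct (see above); keep a
         * reference so we can restore it afterwards, then reset the cwd to
         * init's. */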
1546         atomic_inc(&fs->count);
1547         cfs_daemonize(name);
1548         exit_fs(cfs_current());
1549         current->fs = fs;
1550         ll_set_fs_pwd(current->fs, cfs_fs_mnt(init_task.fs), cfs_fs_pwd(init_task.fs));
1551 }
1552
1553 static void
1554 ptlrpc_check_rqbd_pool(struct ptlrpc_service *svc)
1555 {
1556         int avail = svc->srv_nrqbd_receiving;
1557         int low_water = test_req_buffer_pressure ? 0 :
1558                         svc->srv_nbuf_per_group/2;
1559
1560         /* NB I'm not locking; just looking. */
1561
1562         /* CAVEAT EMPTOR: We might be allocating buffers here because we've
1563          * allowed the request history to grow out of control.  We could put a
1564          * sanity check on that here and cull some history if we need the
1565          * space. */
1566
1567         if (avail <= low_water)
1568                 ptlrpc_grow_req_bufs(svc);
1569
1570         if (svc->srv_stats)
1571                 lprocfs_counter_add(svc->srv_stats, PTLRPC_REQBUF_AVAIL_CNTR,
1572                                     avail);
1573 }
1574
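/* Timeout callback for the service thread's wait: clear the rqbd back-off so
 * the thread retries posting idle request buffers, and return -ETIMEDOUT. */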
1575 static int
1576 ptlrpc_retry_rqbds(void *arg)
1577 {
1578         struct ptlrpc_service *svc = (struct ptlrpc_service *)arg;
1579
1580         svc->srv_rqbd_timeout = 0;
1581         return (-ETIMEDOUT);
1582 }
1583
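/* Main loop of a ptlrpc service thread: set CPU affinity if requested, run the
 * service-specific init hook, then repeatedly handle incoming requests,
 * replies and adaptive-timeout early replies until told to stop. */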
1584 static int ptlrpc_main(void *arg)
1585 {
1586         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
1587         struct ptlrpc_service  *svc = data->svc;
1588         struct ptlrpc_thread   *thread = data->thread;
1589         struct obd_device      *dev = data->dev;
1590         struct ptlrpc_reply_state *rs;
1591 #ifdef WITH_GROUP_INFO
1592         struct group_info *ginfo = NULL;
1593 #endif
1594         int counter = 0, rc = 0;
1595         ENTRY;
1596
1597         ptlrpc_daemonize(data->name);
1598
1599 #if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
1600         /* we need to do this before any per-thread allocation is done so that
1601          * we get the per-thread allocations on the local node.  bug 7342 */
1602         if (svc->srv_cpu_affinity) {
1603                 int cpu, num_cpu;
1604
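                /* Pick the (t_id % num_online_cpus())-th online CPU and bind
                 * this thread to the CPUs of its NUMA node. */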
1605                 for (cpu = 0, num_cpu = 0; cpu < num_possible_cpus(); cpu++) {
1606                         if (!cpu_online(cpu))
1607                                 continue;
1608                         if (num_cpu == thread->t_id % num_online_cpus())
1609                                 break;
1610                         num_cpu++;
1611                 }
1612                 set_cpus_allowed(cfs_current(), node_to_cpumask(cpu_to_node(cpu)));
1613         }
1614 #endif
1615
1616 #ifdef WITH_GROUP_INFO
1617         ginfo = groups_alloc(0);
1618         if (!ginfo) {
1619                 rc = -ENOMEM;
1620                 goto out;
1621         }
1622
1623         set_current_groups(ginfo);
1624         put_group_info(ginfo);
1625 #endif
1626
1627         if (svc->srv_init != NULL) {
1628                 rc = svc->srv_init(thread);
1629                 if (rc)
1630                         goto out;
1631         }
1632
1633         /* Allocate a reply state structure for this thread */
1634         OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, CFS_ALLOC_STD);
1635         if (!rs) {
1636                 rc = -ENOMEM;
1637                 goto out_srv_init;
1638         }
1639
1640         /* Record that the thread is running */
1641         thread->t_flags = SVC_RUNNING;
1642         /*
1643          * Wake up our creator. Note: @data is invalid after this point,
1644          * because it was allocated on ptlrpc_start_thread()'s stack.
1645          */
1646         cfs_waitq_signal(&thread->t_ctl_waitq);
1647
1648         thread->t_watchdog = lc_watchdog_add(GET_TIMEOUT(svc), NULL, NULL);
1649
1650         spin_lock(&svc->srv_lock);
1651         svc->srv_threads_running++;
1652         list_add(&rs->rs_list, &svc->srv_free_rs_list);
1653         spin_unlock(&svc->srv_lock);
1654         cfs_waitq_signal(&svc->srv_free_rs_waitq);
1655
1656         CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
1657                svc->srv_threads_running);
1658
1659         /* XXX maintain a list of all managed devices: insert here */
1660
1661         while ((thread->t_flags & SVC_STOPPING) == 0 ||
1662                svc->srv_n_difficult_replies != 0) {
1663                 /* Don't exit while there are replies to be handled */
1664                 struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
1665                                                      ptlrpc_retry_rqbds, svc);
1666
1667                 lc_watchdog_disable(thread->t_watchdog);
1668
1669                 cond_resched();
1670
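                /* Sleep until there is work: stopping with no difficult
                 * replies left, idle rqbds ready to repost, incoming or reply
                 * work queued, a pending request with a spare thread to run
                 * it, or an AT early-reply check due. */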
1671                 l_wait_event_exclusive (svc->srv_waitq,
1672                               ((thread->t_flags & SVC_STOPPING) != 0 &&
1673                                svc->srv_n_difficult_replies == 0) ||
1674                               (!list_empty(&svc->srv_idle_rqbds) &&
1675                                svc->srv_rqbd_timeout == 0) ||
1676                               !list_empty(&svc->srv_req_in_queue) ||
1677                               !list_empty(&svc->srv_reply_queue) ||
1678                               (ptlrpc_server_request_pending(svc, 0) &&
1679                                (svc->srv_n_active_reqs <
1680                                 (svc->srv_threads_running - 1))) ||
1681                               svc->srv_at_check,
1682                               &lwi);
1683
1684                 lc_watchdog_touch(thread->t_watchdog, GET_TIMEOUT(svc));
1685
1686                 ptlrpc_check_rqbd_pool(svc);
1687
1688                 if ((svc->srv_threads_started < svc->srv_threads_max) &&
1689                     (svc->srv_n_active_reqs >= (svc->srv_threads_started - 1))){
1690                         /* Ignore return code - we tried... */
1691                         ptlrpc_start_thread(dev, svc);
1692                 }
1693
1694                 if (!list_empty(&svc->srv_reply_queue))
1695                         ptlrpc_server_handle_reply (svc);
1696
1697                 if (!list_empty(&svc->srv_req_in_queue)) {
1698                         /* Process all incoming reqs before handling any */
1699                         ptlrpc_server_handle_req_in(svc);
1700                         /* but limit ourselves in case of flood */
1701                         if (counter++ < 1000)
1702                                 continue;
1703                         counter = 0;
1704                 }
1705
1706                 if (svc->srv_at_check)
1707                         ptlrpc_at_check_timed(svc);
1708
1709         /* keep the last free thread for incoming reqs, replies and AT checks */
1710                 if (ptlrpc_server_request_pending(svc, 0) &&
1711                     (svc->srv_n_active_reqs < (svc->srv_threads_running - 1)))
1712                         ptlrpc_server_handle_request(svc, thread);
1713
1714                 if (!list_empty(&svc->srv_idle_rqbds) &&
1715                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
1716                         /* I just failed to repost request buffers.  Wait
1717                          * for a timeout (unless something else happens)
1718                          * before I try again */
1719                         svc->srv_rqbd_timeout = cfs_time_seconds(1)/10;
1720                         CDEBUG(D_RPCTRACE, "Posted buffers: %d\n",
1721                                svc->srv_nrqbd_receiving);
1722                 }
1723         }
1724
1725         lc_watchdog_delete(thread->t_watchdog);
1726         thread->t_watchdog = NULL;
1727
1728 out_srv_init:
1729         /*
1730          * deconstruct service specific state created by ptlrpc_start_thread()
1731          */
1732         if (svc->srv_done != NULL)
1733                 svc->srv_done(thread);
1734
1735 out:
1736         CDEBUG(D_NET, "service thread %d exiting: rc %d\n", thread->t_id, rc);
1737
1738         spin_lock(&svc->srv_lock);
1739         svc->srv_threads_running--;              /* must know immediately */
1740         thread->t_id = rc;
1741         thread->t_flags = SVC_STOPPED;
1742
1743         cfs_waitq_signal(&thread->t_ctl_waitq);
1744         spin_unlock(&svc->srv_lock);
1745
1746         return rc;
1747 }
1748
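/* Ask a single service thread to stop, wait until it has exited, then unlink
 * it from the service and free its descriptor. */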
1749 static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
1750                                struct ptlrpc_thread *thread)
1751 {
1752         struct l_wait_info lwi = { 0 };
1753
1754         spin_lock(&svc->srv_lock);
1755         thread->t_flags = SVC_STOPPING;
1756         spin_unlock(&svc->srv_lock);
1757
1758         cfs_waitq_broadcast(&svc->srv_waitq);
1759         l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
1760                      &lwi);
1761
1762         spin_lock(&svc->srv_lock);
1763         list_del(&thread->t_link);
1764         spin_unlock(&svc->srv_lock);
1765
1766         OBD_FREE(thread, sizeof(*thread));
1767 }
1768
1769 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
1770 {
1771         struct ptlrpc_thread *thread;
1772
1773         spin_lock(&svc->srv_lock);
1774         while (!list_empty(&svc->srv_threads)) {
1775                 thread = list_entry(svc->srv_threads.next,
1776                                     struct ptlrpc_thread, t_link);
1777
1778                 spin_unlock(&svc->srv_lock);
1779                 ptlrpc_stop_thread(svc, thread);
1780                 spin_lock(&svc->srv_lock);
1781         }
1782
1783         spin_unlock(&svc->srv_lock);
1784 }
1785
1786 int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
1787 {
1788         int i, rc = 0;
1789         ENTRY;
1790
1791         /* We require 2 threads min - see note in
1792          * ptlrpc_server_handle_request() */
1793
1794         LASSERT(svc->srv_threads_min >= 2);
1795         for (i = 0; i < svc->srv_threads_min; i++) {
1796                 rc = ptlrpc_start_thread(dev, svc);
1797                 /* We have enough threads, don't start more.  b=15759 */
1798                 if (rc == -EMFILE)
1799                         break;
1800                 if (rc) {
1801                         CERROR("cannot start %s thread #%d: rc %d\n",
1802                                svc->srv_thread_name, i, rc);
1803                         ptlrpc_stop_all_threads(svc);
1804                 }
1805         }
1806         RETURN(rc);
1807 }
1808
1809 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
1810 {
1811         struct l_wait_info lwi = { 0 };
1812         struct ptlrpc_svc_data d;
1813         struct ptlrpc_thread *thread;
1814         char name[32];
1815         int id, rc;
1816         ENTRY;
1817
1818         CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
1819                svc->srv_name, svc->srv_threads_started, svc->srv_threads_min,
1820                svc->srv_threads_max, svc->srv_threads_running);
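        /* OBD_FAIL_TGT_TOOMANY_THREADS simulates hitting the thread limit once
         * srv_threads_min - 1 threads have been started. */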
1821         if (unlikely(svc->srv_threads_started >= svc->srv_threads_max) ||
1822             (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
1823              svc->srv_threads_started == svc->srv_threads_min - 1))
1824                 RETURN(-EMFILE);
1825
1826         OBD_ALLOC(thread, sizeof(*thread));
1827         if (thread == NULL)
1828                 RETURN(-ENOMEM);
1829         cfs_waitq_init(&thread->t_ctl_waitq);
1830
1831         spin_lock(&svc->srv_lock);
1832         if (svc->srv_threads_started >= svc->srv_threads_max) {
1833                 spin_unlock(&svc->srv_lock);
1834                 OBD_FREE(thread, sizeof(*thread));
1835                 RETURN(-EMFILE);
1836         }
1837         list_add(&thread->t_link, &svc->srv_threads);
1838         id = svc->srv_threads_started++;
1839         spin_unlock(&svc->srv_lock);
1840
1841         thread->t_svc = svc;
1842         thread->t_id = id;
1843         snprintf(name, sizeof(name), "%s_%02d", svc->srv_thread_name, id);
1844         d.dev = dev;
1845         d.svc = svc;
1846         d.name = name;
1847         d.thread = thread;
1848
1849         CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
1850
1851         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
1852          * drop the VM and FILES in ptlrpc_daemonize() right away anyway.
1853          */
1854         rc = cfs_kernel_thread(ptlrpc_main, &d, CLONE_VM | CLONE_FILES);
1855         if (rc < 0) {
1856                 CERROR("cannot start thread '%s': rc %d\n", name, rc);
1857
1858                 spin_lock(&svc->srv_lock);
1859                 list_del(&thread->t_link);
1860                 --svc->srv_threads_started;
1861                 spin_unlock(&svc->srv_lock);
1862
1863                 OBD_FREE(thread, sizeof(*thread));
1864                 RETURN(rc);
1865         }
1866         l_wait_event(thread->t_ctl_waitq,
1867                      thread->t_flags & (SVC_RUNNING | SVC_STOPPED), &lwi);
1868
1869         rc = (thread->t_flags & SVC_STOPPED) ? thread->t_id : 0;
1870         RETURN(rc);
1871 }
1872 #endif
1873
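/* Tear a service down completely: stop its threads, unlink its request buffers
 * from the network, drain and discard any queued requests, complete or abort
 * outstanding replies, and free all remaining state. */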
1874 int ptlrpc_unregister_service(struct ptlrpc_service *service)
1875 {
1876         int                   rc;
1877         struct l_wait_info    lwi;
1878         struct list_head     *tmp;
1879         struct ptlrpc_reply_state *rs, *t;
1880         struct ptlrpc_at_array *array = &service->srv_at_array;
1881
1882         cfs_timer_disarm(&service->srv_at_timer);
1883
1884         ptlrpc_stop_all_threads(service);
1885         LASSERT(list_empty(&service->srv_threads));
1886
1887         spin_lock (&ptlrpc_all_services_lock);
1888         list_del_init (&service->srv_list);
1889         spin_unlock (&ptlrpc_all_services_lock);
1890
1891         ptlrpc_lprocfs_unregister_service(service);
1892
1893         /* All history will be culled when the next request buffer is
1894          * freed */
1895         service->srv_max_history_rqbds = 0;
1896
1897         CDEBUG(D_NET, "%s: tearing down\n", service->srv_name);
1898
1899         rc = LNetClearLazyPortal(service->srv_req_portal);
1900         LASSERT (rc == 0);
1901
1902         /* Unlink all the request buffers.  This forces a 'final' event with
1903          * its 'unlink' flag set for each posted rqbd */
1904         list_for_each(tmp, &service->srv_active_rqbds) {
1905                 struct ptlrpc_request_buffer_desc *rqbd =
1906                         list_entry(tmp, struct ptlrpc_request_buffer_desc,
1907                                    rqbd_list);
1908
1909                 rc = LNetMDUnlink(rqbd->rqbd_md_h);
1910                 LASSERT (rc == 0 || rc == -ENOENT);
1911         }
1912
1913         /* Wait for the network to release any buffers it's currently
1914          * filling */
1915         for (;;) {
1916                 spin_lock(&service->srv_lock);
1917                 rc = service->srv_nrqbd_receiving;
1918                 spin_unlock(&service->srv_lock);
1919
1920                 if (rc == 0)
1921                         break;
1922
1923                 /* Network access will complete in finite time but the HUGE
1924                  * timeout lets us CWARN for visibility of sluggish NALs */
1925                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
1926                                            cfs_time_seconds(1), NULL, NULL);
1927                 rc = l_wait_event(service->srv_waitq,
1928                                   service->srv_nrqbd_receiving == 0,
1929                                   &lwi);
1930                 if (rc == -ETIMEDOUT)
1931                         CWARN("Service %s waiting for request buffers\n",
1932                               service->srv_name);
1933         }
1934
1935         /* schedule all outstanding replies to terminate them */
1936         spin_lock(&service->srv_lock);
1937         while (!list_empty(&service->srv_active_replies)) {
1938                 struct ptlrpc_reply_state *rs =
1939                         list_entry(service->srv_active_replies.next,
1940                                    struct ptlrpc_reply_state, rs_list);
1941                 ptlrpc_schedule_difficult_reply(rs);
1942         }
1943         spin_unlock(&service->srv_lock);
1944
1945         /* purge the request queue.  NB No new replies (rqbds all unlinked)
1946          * and no service threads, so I'm the only thread noodling the
1947          * request queue now */
1948         while (!list_empty(&service->srv_req_in_queue)) {
1949                 struct ptlrpc_request *req =
1950                         list_entry(service->srv_req_in_queue.next,
1951                                    struct ptlrpc_request,
1952                                    rq_list);
1953
1954                 list_del(&req->rq_list);
1955                 service->srv_n_queued_reqs--;
1956                 service->srv_n_active_reqs++;
1957                 ptlrpc_server_finish_request(req);
1958         }
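        /* Drain the regular and high-priority request queues, releasing each
         * request as though it had been handled. */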
1959         while (ptlrpc_server_request_pending(service, 1)) {
1960                 struct ptlrpc_request *req;
1961
1962                 req = ptlrpc_server_request_get(service, 1);
1963                 list_del(&req->rq_list);
1964                 service->srv_n_queued_reqs--;
1965                 service->srv_n_active_reqs++;
1966                 ptlrpc_hpreq_fini(req);
1967                 ptlrpc_server_finish_request(req);
1968         }
1969         LASSERT(service->srv_n_queued_reqs == 0);
1970         LASSERT(service->srv_n_active_reqs == 0);
1971         LASSERT(service->srv_n_history_rqbds == 0);
1972         LASSERT(list_empty(&service->srv_active_rqbds));
1973
1974         /* Now free all the request buffers since nothing references them
1975          * any more... */
1976         while (!list_empty(&service->srv_idle_rqbds)) {
1977                 struct ptlrpc_request_buffer_desc *rqbd =
1978                         list_entry(service->srv_idle_rqbds.next,
1979                                    struct ptlrpc_request_buffer_desc,
1980                                    rqbd_list);
1981
1982                 ptlrpc_free_rqbd(rqbd);
1983         }
1984
1985         /* wait for all outstanding replies to complete (they were
1986          * scheduled above, having been flagged to abort) */
1987         while (atomic_read(&service->srv_outstanding_replies) != 0) {
1988                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(10), NULL, NULL);
1989
1990                 rc = l_wait_event(service->srv_waitq,
1991                                   !list_empty(&service->srv_reply_queue), &lwi);
1992                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
1993
1994                 if (rc == 0) {
1995                         ptlrpc_server_handle_reply(service);
1996                         continue;
1997                 }
1998                 CWARN("Unexpectedly long timeout %s %p\n", service->srv_name, service);
1999         }
2000
2001         list_for_each_entry_safe(rs, t, &service->srv_free_rs_list, rs_list) {
2002                 list_del(&rs->rs_list);
2003                 OBD_FREE(rs, service->srv_max_reply_size);
2004         }
2005
2006         /* In case somebody rearmed this in the meantime */
2007         cfs_timer_disarm(&service->srv_at_timer);
2008
2009         if (array->paa_reqs_array != NULL) {
2010                 OBD_FREE(array->paa_reqs_array,
2011                          sizeof(struct list_head) * array->paa_size);
2012                 array->paa_reqs_array = NULL;
2013         }
2014
2015         if (array->paa_reqs_count != NULL) {
2016                 OBD_FREE(array->paa_reqs_count,
2017                          sizeof(__u32) * array->paa_size);
2018                 array->paa_reqs_count = NULL;
2019         }
2020
2021         OBD_FREE(service, sizeof(*service));
2022         return 0;
2023 }
2024
2025 /* Returns 0 if the service is healthy.
2026  *
2027  * Right now, it just checks to make sure that requests aren't languishing
2028  * in the queue.  We'll use this health check to govern whether a node needs
2029  * to be shot, so it's intentionally non-aggressive. */
2030 int ptlrpc_service_health_check(struct ptlrpc_service *svc)
2031 {
2032         struct ptlrpc_request *request;
2033         struct timeval         right_now;
2034         long                   timediff;
2035
2036         if (svc == NULL)
2037                 return 0;
2038
2039         do_gettimeofday(&right_now);
2040
2041         spin_lock(&svc->srv_lock);
2042         if (!ptlrpc_server_request_pending(svc, 1)) {
2043                 spin_unlock(&svc->srv_lock);
2044                 return 0;
2045         }
2046
2047         /* How long has the next entry been waiting? */
2048         if (list_empty(&svc->srv_request_queue))
2049                 request = list_entry(svc->srv_request_hpq.next,
2050                                      struct ptlrpc_request, rq_list);
2051         else
2052                 request = list_entry(svc->srv_request_queue.next,
2053                                      struct ptlrpc_request, rq_list);
2054         timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
2055         spin_unlock(&svc->srv_lock);
2056
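        /* Unhealthy if the oldest queued request has been waiting longer than
         * at_max (or 1.5 * obd_timeout when adaptive timeouts are off). */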
2057         if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
2058                                         at_max)) {
2059                 CERROR("%s: unhealthy - request has been waiting %lds\n",
2060                        svc->srv_name, timediff / ONE_MILLION);
2061                 return (-1);
2062         }
2063
2064         return 0;
2065 }