lustre/ptlrpc/service.c (fs/lustre-release.git)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  */
38
39 #define DEBUG_SUBSYSTEM S_RPC
40 #ifndef __KERNEL__
41 #include <liblustre.h>
42 #endif
43 #include <obd_support.h>
44 #include <obd_class.h>
45 #include <lustre_net.h>
46 #include <lu_object.h>
47 #include <lnet/types.h>
48 #include "ptlrpc_internal.h"
49
50 /* The following are visible and mutable through /sys/module/ptlrpc */
51 int test_req_buffer_pressure = 0;
52 CFS_MODULE_PARM(test_req_buffer_pressure, "i", int, 0444,
53                 "set non-zero to put pressure on request buffer pools");
54 CFS_MODULE_PARM(at_min, "i", int, 0644,
55                 "Adaptive timeout minimum (sec)");
56 CFS_MODULE_PARM(at_max, "i", int, 0644,
57                 "Adaptive timeout maximum (sec)");
58 CFS_MODULE_PARM(at_history, "i", int, 0644,
59                 "Adaptive timeouts remember the slowest event that took place "
60                 "within this period (sec)");
61 CFS_MODULE_PARM(at_early_margin, "i", int, 0644,
62                 "How soon before an RPC deadline to send an early reply");
63 CFS_MODULE_PARM(at_extra, "i", int, 0644,
64                 "How much extra time to give with each early reply");
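/* Example (illustrative values only, not recommendations): since these are
 * regular module parameters, they can be set at module load time, e.g. in
 * /etc/modprobe.d/lustre.conf:
 *
 *     options ptlrpc at_min=0 at_max=600 at_early_margin=5 at_extra=30
 *
 * and the ones declared 0644 above can also be changed at runtime through
 * /sys/module/ptlrpc/parameters/<name>, as the comment above notes. */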
65
66
67 /* forward ref */
68 static int ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc);
69 static void ptlrpc_hpreq_fini(struct ptlrpc_request *req);
70
71 static CFS_LIST_HEAD(ptlrpc_all_services);
72 cfs_spinlock_t ptlrpc_all_services_lock;
73
74 struct ptlrpc_request_buffer_desc *
75 ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
76 {
77         struct ptlrpc_request_buffer_desc *rqbd;
78
79         OBD_ALLOC_PTR(rqbd);
80         if (rqbd == NULL)
81                 return (NULL);
82
83         rqbd->rqbd_service = svc;
84         rqbd->rqbd_refcount = 0;
85         rqbd->rqbd_cbid.cbid_fn = request_in_callback;
86         rqbd->rqbd_cbid.cbid_arg = rqbd;
87         CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs);
88         OBD_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
89
90         if (rqbd->rqbd_buffer == NULL) {
91                 OBD_FREE_PTR(rqbd);
92                 return (NULL);
93         }
94
95         cfs_spin_lock(&svc->srv_lock);
96         cfs_list_add(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
97         svc->srv_nbufs++;
98         cfs_spin_unlock(&svc->srv_lock);
99
100         return (rqbd);
101 }
102
103 void
104 ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
105 {
106         struct ptlrpc_service *svc = rqbd->rqbd_service;
107
108         LASSERT (rqbd->rqbd_refcount == 0);
109         LASSERT (cfs_list_empty(&rqbd->rqbd_reqs));
110
111         cfs_spin_lock(&svc->srv_lock);
112         cfs_list_del(&rqbd->rqbd_list);
113         svc->srv_nbufs--;
114         cfs_spin_unlock(&svc->srv_lock);
115
116         OBD_FREE_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
117         OBD_FREE_PTR(rqbd);
118 }
119
120 int
121 ptlrpc_grow_req_bufs(struct ptlrpc_service *svc)
122 {
123         struct ptlrpc_request_buffer_desc *rqbd;
124         int                                rc = 0;
125         int                                i;
126
127         for (i = 0; i < svc->srv_nbuf_per_group; i++) {
128                 /* NB: another thread might be doing this as well, we need to
129                  * make sure that it wouldn't over-allocate, see LU-1212. */
130                 if (svc->srv_nrqbd_receiving >= svc->srv_nbuf_per_group)
131                         break;
132
133                 rqbd = ptlrpc_alloc_rqbd(svc);
134
135                 if (rqbd == NULL) {
136                         CERROR("%s: Can't allocate request buffer\n",
137                                svc->srv_name);
138                         rc = -ENOMEM;
139                         break;
140                 }
141
142                 if (ptlrpc_server_post_idle_rqbds(svc) < 0) {
143                         rc = -EAGAIN;
144                         break;
145                 }
146         }
147
148         CDEBUG(D_RPCTRACE,
149                "%s: allocate %d new %d-byte reqbufs (%d/%d left), rc = %d\n",
150                svc->srv_name, i, svc->srv_buf_size,
151                svc->srv_nrqbd_receiving, svc->srv_nbufs, rc);
152
153         return rc;
154 }
155
156 /**
157  * Part of Rep-Ack logic.
158  * Puts a lock and its mode into the reply state associated with the request's reply.
159  */
160 void
161 ptlrpc_save_lock(struct ptlrpc_request *req,
162                  struct lustre_handle *lock, int mode, int no_ack)
163 {
164         struct ptlrpc_reply_state *rs = req->rq_reply_state;
165         int                        idx;
166
167         LASSERT(rs != NULL);
168         LASSERT(rs->rs_nlocks < RS_MAX_LOCKS);
169
170         if (req->rq_export->exp_disconnected) {
171                 ldlm_lock_decref(lock, mode);
172         } else {
173                 idx = rs->rs_nlocks++;
174                 rs->rs_locks[idx] = *lock;
175                 rs->rs_modes[idx] = mode;
176                 rs->rs_difficult = 1;
177                 rs->rs_no_ack = !!no_ack;
178         }
179 }
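/* NB: a lock saved above for a still-connected export is not dropped here;
 * it is expected to be released with ldlm_lock_decref() on the saved
 * rs_locks[]/rs_modes[] entries once this difficult reply has been ACKed by
 * the client and its reply state is finally processed by the reply handling
 * code later in this file. */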
180
181 #ifdef __KERNEL__
182
183 #define HRT_RUNNING 0
184 #define HRT_STOPPING 1
185
186 struct ptlrpc_hr_thread {
187         cfs_spinlock_t        hrt_lock;
188         unsigned long         hrt_flags;
189         cfs_waitq_t           hrt_wait;
190         cfs_list_t            hrt_queue;
191         cfs_completion_t      hrt_completion;
192 };
193
194 struct ptlrpc_hr_service {
195         int                     hr_index;
196         int                     hr_n_threads;
197         int                     hr_size;
198         struct ptlrpc_hr_thread hr_threads[0];
199 };
200
201 struct rs_batch {
202         cfs_list_t              rsb_replies;
203         struct ptlrpc_service  *rsb_svc;
204         unsigned int            rsb_n_replies;
205 };
206
207 /**
208  *  A pointer to per-node reply handling service.
209  */
210 static struct ptlrpc_hr_service *ptlrpc_hr = NULL;
211
212 /**
213  * maximum number of replies scheduled in one batch
214  */
215 #define MAX_SCHEDULED 256
216
217 /**
218  * Initialize a reply batch.
219  *
220  * \param b batch
221  */
222 static void rs_batch_init(struct rs_batch *b)
223 {
224         memset(b, 0, sizeof *b);
225         CFS_INIT_LIST_HEAD(&b->rsb_replies);
226 }
227
228 /**
229  * Choose an hr thread to dispatch requests to.
230  */
231 static unsigned int get_hr_thread_index(struct ptlrpc_hr_service *hr)
232 {
233         unsigned int idx;
234
235         /* Concurrent modification of hr_index w/o any spinlock
236            protection is harmless as long as the result fits
237            [0..(hr_n_threads-1)] range and each thread gets near equal
238            load. */
239         idx = hr->hr_index;
240         hr->hr_index = (idx >= hr->hr_n_threads - 1) ? 0 : idx + 1;
241         return idx;
242 }
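/* For example, with hr_n_threads == 4 successive calls return 0, 1, 2, 3,
 * 0, 1, ... - a plain round-robin over the reply handling threads.  A lost
 * update under concurrent callers only skews that distribution slightly, as
 * the comment above explains. */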
243
244 /**
245  * Dispatch all replies accumulated in the batch to one of the
246  * dedicated reply handling threads.
247  *
248  * \param b batch
249  */
250 static void rs_batch_dispatch(struct rs_batch *b)
251 {
252         if (b->rsb_n_replies != 0) {
253                 struct ptlrpc_hr_service *hr = ptlrpc_hr;
254                 int idx;
255
256                 idx = get_hr_thread_index(hr);
257
258                 cfs_spin_lock(&hr->hr_threads[idx].hrt_lock);
259                 cfs_list_splice_init(&b->rsb_replies,
260                                      &hr->hr_threads[idx].hrt_queue);
261                 cfs_spin_unlock(&hr->hr_threads[idx].hrt_lock);
262                 cfs_waitq_signal(&hr->hr_threads[idx].hrt_wait);
263                 b->rsb_n_replies = 0;
264         }
265 }
266
267 /**
268  * Add a reply to a batch.
269  * Add one reply object to a batch; schedule the batched replies if the batch is full.
270  *
271  * \param b batch
272  * \param rs reply
273  */
274 static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs)
275 {
276         struct ptlrpc_service *svc = rs->rs_service;
277
278         if (svc != b->rsb_svc || b->rsb_n_replies >= MAX_SCHEDULED) {
279                 if (b->rsb_svc != NULL) {
280                         rs_batch_dispatch(b);
281                         cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
282                 }
283                 cfs_spin_lock(&svc->srv_rs_lock);
284                 b->rsb_svc = svc;
285         }
286         cfs_spin_lock(&rs->rs_lock);
287         rs->rs_scheduled_ever = 1;
288         if (rs->rs_scheduled == 0) {
289                 cfs_list_move(&rs->rs_list, &b->rsb_replies);
290                 rs->rs_scheduled = 1;
291                 b->rsb_n_replies++;
292         }
293         rs->rs_committed = 1;
294         cfs_spin_unlock(&rs->rs_lock);
295 }
296
297 /**
298  * Reply batch finalization.
299  * Dispatch remaining replies from the batch
300  * and release remaining spinlock.
301  *
302  * \param b batch
303  */
304 static void rs_batch_fini(struct rs_batch *b)
305 {
306         if (b->rsb_svc != NULL) {
307                 rs_batch_dispatch(b);
308                 cfs_spin_unlock(&b->rsb_svc->srv_rs_lock);
309         }
310 }
311
312 #define DECLARE_RS_BATCH(b)     struct rs_batch b
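/* Typical usage of the batch helpers above (a sketch; the real in-tree
 * caller is ptlrpc_commit_replies() below):
 *
 *     DECLARE_RS_BATCH(batch);
 *
 *     rs_batch_init(&batch);
 *     ... for every committed reply state rs ...
 *             rs_batch_add(&batch, rs);
 *     rs_batch_fini(&batch);
 */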
313
314 #else /* __KERNEL__ */
315
316 #define rs_batch_init(b)        do{}while(0)
317 #define rs_batch_fini(b)        do{}while(0)
318 #define rs_batch_add(b, r)      ptlrpc_schedule_difficult_reply(r)
319 #define DECLARE_RS_BATCH(b)
320
321 #endif /* __KERNEL__ */
322
323 /**
324  * Put the reply state into a queue for processing because we received
325  * an ACK from the client
326  */
327 void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
328 {
329 #ifdef __KERNEL__
330         struct ptlrpc_hr_service *hr = ptlrpc_hr;
331         int idx;
332         ENTRY;
333
334         LASSERT(cfs_list_empty(&rs->rs_list));
335
336         idx = get_hr_thread_index(hr);
337         cfs_spin_lock(&hr->hr_threads[idx].hrt_lock);
338         cfs_list_add_tail(&rs->rs_list, &hr->hr_threads[idx].hrt_queue);
339         cfs_spin_unlock(&hr->hr_threads[idx].hrt_lock);
340         cfs_waitq_signal(&hr->hr_threads[idx].hrt_wait);
341         EXIT;
342 #else
343         cfs_list_add_tail(&rs->rs_list, &rs->rs_service->srv_reply_queue);
344 #endif
345 }
346
347 void
348 ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
349 {
350         ENTRY;
351
352         LASSERT_SPIN_LOCKED(&rs->rs_service->srv_rs_lock);
353         LASSERT_SPIN_LOCKED(&rs->rs_lock);
354         LASSERT (rs->rs_difficult);
355         rs->rs_scheduled_ever = 1;  /* flag any notification attempt */
356
357         if (rs->rs_scheduled) {     /* being set up or already notified */
358                 EXIT;
359                 return;
360         }
361
362         rs->rs_scheduled = 1;
363         cfs_list_del_init(&rs->rs_list);
364         ptlrpc_dispatch_difficult_reply(rs);
365         EXIT;
366 }
367
368 void ptlrpc_commit_replies(struct obd_export *exp)
369 {
370         struct ptlrpc_reply_state *rs, *nxt;
371         DECLARE_RS_BATCH(batch);
372         ENTRY;
373
374         rs_batch_init(&batch);
375         /* Find any replies that have been committed and get their service
376          * to attend to complete them. */
377
378         /* CAVEAT EMPTOR: spinlock ordering!!! */
379         cfs_spin_lock(&exp->exp_uncommitted_replies_lock);
380         cfs_list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies,
381                                      rs_obd_list) {
382                 LASSERT (rs->rs_difficult);
383                 /* VBR: per-export last_committed */
384                 LASSERT(rs->rs_export);
385                 if (rs->rs_transno <= exp->exp_last_committed) {
386                         cfs_list_del_init(&rs->rs_obd_list);
387                         rs_batch_add(&batch, rs);
388                 }
389         }
390         cfs_spin_unlock(&exp->exp_uncommitted_replies_lock);
391         rs_batch_fini(&batch);
392         EXIT;
393 }
394
395 static int
396 ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
397 {
398         struct ptlrpc_request_buffer_desc *rqbd;
399         int                                rc;
400         int                                posted = 0;
401
402         for (;;) {
403                 cfs_spin_lock(&svc->srv_lock);
404
405                 if (cfs_list_empty (&svc->srv_idle_rqbds)) {
406                         cfs_spin_unlock(&svc->srv_lock);
407                         return (posted);
408                 }
409
410                 rqbd = cfs_list_entry(svc->srv_idle_rqbds.next,
411                                       struct ptlrpc_request_buffer_desc,
412                                       rqbd_list);
413                 cfs_list_del (&rqbd->rqbd_list);
414
415                 /* assume we will post successfully */
416                 svc->srv_nrqbd_receiving++;
417                 cfs_list_add (&rqbd->rqbd_list, &svc->srv_active_rqbds);
418
419                 cfs_spin_unlock(&svc->srv_lock);
420
421                 rc = ptlrpc_register_rqbd(rqbd);
422                 if (rc != 0)
423                         break;
424
425                 posted = 1;
426         }
427
428         cfs_spin_lock(&svc->srv_lock);
429
430         svc->srv_nrqbd_receiving--;
431         cfs_list_del(&rqbd->rqbd_list);
432         cfs_list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
433
434         /* Don't complain if no request buffers are posted right now; LNET
435          * won't drop requests because we set the portal lazy! */
436
437         cfs_spin_unlock(&svc->srv_lock);
438
439         return (-1);
440 }
441
442 /**
443  * Start a service with parameters from struct ptlrpc_service_conf \a c
444  * as opposed to directly calling ptlrpc_init_svc with tons of arguments.
445  */
446 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
447                                             svc_handler_t h, char *name,
448                                             struct proc_dir_entry *proc_entry,
449                                             svc_req_printfn_t prntfn,
450                                             char *threadname)
451 {
452         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
453                                c->psc_max_req_size, c->psc_max_reply_size,
454                                c->psc_req_portal, c->psc_rep_portal,
455                                c->psc_watchdog_factor,
456                                h, name, proc_entry,
457                                prntfn, c->psc_min_threads, c->psc_max_threads,
458                                threadname, c->psc_ctx_tags, NULL);
459 }
460 EXPORT_SYMBOL(ptlrpc_init_svc_conf);
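/* A minimal usage sketch of the conf-based entry point above.  All field
 * values, the handler and the names are made up for illustration only; a
 * real service supplies its own portals, buffer sizes, thread counts and
 * context tags:
 *
 *     static struct ptlrpc_service_conf conf = {
 *             .psc_nbufs           = 64,
 *             .psc_bufsize         = 8192,
 *             .psc_max_req_size    = 8192,
 *             .psc_max_reply_size  = 8192,
 *             .psc_req_portal      = EXAMPLE_REQUEST_PORTAL,
 *             .psc_rep_portal      = EXAMPLE_REPLY_PORTAL,
 *             .psc_watchdog_factor = 1,
 *             .psc_min_threads     = 2,
 *             .psc_max_threads     = 8,
 *             .psc_ctx_tags        = LCT_MD_THREAD,
 *     };
 *
 *     svc = ptlrpc_init_svc_conf(&conf, example_req_handler, "example_svc",
 *                                proc_entry, NULL, "example");
 */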
461
462 static void ptlrpc_at_timer(unsigned long castmeharder)
463 {
464         struct ptlrpc_service *svc = (struct ptlrpc_service *)castmeharder;
465         svc->srv_at_check = 1;
466         svc->srv_at_checktime = cfs_time_current();
467         cfs_waitq_signal(&svc->srv_waitq);
468 }
469
470 /**
471  * Initialize service on a given portal.
472  * This includes starting service threads, allocating and posting rqbds and
473  * so on.
474  * \a nbufs is how many buffers to post
475  * \a bufsize is buffer size to post
476  * \a max_req_size - maximum request size to be accepted for this service
477  * \a max_reply_size maximum reply size this service can ever send
478  * \a req_portal - portal to listen for requests on
479  * \a rep_portal - portal to send replies to
480  * \a watchdog_factor soft watchdog timeout multiplier to print stuck service traces.
481  * \a handler - function to process every new request
482  * \a name - service name
483  * \a proc_entry - entry in the /proc tree for statistics reporting
484  * \a min_threads \a max_threads - min/max number of service threads to start.
485  * \a threadname should be 11 characters or less - 3 will be added on
486  * \a hp_handler - function to determine priority of the request, also called
487  *                 on every new request.
488  */
489 struct ptlrpc_service *
490 ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
491                 int req_portal, int rep_portal, int watchdog_factor,
492                 svc_handler_t handler, char *name,
493                 cfs_proc_dir_entry_t *proc_entry,
494                 svc_req_printfn_t svcreq_printfn,
495                 int min_threads, int max_threads,
496                 char *threadname, __u32 ctx_tags,
497                 svc_hpreq_handler_t hp_handler)
498 {
499         int                     rc;
500         struct ptlrpc_at_array *array;
501         struct ptlrpc_service  *service;
502         unsigned int            size, index;
503         ENTRY;
504
505         LASSERT (nbufs > 0);
506         LASSERT (bufsize >= max_req_size + SPTLRPC_MAX_PAYLOAD);
507         LASSERT (ctx_tags != 0);
508
509         OBD_ALLOC_PTR(service);
510         if (service == NULL)
511                 RETURN(NULL);
512
513         /* First initialise enough for early teardown */
514
515         service->srv_name = name;
516         cfs_spin_lock_init(&service->srv_lock);
517         cfs_spin_lock_init(&service->srv_rq_lock);
518         cfs_spin_lock_init(&service->srv_rs_lock);
519         CFS_INIT_LIST_HEAD(&service->srv_threads);
520         cfs_waitq_init(&service->srv_waitq);
521
522         service->srv_nbuf_per_group = test_req_buffer_pressure ? 1 : nbufs;
523         service->srv_max_req_size = max_req_size + SPTLRPC_MAX_PAYLOAD;
524         service->srv_buf_size = bufsize;
525         service->srv_rep_portal = rep_portal;
526         service->srv_req_portal = req_portal;
527         service->srv_watchdog_factor = watchdog_factor;
528         service->srv_handler = handler;
529         service->srv_req_printfn = svcreq_printfn;
530         service->srv_request_seq = 1;           /* valid seq #s start at 1 */
531         service->srv_request_max_cull_seq = 0;
532         service->srv_threads_min = min_threads;
533         service->srv_threads_max = max_threads;
534         service->srv_thread_name = threadname;
535         service->srv_ctx_tags = ctx_tags;
536         service->srv_hpreq_handler = hp_handler;
537         service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
538         service->srv_hpreq_count = 0;
539         service->srv_n_active_hpreq = 0;
540
541         rc = LNetSetLazyPortal(service->srv_req_portal);
542         LASSERT (rc == 0);
543
544         CFS_INIT_LIST_HEAD(&service->srv_request_queue);
545         CFS_INIT_LIST_HEAD(&service->srv_request_hpq);
546         CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds);
547         CFS_INIT_LIST_HEAD(&service->srv_active_rqbds);
548         CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
549         CFS_INIT_LIST_HEAD(&service->srv_request_history);
550         CFS_INIT_LIST_HEAD(&service->srv_active_replies);
551 #ifndef __KERNEL__
552         CFS_INIT_LIST_HEAD(&service->srv_reply_queue);
553 #endif
554         CFS_INIT_LIST_HEAD(&service->srv_free_rs_list);
555         cfs_waitq_init(&service->srv_free_rs_waitq);
556         cfs_atomic_set(&service->srv_n_difficult_replies, 0);
557
558         cfs_spin_lock_init(&service->srv_at_lock);
559         CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
560
561         array = &service->srv_at_array;
562         size = at_est2timeout(at_max);
563         array->paa_size = size;
564         array->paa_count = 0;
565         array->paa_deadline = -1;
566
567         /* allocate memory for srv_at_array (ptlrpc_at_array) */
568         OBD_ALLOC(array->paa_reqs_array, sizeof(cfs_list_t) * size);
569         if (array->paa_reqs_array == NULL)
570                 GOTO(failed, NULL);
571
572         for (index = 0; index < size; index++)
573                 CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]);
574
575         OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size);
576         if (array->paa_reqs_count == NULL)
577                 GOTO(failed, NULL);
578
579         cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service);
580         /* At SOW, service time should be quick; 10s seems generous. If client
581            timeout is less than this, we'll be sending an early reply. */
582         at_init(&service->srv_at_estimate, 10, 0);
583
584         cfs_spin_lock (&ptlrpc_all_services_lock);
585         cfs_list_add (&service->srv_list, &ptlrpc_all_services);
586         cfs_spin_unlock (&ptlrpc_all_services_lock);
587
588         /* Now allocate the request buffers */
589         rc = ptlrpc_grow_req_bufs(service);
590         /* We shouldn't be under memory pressure at startup, so
591          * fail if we can't post all our buffers at this time. */
592         if (rc != 0)
593                 GOTO(failed, NULL);
594
595         /* Now allocate pool of reply buffers */
596         /* Increase max reply size to next power of two */
597         service->srv_max_reply_size = 1;
598         while (service->srv_max_reply_size <
599                max_reply_size + SPTLRPC_MAX_PAYLOAD)
600                 service->srv_max_reply_size <<= 1;
601
602         if (proc_entry != NULL)
603                 ptlrpc_lprocfs_register_service(proc_entry, service);
604
605         CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
606                service->srv_name, service->srv_req_portal);
607
608         RETURN(service);
609 failed:
610         ptlrpc_unregister_service(service);
611         return NULL;
612 }
613
614 /**
615  * To actually free the request, this must be called without holding svc_lock.
616  * Note it is the caller's responsibility to unlink req->rq_list.
617  */
618 static void ptlrpc_server_free_request(struct ptlrpc_request *req)
619 {
620         LASSERT(cfs_atomic_read(&req->rq_refcount) == 0);
621         LASSERT(cfs_list_empty(&req->rq_timed_list));
622
623          /* DEBUG_REQ() assumes the reply state of a request with a valid
624           * ref will not be destroyed until that reference is dropped. */
625         ptlrpc_req_drop_rs(req);
626
627         sptlrpc_svc_ctx_decref(req);
628
629         if (req != &req->rq_rqbd->rqbd_req) {
630                 /* NB request buffers use an embedded
631                  * req if the incoming req unlinked the
632                  * MD; this isn't one of them! */
633                 OBD_FREE(req, sizeof(*req));
634         }
635 }
636
637 /**
638  * Drop a reference count of the request. If it reaches 0, we either
639  * put it into the history list or free it immediately.
640  */
641 void ptlrpc_server_drop_request(struct ptlrpc_request *req)
642 {
643         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
644         struct ptlrpc_service             *svc = rqbd->rqbd_service;
645         int                                refcount;
646         cfs_list_t                        *tmp;
647         cfs_list_t                        *nxt;
648
649         if (!cfs_atomic_dec_and_test(&req->rq_refcount))
650                 return;
651
652         cfs_spin_lock(&svc->srv_at_lock);
653         if (req->rq_at_linked) {
654                 struct ptlrpc_at_array *array = &svc->srv_at_array;
655                 __u32 index = req->rq_at_index;
656
657                 LASSERT(!cfs_list_empty(&req->rq_timed_list));
658                 cfs_list_del_init(&req->rq_timed_list);
659                 cfs_spin_lock(&req->rq_lock);
660                 req->rq_at_linked = 0;
661                 cfs_spin_unlock(&req->rq_lock);
662                 array->paa_reqs_count[index]--;
663                 array->paa_count--;
664         } else
665                 LASSERT(cfs_list_empty(&req->rq_timed_list));
666         cfs_spin_unlock(&svc->srv_at_lock);
667
668         /* finalize request */
669         if (req->rq_export) {
670                 class_export_put(req->rq_export);
671                 req->rq_export = NULL;
672         }
673
674         cfs_spin_lock(&svc->srv_lock);
675
676         cfs_list_add(&req->rq_list, &rqbd->rqbd_reqs);
677
678         refcount = --(rqbd->rqbd_refcount);
679         if (refcount == 0) {
680                 /* request buffer is now idle: add to history */
681                 cfs_list_del(&rqbd->rqbd_list);
682                 cfs_list_add_tail(&rqbd->rqbd_list, &svc->srv_history_rqbds);
683                 svc->srv_n_history_rqbds++;
684
685                 /* cull some history?
686                  * I expect only about 1 or 2 rqbds need to be recycled here */
687                 while (svc->srv_n_history_rqbds > svc->srv_max_history_rqbds) {
688                         rqbd = cfs_list_entry(svc->srv_history_rqbds.next,
689                                               struct ptlrpc_request_buffer_desc,
690                                               rqbd_list);
691
692                         cfs_list_del(&rqbd->rqbd_list);
693                         svc->srv_n_history_rqbds--;
694
695                         /* remove rqbd's reqs from svc's req history while
696                          * I've got the service lock */
697                         cfs_list_for_each(tmp, &rqbd->rqbd_reqs) {
698                                 req = cfs_list_entry(tmp, struct ptlrpc_request,
699                                                      rq_list);
700                                 /* Track the highest culled req seq */
701                                 if (req->rq_history_seq >
702                                     svc->srv_request_max_cull_seq)
703                                         svc->srv_request_max_cull_seq =
704                                                 req->rq_history_seq;
705                                 cfs_list_del(&req->rq_history_list);
706                         }
707
708                         cfs_spin_unlock(&svc->srv_lock);
709
710                         cfs_list_for_each_safe(tmp, nxt, &rqbd->rqbd_reqs) {
711                                 req = cfs_list_entry(rqbd->rqbd_reqs.next,
712                                                      struct ptlrpc_request,
713                                                      rq_list);
714                                 cfs_list_del(&req->rq_list);
715                                 ptlrpc_server_free_request(req);
716                         }
717
718                         cfs_spin_lock(&svc->srv_lock);
719                         /*
720                          * now all reqs including the embedded req have been
721                          * disposed of, schedule the request buffer for re-use.
722                          */
723                         LASSERT(cfs_atomic_read(&rqbd->rqbd_req.rq_refcount) ==
724                                 0);
725                         cfs_list_add_tail(&rqbd->rqbd_list,
726                                           &svc->srv_idle_rqbds);
727                 }
728
729                 cfs_spin_unlock(&svc->srv_lock);
730         } else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
731                 /* If we are low on memory, we are not interested in history */
732                 cfs_list_del(&req->rq_list);
733                 cfs_list_del_init(&req->rq_history_list);
734                 cfs_spin_unlock(&svc->srv_lock);
735
736                 ptlrpc_server_free_request(req);
737         } else {
738                 cfs_spin_unlock(&svc->srv_lock);
739         }
740 }
741
742 /**
743  * To finish a request: stop sending more early replies, and release
744  * the request. Should be called after we have finished handling the request.
745  */
746 static void ptlrpc_server_finish_request(struct ptlrpc_service *svc,
747                                          struct ptlrpc_request *req)
748 {
749         ptlrpc_hpreq_fini(req);
750
751         cfs_spin_lock(&svc->srv_rq_lock);
752         svc->srv_n_active_reqs--;
753         if (req->rq_hp)
754                 svc->srv_n_active_hpreq--;
755         cfs_spin_unlock(&svc->srv_rq_lock);
756
757         ptlrpc_server_drop_request(req);
758 }
759
760 /**
761  * This function makes sure dead exports are evicted in a timely manner.
762  * This function is only called when some export receives a message (i.e.,
763  * the network is up).
764  */
765 static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
766 {
767         struct obd_export *oldest_exp;
768         time_t oldest_time, new_time;
769
770         ENTRY;
771
772         LASSERT(exp);
773
774         /* Compensate for slow machines, etc, by faking our request time
775            into the future.  Although this can break the strict time-ordering
776            of the list, we can be really lazy here - we don't have to evict
777            at the exact right moment.  Eventually, all silent exports
778            will make it to the top of the list. */
779
780         /* Do not pay attention to 1 sec or smaller renewals. */
781         new_time = cfs_time_current_sec() + extra_delay;
782         if (exp->exp_last_request_time + 1 /*second */ >= new_time)
783                 RETURN_EXIT;
784
785         exp->exp_last_request_time = new_time;
786         CDEBUG(D_HA, "updating export %s at "CFS_TIME_T" exp %p\n",
787                exp->exp_client_uuid.uuid,
788                exp->exp_last_request_time, exp);
789
790         /* exports may get disconnected from the chain even though the
791            export has references, so we must keep the spin lock while
792            manipulating the lists */
793         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
794
795         if (cfs_list_empty(&exp->exp_obd_chain_timed)) {
796                 /* this one is not timed */
797                 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
798                 RETURN_EXIT;
799         }
800
801         cfs_list_move_tail(&exp->exp_obd_chain_timed,
802                            &exp->exp_obd->obd_exports_timed);
803
804         oldest_exp = cfs_list_entry(exp->exp_obd->obd_exports_timed.next,
805                                     struct obd_export, exp_obd_chain_timed);
806         oldest_time = oldest_exp->exp_last_request_time;
807         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
808
809         if (exp->exp_obd->obd_recovering) {
810                 /* be nice to everyone during recovery */
811                 EXIT;
812                 return;
813         }
814
815         /* Note - racing to start/reset the obd_eviction timer is safe */
816         if (exp->exp_obd->obd_eviction_timer == 0) {
817                 /* Check if the oldest entry is expired. */
818                 if (cfs_time_current_sec() > (oldest_time + PING_EVICT_TIMEOUT +
819                                               extra_delay)) {
820                         /* We need a second timer, in case the net was down and
821                          * it just came back. Since the pinger may skip every
822                          * other PING_INTERVAL (see note in ptlrpc_pinger_main),
823                          * we better wait for 3. */
824                         exp->exp_obd->obd_eviction_timer =
825                                 cfs_time_current_sec() + 3 * PING_INTERVAL;
826                         CDEBUG(D_HA, "%s: Think about evicting %s from "CFS_TIME_T"\n",
827                                exp->exp_obd->obd_name, 
828                                obd_export_nid2str(oldest_exp), oldest_time);
829                 }
830         } else {
831                 if (cfs_time_current_sec() >
832                     (exp->exp_obd->obd_eviction_timer + extra_delay)) {
833                         /* The evictor won't evict anyone who we've heard from
834                          * recently, so we don't have to check before we start
835                          * it. */
836                         if (!ping_evictor_wake(exp))
837                                 exp->exp_obd->obd_eviction_timer = 0;
838                 }
839         }
840
841         EXIT;
842 }
843
844 /**
845  * Sanity check request \a req.
846  * Return 0 if all is ok, error code otherwise.
847  */
848 static int ptlrpc_check_req(struct ptlrpc_request *req)
849 {
850         int rc = 0;
851
852         if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
853                      req->rq_export->exp_conn_cnt)) {
854                 DEBUG_REQ(D_ERROR, req,
855                           "DROPPING req from old connection %d < %d",
856                           lustre_msg_get_conn_cnt(req->rq_reqmsg),
857                           req->rq_export->exp_conn_cnt);
858                 return -EEXIST;
859         }
860         if (unlikely(req->rq_export->exp_obd &&
861                      req->rq_export->exp_obd->obd_fail)) {
862              /* Failing over, don't handle any more reqs, send
863                 error response instead. */
864                 CDEBUG(D_RPCTRACE, "Dropping req %p for failed obd %s\n",
865                        req, req->rq_export->exp_obd->obd_name);
866                 rc = -ENODEV;
867         } else if (lustre_msg_get_flags(req->rq_reqmsg) &
868                    (MSG_REPLAY | MSG_REQ_REPLAY_DONE) &&
869                    !(req->rq_export->exp_obd->obd_recovering)) {
870                         DEBUG_REQ(D_ERROR, req,
871                                   "Invalid replay without recovery");
872                         class_fail_export(req->rq_export);
873                         rc = -ENODEV;
874         } else if (lustre_msg_get_transno(req->rq_reqmsg) != 0 &&
875                    !(req->rq_export->exp_obd->obd_recovering)) {
876                         DEBUG_REQ(D_ERROR, req, "Invalid req with transno "
877                                   LPU64" without recovery",
878                                   lustre_msg_get_transno(req->rq_reqmsg));
879                         class_fail_export(req->rq_export);
880                         rc = -ENODEV;
881         }
882
883         if (unlikely(rc < 0)) {
884                 req->rq_status = rc;
885                 ptlrpc_error(req);
886         }
887         return rc;
888 }
889
890 static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
891 {
892         struct ptlrpc_at_array *array = &svc->srv_at_array;
893         __s32 next;
894
895         cfs_spin_lock(&svc->srv_at_lock);
896         if (array->paa_count == 0) {
897                 cfs_timer_disarm(&svc->srv_at_timer);
898                 cfs_spin_unlock(&svc->srv_at_lock);
899                 return;
900         }
901
902         /* Set timer for closest deadline */
903         next = (__s32)(array->paa_deadline - cfs_time_current_sec() -
904                        at_early_margin);
905         if (next <= 0)
906                 ptlrpc_at_timer((unsigned long)svc);
907         else
908                 cfs_timer_arm(&svc->srv_at_timer, cfs_time_shift(next));
909         cfs_spin_unlock(&svc->srv_at_lock);
910         CDEBUG(D_INFO, "armed %s at %+ds\n", svc->srv_name, next);
911 }
912
913 /* Add rpc to early reply check list */
914 static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
915 {
916         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
917         struct ptlrpc_request *rq = NULL;
918         struct ptlrpc_at_array *array = &svc->srv_at_array;
919         __u32 index;
920         int found = 0;
921
922         if (AT_OFF)
923                 return(0);
924
925         if (req->rq_no_reply)
926                 return 0;
927
928         if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
929                 return(-ENOSYS);
930
931         cfs_spin_lock(&svc->srv_at_lock);
932         LASSERT(cfs_list_empty(&req->rq_timed_list));
933
934         index = (unsigned long)req->rq_deadline % array->paa_size;
935         if (array->paa_reqs_count[index] > 0) {
936                 /* latest rpcs will have the latest deadlines in the list,
937                  * so search backward. */
938                 cfs_list_for_each_entry_reverse(rq,
939                                                 &array->paa_reqs_array[index],
940                                                 rq_timed_list) {
941                         if (req->rq_deadline >= rq->rq_deadline) {
942                                 cfs_list_add(&req->rq_timed_list,
943                                              &rq->rq_timed_list);
944                                 break;
945                         }
946                 }
947         }
948
949         /* Add the request at the head of the list */
950         if (cfs_list_empty(&req->rq_timed_list))
951                 cfs_list_add(&req->rq_timed_list,
952                              &array->paa_reqs_array[index]);
953
954         cfs_spin_lock(&req->rq_lock);
955         req->rq_at_linked = 1;
956         cfs_spin_unlock(&req->rq_lock);
957         req->rq_at_index = index;
958         array->paa_reqs_count[index]++;
959         array->paa_count++;
960         if (array->paa_count == 1 || array->paa_deadline > req->rq_deadline) {
961                 array->paa_deadline = req->rq_deadline;
962                 found = 1;
963         }
964         cfs_spin_unlock(&svc->srv_at_lock);
965
966         if (found)
967                 ptlrpc_at_set_timer(svc);
968
969         return 0;
970 }
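/* Worked example for the bucketing above: the array was sized in
 * ptlrpc_init_svc() as paa_size = at_est2timeout(at_max), and a request with
 * rq_deadline = 10007 (seconds) lands in slot 10007 % paa_size.  Requests
 * expiring in the same second therefore share a slot, and the reverse walk
 * above keeps each slot ordered so the latest deadlines sit at the tail, as
 * the comment in the loop notes. */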
971
972 static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
973 {
974         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
975         struct ptlrpc_request *reqcopy;
976         struct lustre_msg *reqmsg;
977         cfs_duration_t olddl = req->rq_deadline - cfs_time_current_sec();
978         time_t newdl;
979         int rc;
980         ENTRY;
981
982         /* deadline is when the client expects us to reply, margin is the
983            difference between clients' and servers' expectations */
984         DEBUG_REQ(D_ADAPTTO, req,
985                   "%ssending early reply (deadline %+lds, margin %+lds) for "
986                   "%d+%d", AT_OFF ? "AT off - not " : "",
987                   olddl, olddl - at_get(&svc->srv_at_estimate),
988                   at_get(&svc->srv_at_estimate), at_extra);
989
990         if (AT_OFF)
991                 RETURN(0);
992
993         if (olddl < 0) {
994                 DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), "
995                           "not sending early reply. Consider increasing "
996                           "at_early_margin (%d)?", olddl, at_early_margin);
997
998                 /* Return an error so we're not re-added to the timed list. */
999                 RETURN(-ETIMEDOUT);
1000         }
1001
1002         if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0){
1003                 DEBUG_REQ(D_INFO, req, "Wanted to ask client for more time, "
1004                           "but no AT support");
1005                 RETURN(-ENOSYS);
1006         }
1007
1008         if (req->rq_export &&
1009             lustre_msg_get_flags(req->rq_reqmsg) &
1010             (MSG_REPLAY | MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE)) {
1011                 /* During recovery, we don't want to send too many early
1012                  * replies, but on the other hand we want to make sure the
1013                  * client has enough time to resend if the rpc is lost. So
1014                  * during the recovery period send at least 4 early replies,
1015                  * spacing them every at_extra if we can. at_estimate should
1016                  * always equal this fixed value during recovery. */
1017                 at_measured(&svc->srv_at_estimate, min(at_extra,
1018                             req->rq_export->exp_obd->obd_recovery_timeout / 4));
1019         } else {
1020                 /* Fake our processing time into the future to ask the clients
1021                  * for some extra amount of time */
1022                 at_measured(&svc->srv_at_estimate, at_extra +
1023                             cfs_time_current_sec() -
1024                             req->rq_arrival_time.tv_sec);
1025
1026                 /* Check to see if we've actually increased the deadline -
1027                  * we may be past adaptive_max */
1028                 if (req->rq_deadline >= req->rq_arrival_time.tv_sec +
1029                     at_get(&svc->srv_at_estimate)) {
1030                         DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
1031                                   "(%ld/%ld), not sending early reply\n",
1032                                   olddl, req->rq_arrival_time.tv_sec +
1033                                   at_get(&svc->srv_at_estimate) -
1034                                   cfs_time_current_sec());
1035                         RETURN(-ETIMEDOUT);
1036                 }
1037         }
1038         newdl = cfs_time_current_sec() + at_get(&svc->srv_at_estimate);
1039
1040         OBD_ALLOC(reqcopy, sizeof *reqcopy);
1041         if (reqcopy == NULL)
1042                 RETURN(-ENOMEM);
1043         OBD_ALLOC_LARGE(reqmsg, req->rq_reqlen);
1044         if (!reqmsg) {
1045                 OBD_FREE(reqcopy, sizeof *reqcopy);
1046                 RETURN(-ENOMEM);
1047         }
1048
1049         *reqcopy = *req;
1050         reqcopy->rq_reply_state = NULL;
1051         reqcopy->rq_rep_swab_mask = 0;
1052         reqcopy->rq_pack_bulk = 0;
1053         reqcopy->rq_pack_udesc = 0;
1054         reqcopy->rq_packed_final = 0;
1055         sptlrpc_svc_ctx_addref(reqcopy);
1056         /* We only need the reqmsg for the magic */
1057         reqcopy->rq_reqmsg = reqmsg;
1058         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
1059
1060         LASSERT(cfs_atomic_read(&req->rq_refcount));
1061         /** if it is the last refcount then an early reply isn't needed */
1062         if (cfs_atomic_read(&req->rq_refcount) == 1) {
1063                 DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
1064                           "abort sending early reply\n");
1065                 GOTO(out, rc = -EINVAL);
1066         }
1067
1068         /* Connection ref */
1069         reqcopy->rq_export = class_conn2export(
1070                                      lustre_msg_get_handle(reqcopy->rq_reqmsg));
1071         if (reqcopy->rq_export == NULL)
1072                 GOTO(out, rc = -ENODEV);
1073
1074         /* RPC ref */
1075         class_export_rpc_get(reqcopy->rq_export);
1076         if (reqcopy->rq_export->exp_obd &&
1077             reqcopy->rq_export->exp_obd->obd_fail)
1078                 GOTO(out_put, rc = -ENODEV);
1079
1080         rc = lustre_pack_reply_flags(reqcopy, 1, NULL, NULL, LPRFL_EARLY_REPLY);
1081         if (rc)
1082                 GOTO(out_put, rc);
1083
1084         rc = ptlrpc_send_reply(reqcopy, PTLRPC_REPLY_EARLY);
1085
1086         if (!rc) {
1087                 /* Adjust our own deadline to what we told the client */
1088                 req->rq_deadline = newdl;
1089                 req->rq_early_count++; /* number sent, server side */
1090         } else {
1091                 DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
1092         }
1093
1094         /* Free the (early) reply state from lustre_pack_reply.
1095            (ptlrpc_send_reply takes its own rs ref, so this is safe here) */
1096         ptlrpc_req_drop_rs(reqcopy);
1097
1098 out_put:
1099         class_export_rpc_put(reqcopy->rq_export);
1100         class_export_put(reqcopy->rq_export);
1101 out:
1102         sptlrpc_svc_ctx_decref(reqcopy);
1103         OBD_FREE_LARGE(reqmsg, req->rq_reqlen);
1104         OBD_FREE(reqcopy, sizeof *reqcopy);
1105         RETURN(rc);
1106 }
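/* Net effect (sketch): if the current service estimate is, say, 30s, then
 * newdl = now + 30 and the early reply asks the client to wait that much
 * longer; the server's own rq_deadline is moved to the same value only when
 * the early reply was actually sent (rc == 0 above). */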
1107
1108 /* Send early replies to everybody expiring within at_early_margin
1109    asking for at_extra time */
1110 static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
1111 {
1112         struct ptlrpc_request *rq, *n;
1113         cfs_list_t work_list;
1114         struct ptlrpc_at_array *array = &svc->srv_at_array;
1115         __u32  index, count;
1116         time_t deadline;
1117         time_t now = cfs_time_current_sec();
1118         cfs_duration_t delay;
1119         int first, counter = 0;
1120         ENTRY;
1121
1122         cfs_spin_lock(&svc->srv_at_lock);
1123         if (svc->srv_at_check == 0) {
1124                 cfs_spin_unlock(&svc->srv_at_lock);
1125                 RETURN(0);
1126         }
1127         delay = cfs_time_sub(cfs_time_current(), svc->srv_at_checktime);
1128         svc->srv_at_check = 0;
1129
1130         if (array->paa_count == 0) {
1131                 cfs_spin_unlock(&svc->srv_at_lock);
1132                 RETURN(0);
1133         }
1134
1135         /* The timer went off, but maybe the nearest rpc already completed. */
1136         first = array->paa_deadline - now;
1137         if (first > at_early_margin) {
1138                 /* We've still got plenty of time.  Reset the timer. */
1139                 cfs_spin_unlock(&svc->srv_at_lock);
1140                 ptlrpc_at_set_timer(svc);
1141                 RETURN(0);
1142         }
1143
1144         /* We're close to a timeout, and we don't know how much longer the
1145            server will take. Send early replies to everyone expiring soon. */
1146         CFS_INIT_LIST_HEAD(&work_list);
1147         deadline = -1;
1148         index = (unsigned long)array->paa_deadline % array->paa_size;
1149         count = array->paa_count;
1150         while (count > 0) {
1151                 count -= array->paa_reqs_count[index];
1152                 cfs_list_for_each_entry_safe(rq, n,
1153                                              &array->paa_reqs_array[index],
1154                                              rq_timed_list) {
1155                         if (rq->rq_deadline <= now + at_early_margin) {
1156                                 cfs_list_del_init(&rq->rq_timed_list);
1157                                 /**
1158                                  * ptlrpc_server_drop_request() may drop
1159                                  * refcount to 0 already. Let's check this and
1160                                  * don't add entry to work_list
1161                                  */
1162                                 if (likely(cfs_atomic_inc_not_zero(&rq->rq_refcount)))
1163                                         cfs_list_add(&rq->rq_timed_list, &work_list);
1164                                 counter++;
1165                                 array->paa_reqs_count[index]--;
1166                                 array->paa_count--;
1167                                 cfs_spin_lock(&rq->rq_lock);
1168                                 rq->rq_at_linked = 0;
1169                                 cfs_spin_unlock(&rq->rq_lock);
1170                                 continue;
1171                         }
1172
1173                         /* update the earliest deadline */
1174                         if (deadline == -1 || rq->rq_deadline < deadline)
1175                                 deadline = rq->rq_deadline;
1176
1177                         break;
1178                 }
1179
1180                 if (++index >= array->paa_size)
1181                         index = 0;
1182         }
1183         array->paa_deadline = deadline;
1184         cfs_spin_unlock(&svc->srv_at_lock);
1185
1186         /* we have a new earliest deadline, restart the timer */
1187         ptlrpc_at_set_timer(svc);
1188
1189         CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early "
1190                "replies\n", first, at_extra, counter);
1191         if (first < 0) {
1192                 /* We're already past request deadlines before we even get a
1193                    chance to send early replies */
1194                 LCONSOLE_WARN("%s: This server is not able to keep up with "
1195                               "request traffic (cpu-bound).\n", svc->srv_name);
1196                 CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, "
1197                       "delay="CFS_DURATION_T"(jiff)\n",
1198                       counter, svc->srv_n_queued_reqs, svc->srv_n_active_reqs,
1199                       at_get(&svc->srv_at_estimate), delay);
1200         }
1201
1202         /* we took an additional refcount so entries can't be deleted from the
1203          * list, no locking is needed */
1204         while (!cfs_list_empty(&work_list)) {
1205                 rq = cfs_list_entry(work_list.next, struct ptlrpc_request,
1206                                     rq_timed_list);
1207                 cfs_list_del_init(&rq->rq_timed_list);
1208
1209                 if (ptlrpc_at_send_early_reply(rq) == 0)
1210                         ptlrpc_at_add_timed(rq);
1211
1212                 ptlrpc_server_drop_request(rq);
1213         }
1214
1215         RETURN(0);
1216 }
1217
1218 /**
1219  * Put the request on the export list if the request may become
1220  * a high priority one.
1221  */
1222 static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
1223                              struct ptlrpc_request *req)
1224 {
1225         int rc = 0;
1226         ENTRY;
1227
1228         if (svc->srv_hpreq_handler) {
1229                 rc = svc->srv_hpreq_handler(req);
1230                 if (rc)
1231                         RETURN(rc);
1232         }
1233         if (req->rq_export && req->rq_ops) {
1234                 /* Perform request specific check. We should do this check
1235                  * before the request is added into exp_hp_rpcs list otherwise
1236                  * it may hit swab race at LU-1044. */
1237                 if (req->rq_ops->hpreq_check)
1238                         rc = req->rq_ops->hpreq_check(req);
1239
1240                 cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
1241                 cfs_list_add(&req->rq_exp_list,
1242                              &req->rq_export->exp_hp_rpcs);
1243                 cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
1244         }
1245
1246         RETURN(rc);
1247 }
1248
1249 /** Remove the request from the export list. */
1250 static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
1251 {
1252         ENTRY;
1253         if (req->rq_export && req->rq_ops) {
1254                 /* refresh lock timeout again so that client has more
1255                  * room to send lock cancel RPC. */
1256                 if (req->rq_ops->hpreq_fini)
1257                         req->rq_ops->hpreq_fini(req);
1258
1259                 cfs_spin_lock_bh(&req->rq_export->exp_rpc_lock);
1260                 cfs_list_del_init(&req->rq_exp_list);
1261                 cfs_spin_unlock_bh(&req->rq_export->exp_rpc_lock);
1262         }
1263         EXIT;
1264 }
1265
1266 /**
1267  * Make the request a high priority one.
1268  *
1269  * All the high priority requests are queued in a separate FIFO
1270  * ptlrpc_service::srv_request_hpq list which is parallel to
1271  * ptlrpc_service::srv_request_queue list but has a higher priority
1272  * for handling.
1273  *
1274  * \see ptlrpc_server_handle_request().
1275  */
1276 static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc,
1277                                         struct ptlrpc_request *req)
1278 {
1279         ENTRY;
1280         LASSERT(svc != NULL);
1281         cfs_spin_lock(&req->rq_lock);
1282         if (req->rq_hp == 0) {
1283                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
1284
1285                 /* Add to the high priority queue. */
1286                 cfs_list_move_tail(&req->rq_list, &svc->srv_request_hpq);
1287                 req->rq_hp = 1;
1288                 if (opc != OBD_PING)
1289                         DEBUG_REQ(D_RPCTRACE, req, "high priority req");
1290         }
1291         cfs_spin_unlock(&req->rq_lock);
1292         EXIT;
1293 }
1294
1295 /**
1296  * \see ptlrpc_hpreq_reorder_nolock
1297  */
1298 void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
1299 {
1300         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
1301         ENTRY;
1302
1303         cfs_spin_lock(&svc->srv_rq_lock);
1304         /* It may happen that the request is already taken for processing
1305          * but is still in the export list, or that the request is not in the
1306          * request queue yet but is already in the export list; in either case
1307          * do not add it to the HP list. */
1308         if (!cfs_list_empty(&req->rq_list))
1309                 ptlrpc_hpreq_reorder_nolock(svc, req);
1310         cfs_spin_unlock(&svc->srv_rq_lock);
1311         EXIT;
1312 }
1313
1314 /** Check if the request is a high priority one. */
1315 static int ptlrpc_server_hpreq_check(struct ptlrpc_service *svc,
1316                                      struct ptlrpc_request *req)
1317 {
1318         ENTRY;
1319
1320         /* Check by request opc. */
1321         if (OBD_PING == lustre_msg_get_opc(req->rq_reqmsg))
1322                 RETURN(1);
1323
1324         RETURN(ptlrpc_hpreq_init(svc, req));
1325 }
1326
1327 /** Add a request to the service queue; use the HP queue if it qualifies. */
1328 static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
1329                                      struct ptlrpc_request *req)
1330 {
1331         int rc;
1332         ENTRY;
1333
1334         rc = ptlrpc_server_hpreq_check(svc, req);
1335         if (rc < 0)
1336                 RETURN(rc);
1337
1338         cfs_spin_lock(&svc->srv_rq_lock);
1339
1340         if (rc)
1341                 ptlrpc_hpreq_reorder_nolock(svc, req);
1342         else
1343                 cfs_list_add_tail(&req->rq_list,
1344                                   &svc->srv_request_queue);
1345
1346         cfs_spin_unlock(&svc->srv_rq_lock);
1347
1348         RETURN(0);
1349 }
1350
1351 /**
1352  * Whether it is allowed to handle a high priority request.
1353  * May be called without any lock, but the caller needs to hold
1354  * ptlrpc_service::srv_rq_lock to get a reliable result.
1355  */
1356 static int ptlrpc_server_allow_high(struct ptlrpc_service *svc, int force)
1357 {
1358         if (force)
1359                 return 1;
1360
1361         if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
1362                 return 0;
1363
1364         return cfs_list_empty(&svc->srv_request_queue) ||
1365                svc->srv_hpreq_count < svc->srv_hpreq_ratio;
1366 }
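/* Together with ptlrpc_server_request_get() below, srv_hpreq_ratio (set to
 * PTLRPC_SVC_HP_RATIO in ptlrpc_init_svc() above) gives the fairness policy:
 * while both queues are non-empty, at most srv_hpreq_ratio high priority
 * requests are taken in a row before one normal request is served, because
 * taking a normal request resets srv_hpreq_count to zero. */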
1367
1368 static int ptlrpc_server_high_pending(struct ptlrpc_service *svc, int force)
1369 {
1370         return ptlrpc_server_allow_high(svc, force) &&
1371                !cfs_list_empty(&svc->srv_request_hpq);
1372 }
1373
1374 /**
1375  * Only allow normal priority requests on a service that has a high-priority
1376  * queue if forced (i.e. cleanup), if there are other high priority requests
1377  * already being processed (i.e. those threads can service more high-priority
1378  * requests), or if there are enough idle threads that a later thread can do
1379  * a high priority request.
1380  * May be called without any lock, but the caller needs to hold
1381  * ptlrpc_service::srv_rq_lock to get a reliable result.
1382  */
1383 static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
1384 {
1385 #ifndef __KERNEL__
1386         if (1) /* always allow to handle normal request for liblustre */
1387                 return 1;
1388 #endif
1389         if (force ||
1390             svc->srv_n_active_reqs < svc->srv_threads_running - 2)
1391                 return 1;
1392
1393         if (svc->srv_n_active_reqs >= svc->srv_threads_running - 1)
1394                 return 0;
1395
1396         return svc->srv_n_active_hpreq > 0 || svc->srv_hpreq_handler == NULL;
1397 }
1398
1399 static int ptlrpc_server_normal_pending(struct ptlrpc_service *svc, int force)
1400 {
1401         return ptlrpc_server_allow_normal(svc, force) &&
1402                !cfs_list_empty(&svc->srv_request_queue);
1403 }
1404
1405 /**
1406  * Returns true if there are requests available in the incoming request
1407  * queue for processing and we are allowed to fetch them.
1408  * The caller may invoke this without any lock, but must hold
1409  * ptlrpc_service::srv_rq_lock to get a reliable result.
1410  * \see ptlrpc_server_allow_normal
1411  * \see ptlrpc_server_allow_high
1412  */
1413 static inline int
1414 ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
1415 {
1416         return ptlrpc_server_high_pending(svc, force) ||
1417                ptlrpc_server_normal_pending(svc, force);
1418 }
1419
1420 /**
1421  * Fetch a request for processing from queue of unprocessed requests.
1422  * Favors high-priority requests.
1423  * Returns a pointer to fetched request.
1424  */
1425 static struct ptlrpc_request *
1426 ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
1427 {
1428         struct ptlrpc_request *req;
1429         ENTRY;
1430
1431         if (ptlrpc_server_high_pending(svc, force)) {
1432                 req = cfs_list_entry(svc->srv_request_hpq.next,
1433                                      struct ptlrpc_request, rq_list);
1434                 svc->srv_hpreq_count++;
1435                 RETURN(req);
1436
1437         }
1438
1439         if (ptlrpc_server_normal_pending(svc, force)) {
1440                 req = cfs_list_entry(svc->srv_request_queue.next,
1441                                      struct ptlrpc_request, rq_list);
1442                 svc->srv_hpreq_count = 0;
1443                 RETURN(req);
1444         }
1445         RETURN(NULL);
1446 }
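/*
 * A minimal sketch of the expected caller pattern for
 * ptlrpc_server_request_get(), mirroring ptlrpc_server_handle_request()
 * below: the caller holds srv_rq_lock and is itself responsible for
 * dequeueing the request and updating the active-request counters:
 *
 *      cfs_spin_lock(&svc->srv_rq_lock);
 *      req = ptlrpc_server_request_get(svc, 0);
 *      if (req != NULL) {
 *              cfs_list_del_init(&req->rq_list);
 *              svc->srv_n_active_reqs++;
 *      }
 *      cfs_spin_unlock(&svc->srv_rq_lock);
 */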
1447
1448 /**
1449  * Handle freshly incoming requests: add them to the timed early reply list
1450  * and pass them on to the regular request queue.
1451  * All incoming requests pass through here before getting into
1452  * ptlrpc_server_handle_request later on.
1453  */
1454 static int
1455 ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
1456 {
1457         struct ptlrpc_request *req;
1458         __u32                  deadline;
1459         int                    rc;
1460         ENTRY;
1461
1462         LASSERT(svc);
1463
1464         cfs_spin_lock(&svc->srv_lock);
1465         if (cfs_list_empty(&svc->srv_req_in_queue)) {
1466                 cfs_spin_unlock(&svc->srv_lock);
1467                 RETURN(0);
1468         }
1469
1470         req = cfs_list_entry(svc->srv_req_in_queue.next,
1471                              struct ptlrpc_request, rq_list);
1472         cfs_list_del_init (&req->rq_list);
1473         svc->srv_n_queued_reqs--;
1474         /* Consider this still a "queued" request as far as stats are
1475            concerned */
1476         cfs_spin_unlock(&svc->srv_lock);
1477
1478         /* go through security check/transform */
1479         rc = sptlrpc_svc_unwrap_request(req);
1480         switch (rc) {
1481         case SECSVC_OK:
1482                 break;
1483         case SECSVC_COMPLETE:
1484                 target_send_reply(req, 0, OBD_FAIL_MDS_ALL_REPLY_NET);
1485                 goto err_req;
1486         case SECSVC_DROP:
1487                 goto err_req;
1488         default:
1489                 LBUG();
1490         }
1491
1492         /*
1493          * For null-flavored RPCs the message has already been unpacked by
1494          * sptlrpc, although redoing it would not be harmful.
1495          */
1496         if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
1497                 rc = ptlrpc_unpack_req_msg(req, req->rq_reqlen);
1498                 if (rc != 0) {
1499                         CERROR("error unpacking request: ptl %d from %s "
1500                                "x"LPU64"\n", svc->srv_req_portal,
1501                                libcfs_id2str(req->rq_peer), req->rq_xid);
1502                         goto err_req;
1503                 }
1504         }
1505
1506         rc = lustre_unpack_req_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
1507         if (rc) {
1508                 CERROR ("error unpacking ptlrpc body: ptl %d from %s x"
1509                         LPU64"\n", svc->srv_req_portal,
1510                         libcfs_id2str(req->rq_peer), req->rq_xid);
1511                 goto err_req;
1512         }
1513
1514         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
1515             lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
1516                 CERROR("drop incoming rpc opc %u, x"LPU64"\n",
1517                        cfs_fail_val, req->rq_xid);
1518                 goto err_req;
1519         }
1520
1521         rc = -EINVAL;
1522         if (lustre_msg_get_type(req->rq_reqmsg) != PTL_RPC_MSG_REQUEST) {
1523                 CERROR("wrong packet type received (type=%u) from %s\n",
1524                        lustre_msg_get_type(req->rq_reqmsg),
1525                        libcfs_id2str(req->rq_peer));
1526                 goto err_req;
1527         }
1528
1529         switch(lustre_msg_get_opc(req->rq_reqmsg)) {
1530         case MDS_WRITEPAGE:
1531         case OST_WRITE:
1532                 req->rq_bulk_write = 1;
1533                 break;
1534         case MDS_READPAGE:
1535         case OST_READ:
1536         case MGS_CONFIG_READ:
1537                 req->rq_bulk_read = 1;
1538                 break;
1539         }
1540
1541         CDEBUG(D_RPCTRACE, "got req x"LPU64"\n", req->rq_xid);
1542
1543         req->rq_export = class_conn2export(
1544                 lustre_msg_get_handle(req->rq_reqmsg));
1545         if (req->rq_export) {
1546                 rc = ptlrpc_check_req(req);
1547                 if (rc == 0) {
1548                         rc = sptlrpc_target_export_check(req->rq_export, req);
1549                         if (rc)
1550                                 DEBUG_REQ(D_ERROR, req, "DROPPING req with "
1551                                           "illegal security flavor,");
1552                 }
1553
1554                 if (rc)
1555                         goto err_req;
1556                 ptlrpc_update_export_timer(req->rq_export, 0);
1557         }
1558
1559         /* req_in handling should/must be fast */
1560         if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
1561                 DEBUG_REQ(D_WARNING, req, "Slow req_in handling "CFS_DURATION_T"s",
1562                           cfs_time_sub(cfs_time_current_sec(),
1563                                        req->rq_arrival_time.tv_sec));
1564
1565         /* Set rpc server deadline and add it to the timed list */
1566         deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
1567                     MSGHDR_AT_SUPPORT) ?
1568                    /* The max time the client expects us to take */
1569                    lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
1570         req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
1571         if (unlikely(deadline == 0)) {
1572                 DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
1573                 goto err_req;
1574         }
1575
1576         ptlrpc_at_add_timed(req);
1577
1578         /* Move it over to the request processing queue */
1579         rc = ptlrpc_server_request_add(svc, req);
1580         if (rc) {
1581                 ptlrpc_hpreq_fini(req);
1582                 GOTO(err_req, rc);
1583         }
1584         cfs_waitq_signal(&svc->srv_waitq);
1585         RETURN(1);
1586
1587 err_req:
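        /* The request never made it onto the processing queue; mark it
         * active so that ptlrpc_server_finish_request() below, which is
         * expected to decrement srv_n_active_reqs, stays balanced. */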
1588         cfs_spin_lock(&svc->srv_rq_lock);
1589         svc->srv_n_active_reqs++;
1590         cfs_spin_unlock(&svc->srv_rq_lock);
1591         ptlrpc_server_finish_request(svc, req);
1592
1593         RETURN(1);
1594 }
1595
1596 /**
1597  * Main incoming request handling logic.
1598  * Calls handler function from service to do actual processing.
1599  */
1600 static int
1601 ptlrpc_server_handle_request(struct ptlrpc_service *svc,
1602                              struct ptlrpc_thread *thread)
1603 {
1604         struct obd_export     *export = NULL;
1605         struct ptlrpc_request *request;
1606         struct timeval         work_start;
1607         struct timeval         work_end;
1608         long                   timediff;
1609         int                    rc;
1610         int                    fail_opc = 0;
1611         ENTRY;
1612
1613         LASSERT(svc);
1614
1615         cfs_spin_lock(&svc->srv_rq_lock);
1616 #ifndef __KERNEL__
1617         /* !@%$# liblustre only has 1 thread */
1618         if (cfs_atomic_read(&svc->srv_n_difficult_replies) != 0) {
1619                 cfs_spin_unlock(&svc->srv_rq_lock);
1620                 RETURN(0);
1621         }
1622 #endif
1623         request = ptlrpc_server_request_get(svc, 0);
1624         if (request == NULL) {
1625                 cfs_spin_unlock(&svc->srv_rq_lock);
1626                 RETURN(0);
1627         }
1628
1629         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
1630                 fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
1631         else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
1632                 fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;
1633
1634         if (unlikely(fail_opc)) {
1635                 if (request->rq_export && request->rq_ops) {
1636                         cfs_spin_unlock(&svc->srv_rq_lock);
1637                         OBD_FAIL_TIMEOUT(fail_opc, 4);
1638                         cfs_spin_lock(&svc->srv_rq_lock);
1639                         request = ptlrpc_server_request_get(svc, 0);
1640                         if (request == NULL) {
1641                                 cfs_spin_unlock(&svc->srv_rq_lock);
1642                                 RETURN(0);
1643                         }
1644                 }
1645         }
1646
1647         cfs_list_del_init(&request->rq_list);
1648         svc->srv_n_active_reqs++;
1649         if (request->rq_hp)
1650                 svc->srv_n_active_hpreq++;
1651
1652         cfs_spin_unlock(&svc->srv_rq_lock);
1653
1654         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
1655
1656         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
1657                 libcfs_debug_dumplog();
1658
1659         cfs_gettimeofday(&work_start);
1660         timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
1661         if (likely(svc->srv_stats != NULL)) {
1662                 lprocfs_counter_add(svc->srv_stats, PTLRPC_REQWAIT_CNTR,
1663                                     timediff);
1664                 lprocfs_counter_add(svc->srv_stats, PTLRPC_REQQDEPTH_CNTR,
1665                                     svc->srv_n_queued_reqs);
1666                 lprocfs_counter_add(svc->srv_stats, PTLRPC_REQACTIVE_CNTR,
1667                                     svc->srv_n_active_reqs);
1668                 lprocfs_counter_add(svc->srv_stats, PTLRPC_TIMEOUT,
1669                                     at_get(&svc->srv_at_estimate));
1670         }
1671
1672         rc = lu_context_init(&request->rq_session,
1673                              LCT_SESSION|LCT_REMEMBER|LCT_NOREF);
1674         if (rc) {
1675                 CERROR("Failure to initialize session: %d\n", rc);
1676                 goto out_req;
1677         }
1678         request->rq_session.lc_thread = thread;
1679         request->rq_session.lc_cookie = 0x5;
1680         lu_context_enter(&request->rq_session);
1681
1682         CDEBUG(D_NET, "got req "LPU64"\n", request->rq_xid);
1683
1684         request->rq_svc_thread = thread;
1685         if (thread)
1686                 request->rq_svc_thread->t_env->le_ses = &request->rq_session;
1687
1688         if (likely(request->rq_export)) {
1689                 if (unlikely(ptlrpc_check_req(request)))
1690                         goto put_conn;
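                /* timediff is in microseconds (see the "%ldus" trace below);
                 * ">> 19" divides by 2^19 = 524288, i.e. the export timer is
                 * fed the queue wait time in roughly half-second units. */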
1691                 ptlrpc_update_export_timer(request->rq_export, timediff >> 19);
1692                 export = class_export_rpc_get(request->rq_export);
1693         }
1694
1695         /* Discard requests queued for longer than the deadline.
1696            The deadline is increased if we send an early reply. */
1697         if (cfs_time_current_sec() > request->rq_deadline) {
1698                 DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s"
1699                           ": deadline "CFS_DURATION_T":"CFS_DURATION_T"s ago\n",
1700                           libcfs_id2str(request->rq_peer),
1701                           cfs_time_sub(request->rq_deadline,
1702                           request->rq_arrival_time.tv_sec),
1703                           cfs_time_sub(cfs_time_current_sec(),
1704                           request->rq_deadline));
1705                 goto put_rpc_export;
1706         }
1707
1708         CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
1709                "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
1710                (request->rq_export ?
1711                 (char *)request->rq_export->exp_client_uuid.uuid : "0"),
1712                (request->rq_export ?
1713                 cfs_atomic_read(&request->rq_export->exp_refcount) : -99),
1714                lustre_msg_get_status(request->rq_reqmsg), request->rq_xid,
1715                libcfs_id2str(request->rq_peer),
1716                lustre_msg_get_opc(request->rq_reqmsg));
1717
1718         if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
1719                 CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);
1720
1721         rc = svc->srv_handler(request);
1722
1723         ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
1724
1725 put_rpc_export:
1726         if (export != NULL)
1727                 class_export_rpc_put(export);
1728 put_conn:
1729         lu_context_exit(&request->rq_session);
1730         lu_context_fini(&request->rq_session);
1731
1732         if (unlikely(cfs_time_current_sec() > request->rq_deadline)) {
1733                 DEBUG_REQ(D_WARNING, request, "Request x"LPU64" took longer "
1734                           "than estimated ("CFS_DURATION_T":"CFS_DURATION_T"s);"
1735                           " client may timeout.",
1736                           request->rq_xid, cfs_time_sub(request->rq_deadline,
1737                           request->rq_arrival_time.tv_sec),
1738                           cfs_time_sub(cfs_time_current_sec(),
1739                           request->rq_deadline));
1740         }
1741
1742         cfs_gettimeofday(&work_end);
1743         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
1744         CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
1745                "%s:%s+%d:%d:x"LPU64":%s:%d Request procesed in "
1746                "%ldus (%ldus total) trans "LPU64" rc %d/%d\n",
1747                 cfs_curproc_comm(),
1748                 (request->rq_export ?
1749                  (char *)request->rq_export->exp_client_uuid.uuid : "0"),
1750                 (request->rq_export ?
1751                  cfs_atomic_read(&request->rq_export->exp_refcount) : -99),
1752                 lustre_msg_get_status(request->rq_reqmsg),
1753                 request->rq_xid,
1754                 libcfs_id2str(request->rq_peer),
1755                 lustre_msg_get_opc(request->rq_reqmsg),
1756                 timediff,
1757                 cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
1758                 (request->rq_repmsg ?
1759                  lustre_msg_get_transno(request->rq_repmsg) :
1760                  request->rq_transno),
1761                 request->rq_status,
1762                 (request->rq_repmsg ?
1763                  lustre_msg_get_status(request->rq_repmsg) : -999));
1764         if (likely(svc->srv_stats != NULL && request->rq_reqmsg != NULL)) {
1765                 __u32 op = lustre_msg_get_opc(request->rq_reqmsg);
1766                 int opc = opcode_offset(op);
1767                 if (opc > 0 && !(op == LDLM_ENQUEUE || op == MDS_REINT)) {
1768                         LASSERT(opc < LUSTRE_MAX_OPCODES);
1769                         lprocfs_counter_add(svc->srv_stats,
1770                                             opc + EXTRA_MAX_OPCODES,
1771                                             timediff);
1772                 }
1773         }
1774         if (unlikely(request->rq_early_count)) {
1775                 DEBUG_REQ(D_ADAPTTO, request,
1776                           "sent %d early replies before finishing in "
1777                           CFS_DURATION_T"s",
1778                           request->rq_early_count,
1779                           cfs_time_sub(work_end.tv_sec,
1780                           request->rq_arrival_time.tv_sec));
1781         }
1782
1783 out_req:
1784         ptlrpc_server_finish_request(svc, request);
1785
1786         RETURN(1);
1787 }
1788
1789 /**
1790  * An internal function to process a single reply state object.
1791  */
1792 static int
1793 ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
1794 {
1795         struct ptlrpc_service     *svc = rs->rs_service;
1796         struct obd_export         *exp;
1797         int                        nlocks;
1798         int                        been_handled;
1799         ENTRY;
1800
1801         exp = rs->rs_export;
1802
1803         LASSERT (rs->rs_difficult);
1804         LASSERT (rs->rs_scheduled);
1805         LASSERT (cfs_list_empty(&rs->rs_list));
1806
1807         cfs_spin_lock (&exp->exp_lock);
1808         /* Noop if removed already */
1809         cfs_list_del_init (&rs->rs_exp_list);
1810         cfs_spin_unlock (&exp->exp_lock);
1811
1812         /* The disk commit callback holds exp_uncommitted_replies_lock while it
1813          * iterates over newly committed replies, removing them from
1814          * exp_uncommitted_replies.  It then drops this lock and schedules the
1815          * replies it found for handling here.
1816          *
1817          * We can avoid contention for exp_uncommitted_replies_lock between the
1818          * HRT threads and further commit callbacks by checking rs_committed
1819          * which is set in the commit callback while it holds both
1820          * rs_lock and exp_uncommitted_replies_lock.
1821          *
1822          * If we see rs_committed clear, the commit callback _may_ not have
1823          * handled this reply yet and we race with it to grab
1824          * exp_uncommitted_replies_lock before removing the reply from
1825          * exp_uncommitted_replies.  Note that if we lose the race and the
1826          * reply has already been removed, list_del_init() is a noop.
1827          *
1828          * If we see rs_committed set, we know the commit callback is handling,
1829          * or has handled, this reply (store reordering might allow us to see
1830          * rs_committed set out of sequence).  But since that is done while
1831          * holding rs_lock, we can be sure it has all completed once we hold
1832          * rs_lock, which we do right next.
1833          */
1834         if (!rs->rs_committed) {
1835                 cfs_spin_lock(&exp->exp_uncommitted_replies_lock);
1836                 cfs_list_del_init(&rs->rs_obd_list);
1837                 cfs_spin_unlock(&exp->exp_uncommitted_replies_lock);
1838         }
1839
1840         cfs_spin_lock(&rs->rs_lock);
1841
1842         been_handled = rs->rs_handled;
1843         rs->rs_handled = 1;
1844
1845         nlocks = rs->rs_nlocks;                 /* atomic "steal", but */
1846         rs->rs_nlocks = 0;                      /* locks still on rs_locks! */
1847
1848         if (nlocks == 0 && !been_handled) {
1849                 /* If we see this, we should already have seen the warning
1850                  * in mds_steal_ack_locks()  */
1851                 CWARN("All locks stolen from rs %p x"LPD64".t"LPD64
1852                       " o%d NID %s\n",
1853                       rs,
1854                       rs->rs_xid, rs->rs_transno, rs->rs_opc,
1855                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
1856         }
1857
1858         if ((!been_handled && rs->rs_on_net) || nlocks > 0) {
1859                 cfs_spin_unlock(&rs->rs_lock);
1860
1861                 if (!been_handled && rs->rs_on_net) {
1862                         LNetMDUnlink(rs->rs_md_h);
1863                         /* Ignore return code; we're racing with
1864                          * completion... */
1865                 }
1866
1867                 while (nlocks-- > 0)
1868                         ldlm_lock_decref(&rs->rs_locks[nlocks],
1869                                          rs->rs_modes[nlocks]);
1870
1871                 cfs_spin_lock(&rs->rs_lock);
1872         }
1873
1874         rs->rs_scheduled = 0;
1875
1876         if (!rs->rs_on_net) {
1877                 /* Off the net */
1878                 cfs_spin_unlock(&rs->rs_lock);
1879
1880                 class_export_put (exp);
1881                 rs->rs_export = NULL;
1882                 ptlrpc_rs_decref (rs);
1883                 if (cfs_atomic_dec_and_test(&svc->srv_n_difficult_replies) &&
1884                     svc->srv_is_stopping)
1885                         cfs_waitq_broadcast(&svc->srv_waitq);
1886                 RETURN(1);
1887         }
1888
1889         /* still on the net; callback will schedule */
1890         cfs_spin_unlock(&rs->rs_lock);
1891         RETURN(1);
1892 }
1893
1894 #ifndef __KERNEL__
1895
1896 /**
1897  * Check whether given service has a reply available for processing
1898  * and process it.
1899  *
1900  * \param svc a ptlrpc service
1901  * \retval 0 no replies processed
1902  * \retval 1 one reply processed
1903  */
1904 static int
1905 ptlrpc_server_handle_reply(struct ptlrpc_service *svc)
1906 {
1907         struct ptlrpc_reply_state *rs = NULL;
1908         ENTRY;
1909
1910         cfs_spin_lock(&svc->srv_rs_lock);
1911         if (!cfs_list_empty(&svc->srv_reply_queue)) {
1912                 rs = cfs_list_entry(svc->srv_reply_queue.prev,
1913                                     struct ptlrpc_reply_state,
1914                                     rs_list);
1915                 cfs_list_del_init(&rs->rs_list);
1916         }
1917         cfs_spin_unlock(&svc->srv_rs_lock);
1918         if (rs != NULL)
1919                 ptlrpc_handle_rs(rs);
1920         RETURN(rs != NULL);
1921 }
1922
1923 /* FIXME make use of timeout later */
1924 int
1925 liblustre_check_services (void *arg)
1926 {
1927         int  did_something = 0;
1928         int  rc;
1929         cfs_list_t *tmp, *nxt;
1930         ENTRY;
1931
1932         /* I'm relying on being single threaded so that I don't have to lock
1933          * ptlrpc_all_services etc. */
1934         cfs_list_for_each_safe (tmp, nxt, &ptlrpc_all_services) {
1935                 struct ptlrpc_service *svc =
1936                         cfs_list_entry (tmp, struct ptlrpc_service, srv_list);
1937
1938                 if (svc->srv_threads_running != 0)     /* I've recursed */
1939                         continue;
1940
1941                 /* service threads can block for bulk, so this limits us
1942                  * (arbitrarily) to recursing 1 stack frame per service.
1943                  * Note that the problem with recursion is that we have to
1944                  * unwind completely before our caller can resume. */
1945
1946                 svc->srv_threads_running++;
1947
1948                 do {
1949                         rc = ptlrpc_server_handle_req_in(svc);
1950                         rc |= ptlrpc_server_handle_reply(svc);
1951                         rc |= ptlrpc_at_check_timed(svc);
1952                         rc |= ptlrpc_server_handle_request(svc, NULL);
1953                         rc |= (ptlrpc_server_post_idle_rqbds(svc) > 0);
1954                         did_something |= rc;
1955                 } while (rc);
1956
1957                 svc->srv_threads_running--;
1958         }
1959
1960         RETURN(did_something);
1961 }
1962 #define ptlrpc_stop_all_threads(s) do {} while (0)
1963
1964 #else /* __KERNEL__ */
1965
1966 static void
1967 ptlrpc_check_rqbd_pool(struct ptlrpc_service *svc)
1968 {
1969         int avail = svc->srv_nrqbd_receiving;
1970         int low_water = test_req_buffer_pressure ? 0 :
1971                         svc->srv_nbuf_per_group / 2;
1972
1973         /* NB I'm not locking; just looking. */
1974
1975         /* CAVEAT EMPTOR: We might be allocating buffers here because we've
1976          * allowed the request history to grow out of control.  We could put a
1977          * sanity check on that here and cull some history if we need the
1978          * space. */
1979
1980         if (avail <= low_water)
1981                 ptlrpc_grow_req_bufs(svc);
1982
1983         if (svc->srv_stats)
1984                 lprocfs_counter_add(svc->srv_stats, PTLRPC_REQBUF_AVAIL_CNTR,
1985                                     avail);
1986 }
1987
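/*
 * Timeout callback for the wait in ptlrpc_wait_event() below.  When a service
 * thread fails to repost idle request buffers it sets srv_rqbd_timeout; once
 * that interval expires this callback clears the timeout (making
 * ptlrpc_rqbd_pending() true again) and ends the wait with -ETIMEDOUT so the
 * main loop can retry posting the buffers.
 */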
1988 static int
1989 ptlrpc_retry_rqbds(void *arg)
1990 {
1991         struct ptlrpc_service *svc = (struct ptlrpc_service *)arg;
1992
1993         svc->srv_rqbd_timeout = 0;
1994         return (-ETIMEDOUT);
1995 }
1996
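/*
 * Check whether the running threads can cope with the active request load:
 * beyond the requests being handled, at least one thread (two on services
 * with a high-priority request handler) must remain free.
 */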
1997 static inline int
1998 ptlrpc_threads_enough(struct ptlrpc_service *svc)
1999 {
2000         return svc->srv_n_active_reqs <
2001                svc->srv_threads_running - 1 - (svc->srv_hpreq_handler != NULL);
2002 }
2003
2004 /**
2005  * Check whether we are allowed to create more threads.
2006  * The caller may invoke this without any lock, but must hold
2007  * ptlrpc_service::srv_lock to get a reliable result.
2008  */
2009 static inline int
2010 ptlrpc_threads_increasable(struct ptlrpc_service *svc)
2011 {
2012         return svc->srv_threads_running +
2013                svc->srv_threads_starting < svc->srv_threads_max;
2014 }
2015
2016 /**
2017  * Check whether there are too many requests and more threads may be created.
2018  */
2019 static inline int
2020 ptlrpc_threads_need_create(struct ptlrpc_service *svc)
2021 {
2022         return !ptlrpc_threads_enough(svc) && ptlrpc_threads_increasable(svc);
2023 }
2024
2025 static inline int
2026 ptlrpc_thread_stopping(struct ptlrpc_thread *thread)
2027 {
2028         return thread_is_stopping(thread) ||
2029                thread->t_svc->srv_is_stopping;
2030 }
2031
2032 static inline int
2033 ptlrpc_rqbd_pending(struct ptlrpc_service *svc)
2034 {
2035         return !cfs_list_empty(&svc->srv_idle_rqbds) &&
2036                svc->srv_rqbd_timeout == 0;
2037 }
2038
2039 static inline int
2040 ptlrpc_at_check(struct ptlrpc_service *svc)
2041 {
2042         return svc->srv_at_check;
2043 }
2044
2045 /**
2046  * Check whether any requests are waiting for preprocessing.
2047  * The caller may invoke this without any lock, but must hold
2048  * ptlrpc_service::srv_lock to get a reliable result.
2049  */
2050 static inline int
2051 ptlrpc_server_request_waiting(struct ptlrpc_service *svc)
2052 {
2053         return !cfs_list_empty(&svc->srv_req_in_queue);
2054 }
2055
2056 static __attribute__((__noinline__)) int
2057 ptlrpc_wait_event(struct ptlrpc_service *svc,
2058                   struct ptlrpc_thread *thread)
2059 {
2060         /* Don't exit while there are replies to be handled */
2061         struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
2062                                              ptlrpc_retry_rqbds, svc);
2063
2064         lc_watchdog_disable(thread->t_watchdog);
2065
2066         cfs_cond_resched();
2067
2068         l_wait_event_exclusive_head(svc->srv_waitq,
2069                                ptlrpc_thread_stopping(thread) ||
2070                                ptlrpc_server_request_waiting(svc) ||
2071                                ptlrpc_server_request_pending(svc, 0) ||
2072                                ptlrpc_rqbd_pending(svc) ||
2073                                ptlrpc_at_check(svc), &lwi);
2074
2075         if (ptlrpc_thread_stopping(thread))
2076                 return -EINTR;
2077
2078         lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
2079
2080         return 0;
2081 }
2082
2083 /**
2084  * Main thread body for service threads.
2085  * Waits in a loop for new requests to process to appear.
2086  * Every time an incoming request is added to its queue, the waitq
2087  * is woken up and one of the threads will handle it.
2088  */
2089 static int ptlrpc_main(void *arg)
2090 {
2091         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
2092         struct ptlrpc_service  *svc = data->svc;
2093         struct ptlrpc_thread   *thread = data->thread;
2094         struct ptlrpc_reply_state *rs;
2095 #ifdef WITH_GROUP_INFO
2096         cfs_group_info_t *ginfo = NULL;
2097 #endif
2098         struct lu_env *env;
2099         int counter = 0, rc = 0;
2100         ENTRY;
2101
2102         thread->t_pid = cfs_curproc_pid();
2103         cfs_daemonize_ctxt(data->name);
2104
2105 #if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
2106         /* we need to do this before any per-thread allocation is done so that
2107          * we get the per-thread allocations on the local node.  bug 7342 */
2108         if (svc->srv_cpu_affinity) {
2109                 int cpu, num_cpu;
2110
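                /* Pick the (t_id % num_online_cpus)-th online CPU and bind
                 * this thread to that CPU's NUMA node, so service threads are
                 * spread evenly across nodes. */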
2111                 for (cpu = 0, num_cpu = 0; cpu < cfs_num_possible_cpus();
2112                      cpu++) {
2113                         if (!cpu_online(cpu))
2114                                 continue;
2115                         if (num_cpu == thread->t_id % cfs_num_online_cpus())
2116                                 break;
2117                         num_cpu++;
2118                 }
2119                 cfs_set_cpus_allowed(cfs_current(),
2120                                      node_to_cpumask(cpu_to_node(cpu)));
2121         }
2122 #endif
2123
2124 #ifdef WITH_GROUP_INFO
2125         ginfo = cfs_groups_alloc(0);
2126         if (!ginfo) {
2127                 rc = -ENOMEM;
2128                 goto out;
2129         }
2130
2131         cfs_set_current_groups(ginfo);
2132         cfs_put_group_info(ginfo);
2133 #endif
2134
2135         if (svc->srv_init != NULL) {
2136                 rc = svc->srv_init(thread);
2137                 if (rc)
2138                         goto out;
2139         }
2140
2141         OBD_ALLOC_PTR(env);
2142         if (env == NULL) {
2143                 rc = -ENOMEM;
2144                 goto out_srv_fini;
2145         }
2146
2147         rc = lu_context_init(&env->le_ctx,
2148                              svc->srv_ctx_tags|LCT_REMEMBER|LCT_NOREF);
2149         if (rc)
2150                 goto out_srv_fini;
2151
2152         thread->t_env = env;
2153         env->le_ctx.lc_thread = thread;
2154         env->le_ctx.lc_cookie = 0x6;
2155
2156         /* Alloc reply state structure for this one */
2157         OBD_ALLOC_LARGE(rs, svc->srv_max_reply_size);
2158         if (!rs) {
2159                 rc = -ENOMEM;
2160                 goto out_srv_fini;
2161         }
2162
2163         cfs_spin_lock(&svc->srv_lock);
2164
2165         LASSERT(thread_is_starting(thread));
2166         thread_clear_flags(thread, SVC_STARTING);
2167         svc->srv_threads_starting--;
2168
2169         /* SVC_STOPPING may already be set here if someone else is trying
2170          * to stop the service while this new thread is being dynamically
2171          * forked. We still set SVC_RUNNING to let our creator know that
2172          * we are now running; however, we will exit as soon as possible. */
2173         thread_add_flags(thread, SVC_RUNNING);
2174         svc->srv_threads_running++;
2175         cfs_spin_unlock(&svc->srv_lock);
2176
2177         /*
2178          * Wake up our creator. Note: @data is invalid after this point,
2179          * because it's allocated on the ptlrpc_start_thread() stack.
2180          */
2181         cfs_waitq_signal(&thread->t_ctl_waitq);
2182
2183         thread->t_watchdog = lc_watchdog_add(CFS_GET_TIMEOUT(svc), NULL, NULL);
2184
2185         cfs_spin_lock(&svc->srv_rs_lock);
2186         cfs_list_add(&rs->rs_list, &svc->srv_free_rs_list);
2187         cfs_waitq_signal(&svc->srv_free_rs_waitq);
2188         cfs_spin_unlock(&svc->srv_rs_lock);
2189
2190         CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
2191                svc->srv_threads_running);
2192
2193         /* XXX maintain a list of all managed devices: insert here */
2194         while (!ptlrpc_thread_stopping(thread)) {
2195                 if (ptlrpc_wait_event(svc, thread))
2196                         break;
2197
2198                 ptlrpc_check_rqbd_pool(svc);
2199
2200                 if (ptlrpc_threads_need_create(svc)) {
2201                         /* Ignore return code - we tried... */
2202                         ptlrpc_start_thread(svc);
2203                 }
2204
2205                 /* Process all incoming reqs before handling any */
2206                 if (ptlrpc_server_request_waiting(svc)) {
2207                         ptlrpc_server_handle_req_in(svc);
2208                         /* but limit ourselves in case of flood */
2209                         if (counter++ < 100)
2210                                 continue;
2211                         counter = 0;
2212                 }
2213
2214                 if (ptlrpc_at_check(svc))
2215                         ptlrpc_at_check_timed(svc);
2216
2217                 if (ptlrpc_server_request_pending(svc, 0)) {
2218                         lu_context_enter(&env->le_ctx);
2219                         ptlrpc_server_handle_request(svc, thread);
2220                         lu_context_exit(&env->le_ctx);
2221                 }
2222
2223                 if (ptlrpc_rqbd_pending(svc) &&
2224                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
2225                         /* I just failed to repost request buffers.
2226                          * Wait for a timeout (unless something else
2227                          * happens) before I try again */
2228                         svc->srv_rqbd_timeout = cfs_time_seconds(1)/10;
2229                         CDEBUG(D_RPCTRACE,"Posted buffers: %d\n",
2230                                svc->srv_nrqbd_receiving);
2231                 }
2232         }
2233
2234         lc_watchdog_delete(thread->t_watchdog);
2235         thread->t_watchdog = NULL;
2236
2237 out_srv_fini:
2238         /*
2239          * deconstruct service specific state created by ptlrpc_start_thread()
2240          */
2241         if (svc->srv_done != NULL)
2242                 svc->srv_done(thread);
2243
2244         if (env != NULL) {
2245                 lu_context_fini(&env->le_ctx);
2246                 OBD_FREE_PTR(env);
2247         }
2248 out:
2249         CDEBUG(D_RPCTRACE, "service thread [ %p : %u ] %d exiting: rc %d\n",
2250                thread, thread->t_pid, thread->t_id, rc);
2251
2252         cfs_spin_lock(&svc->srv_lock);
2253         if (thread_test_and_clear_flags(thread, SVC_STARTING))
2254                 svc->srv_threads_starting--;
2255
2256         if (thread_test_and_clear_flags(thread, SVC_RUNNING))
2257                 /* must know immediately */
2258                 svc->srv_threads_running--;
2259
2260         thread->t_id    = rc;
2261         thread_add_flags(thread, SVC_STOPPED);
2262
2263         cfs_waitq_signal(&thread->t_ctl_waitq);
2264         cfs_spin_unlock(&svc->srv_lock);
2265
2266         return rc;
2267 }
2268
2269 struct ptlrpc_hr_args {
2270         int                       thread_index;
2271         int                       cpu_index;
2272         struct ptlrpc_hr_service *hrs;
2273 };
2274
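/**
 * Splice any queued reply states from the HR thread onto \a replies under
 * hrt_lock, and return nonzero when the thread should wake up: either it is
 * being stopped or there are replies to handle.
 */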
2275 static int hrt_dont_sleep(struct ptlrpc_hr_thread *t,
2276                           cfs_list_t *replies)
2277 {
2278         int result;
2279
2280         cfs_spin_lock(&t->hrt_lock);
2281         cfs_list_splice_init(&t->hrt_queue, replies);
2282         result = cfs_test_bit(HRT_STOPPING, &t->hrt_flags) ||
2283                 !cfs_list_empty(replies);
2284         cfs_spin_unlock(&t->hrt_lock);
2285         return result;
2286 }
2287
2288 /**
2289  * Main body of the "handle reply" (HR) threads.
2290  * It processes acked reply states.
2291  */
2292 static int ptlrpc_hr_main(void *arg)
2293 {
2294         struct ptlrpc_hr_args * hr_args = arg;
2295         struct ptlrpc_hr_service *hr = hr_args->hrs;
2296         struct ptlrpc_hr_thread *t = &hr->hr_threads[hr_args->thread_index];
2297         char threadname[20];
2298         CFS_LIST_HEAD(replies);
2299
2300         snprintf(threadname, sizeof(threadname),
2301                  "ptlrpc_hr_%d", hr_args->thread_index);
2302
2303         cfs_daemonize_ctxt(threadname);
2304 #if defined(CONFIG_NUMA) && defined(HAVE_NODE_TO_CPUMASK)
2305         cfs_set_cpus_allowed(cfs_current(),
2306                              node_to_cpumask(cpu_to_node(hr_args->cpu_index)));
2307 #endif
2308         cfs_set_bit(HRT_RUNNING, &t->hrt_flags);
2309         cfs_waitq_signal(&t->hrt_wait);
2310
2311         while (!cfs_test_bit(HRT_STOPPING, &t->hrt_flags)) {
2312
2313                 l_wait_condition(t->hrt_wait, hrt_dont_sleep(t, &replies));
2314                 while (!cfs_list_empty(&replies)) {
2315                         struct ptlrpc_reply_state *rs;
2316
2317                         rs = cfs_list_entry(replies.prev,
2318                                             struct ptlrpc_reply_state,
2319                                             rs_list);
2320                         cfs_list_del_init(&rs->rs_list);
2321                         ptlrpc_handle_rs(rs);
2322                 }
2323         }
2324
2325         cfs_clear_bit(HRT_RUNNING, &t->hrt_flags);
2326         cfs_complete(&t->hrt_completion);
2327
2328         return 0;
2329 }
2330
2331 static int ptlrpc_start_hr_thread(struct ptlrpc_hr_service *hr, int n, int cpu)
2332 {
2333         struct ptlrpc_hr_thread *t = &hr->hr_threads[n];
2334         struct ptlrpc_hr_args args;
2335         int rc;
2336         ENTRY;
2337
2338         args.thread_index = n;
2339         args.cpu_index = cpu;
2340         args.hrs = hr;
2341
2342         rc = cfs_create_thread(ptlrpc_hr_main, (void*)&args, CFS_DAEMON_FLAGS);
2343         if (rc < 0) {
2344                 cfs_complete(&t->hrt_completion);
2345                 GOTO(out, rc);
2346         }
2347         l_wait_condition(t->hrt_wait, cfs_test_bit(HRT_RUNNING, &t->hrt_flags));
2348         RETURN(0);
2349  out:
2350         return rc;
2351 }
2352
2353 static void ptlrpc_stop_hr_thread(struct ptlrpc_hr_thread *t)
2354 {
2355         ENTRY;
2356
2357         cfs_set_bit(HRT_STOPPING, &t->hrt_flags);
2358         cfs_waitq_signal(&t->hrt_wait);
2359         cfs_wait_for_completion(&t->hrt_completion);
2360
2361         EXIT;
2362 }
2363
2364 static void ptlrpc_stop_hr_threads(struct ptlrpc_hr_service *hrs)
2365 {
2366         int n;
2367         ENTRY;
2368
2369         for (n = 0; n < hrs->hr_n_threads; n++)
2370                 ptlrpc_stop_hr_thread(&hrs->hr_threads[n]);
2371
2372         EXIT;
2373 }
2374
2375 static int ptlrpc_start_hr_threads(struct ptlrpc_hr_service *hr)
2376 {
2377         int rc = -ENOMEM;
2378         int n, cpu, threads_started = 0;
2379         ENTRY;
2380
2381         LASSERT(hr != NULL);
2382         LASSERT(hr->hr_n_threads > 0);
2383
2384         for (n = 0, cpu = 0; n < hr->hr_n_threads; n++) {
2385 #if defined(CONFIG_SMP) && defined(HAVE_NODE_TO_CPUMASK)
2386                 while (!cpu_online(cpu)) {
2387                         cpu++;
2388                         if (cpu >= cfs_num_possible_cpus())
2389                                 cpu = 0;
2390                 }
2391 #endif
2392                 rc = ptlrpc_start_hr_thread(hr, n, cpu);
2393                 if (rc != 0)
2394                         break;
2395                 threads_started++;
2396                 cpu++;
2397         }
2398         if (threads_started == 0) {
2399                 CERROR("No reply handling threads started\n");
2400                 RETURN(-ESRCH);
2401         }
2402         if (threads_started < hr->hr_n_threads) {
2403                 CWARN("Started only %d reply handling threads from %d\n",
2404                       threads_started, hr->hr_n_threads);
2405                 hr->hr_n_threads = threads_started;
2406         }
2407         RETURN(0);
2408 }
2409
2410 static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
2411                                struct ptlrpc_thread *thread)
2412 {
2413         struct l_wait_info lwi = { 0 };
2414         ENTRY;
2415
2416         CDEBUG(D_RPCTRACE, "Stopping thread [ %p : %u ]\n",
2417                thread, thread->t_pid);
2418
2419         cfs_spin_lock(&svc->srv_lock);
2420         /* let the thread know that we would like it to stop asap */
2421         thread_add_flags(thread, SVC_STOPPING);
2422         cfs_spin_unlock(&svc->srv_lock);
2423
2424         cfs_waitq_broadcast(&svc->srv_waitq);
2425         l_wait_event(thread->t_ctl_waitq,
2426                      thread_is_stopped(thread), &lwi);
2427
2428         cfs_spin_lock(&svc->srv_lock);
2429         cfs_list_del(&thread->t_link);
2430         cfs_spin_unlock(&svc->srv_lock);
2431
2432         OBD_FREE_PTR(thread);
2433         EXIT;
2434 }
2435
2436 /**
2437  * Stops all threads of a particular service \a svc
2438  */
2439 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
2440 {
2441         struct ptlrpc_thread *thread;
2442         ENTRY;
2443
2444         cfs_spin_lock(&svc->srv_lock);
2445         while (!cfs_list_empty(&svc->srv_threads)) {
2446                 thread = cfs_list_entry(svc->srv_threads.next,
2447                                         struct ptlrpc_thread, t_link);
2448
2449                 cfs_spin_unlock(&svc->srv_lock);
2450                 ptlrpc_stop_thread(svc, thread);
2451                 cfs_spin_lock(&svc->srv_lock);
2452         }
2453
2454         cfs_spin_unlock(&svc->srv_lock);
2455         EXIT;
2456 }
2457
2458 int ptlrpc_start_threads(struct ptlrpc_service *svc)
2459 {
2460         int i, rc = 0;
2461         ENTRY;
2462
2463         /* We require 2 threads min - see note in
2464            ptlrpc_server_handle_request */
2465         LASSERT(svc->srv_threads_min >= 2);
2466         for (i = 0; i < svc->srv_threads_min; i++) {
2467                 rc = ptlrpc_start_thread(svc);
2468                 /* We have enough threads, don't start more.  b=15759 */
2469                 if (rc == -EMFILE) {
2470                         rc = 0;
2471                         break;
2472                 }
2473                 if (rc) {
2474                         CERROR("cannot start %s thread #%d: rc %d\n",
2475                                svc->srv_thread_name, i, rc);
2476                         ptlrpc_stop_all_threads(svc);
2477                         break;
2478                 }
2479         }
2480         RETURN(rc);
2481 }
2482
2483 int ptlrpc_start_thread(struct ptlrpc_service *svc)
2484 {
2485         struct l_wait_info lwi = { 0 };
2486         struct ptlrpc_svc_data d;
2487         struct ptlrpc_thread *thread;
2488         char name[32];
2489         int rc;
2490         ENTRY;
2491
2492         CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
2493                svc->srv_name, svc->srv_threads_running, svc->srv_threads_min,
2494                svc->srv_threads_max, svc->srv_threads_running);
2495
2496         if (unlikely(svc->srv_is_stopping))
2497                 RETURN(-ESRCH);
2498
2499         if (!ptlrpc_threads_increasable(svc) ||
2500             (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
2501              svc->srv_threads_running == svc->srv_threads_min - 1))
2502                 RETURN(-EMFILE);
2503
2504         OBD_ALLOC_PTR(thread);
2505         if (thread == NULL)
2506                 RETURN(-ENOMEM);
2507         cfs_waitq_init(&thread->t_ctl_waitq);
2508
2509         cfs_spin_lock(&svc->srv_lock);
2510         if (!ptlrpc_threads_increasable(svc)) {
2511                 cfs_spin_unlock(&svc->srv_lock);
2512                 OBD_FREE_PTR(thread);
2513                 RETURN(-EMFILE);
2514         }
2515
2516         svc->srv_threads_starting++;
2517         thread->t_id    = svc->srv_threads_next_id++;
2518         thread_add_flags(thread, SVC_STARTING);
2519         thread->t_svc   = svc;
2520
2521         cfs_list_add(&thread->t_link, &svc->srv_threads);
2522         cfs_spin_unlock(&svc->srv_lock);
2523
2524         sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
2525         d.svc = svc;
2526         d.name = name;
2527         d.thread = thread;
2528
2529         CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
2530
2531         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
2532          * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
2533          */
2534         rc = cfs_create_thread(ptlrpc_main, &d, CFS_DAEMON_FLAGS);
2535         if (rc < 0) {
2536                 CERROR("cannot start thread '%s': rc %d\n", name, rc);
2537
2538                 cfs_spin_lock(&svc->srv_lock);
2539                 cfs_list_del(&thread->t_link);
2540                 --svc->srv_threads_starting;
2541                 cfs_spin_unlock(&svc->srv_lock);
2542
2543                 OBD_FREE(thread, sizeof(*thread));
2544                 RETURN(rc);
2545         }
2546         l_wait_event(thread->t_ctl_waitq,
2547                      thread_is_running(thread) || thread_is_stopped(thread),
2548                      &lwi);
2549
2550         rc = thread_is_stopped(thread) ? thread->t_id : 0;
2551         RETURN(rc);
2552 }
2553
2554
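/**
 * Set up the global reply handling (HR) service: allocate one
 * ptlrpc_hr_thread slot per online CPU, each with its own queue, lock and
 * waitq, and start the corresponding HR threads (see ptlrpc_hr_main() above).
 */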
2555 int ptlrpc_hr_init(void)
2556 {
2557         int i;
2558         int n_cpus = cfs_num_online_cpus();
2559         struct ptlrpc_hr_service *hr;
2560         int size;
2561         int rc;
2562         ENTRY;
2563
2564         LASSERT(ptlrpc_hr == NULL);
2565
2566         size = offsetof(struct ptlrpc_hr_service, hr_threads[n_cpus]);
2567         OBD_ALLOC(hr, size);
2568         if (hr == NULL)
2569                 RETURN(-ENOMEM);
2570         for (i = 0; i < n_cpus; i++) {
2571                 struct ptlrpc_hr_thread *t = &hr->hr_threads[i];
2572
2573                 cfs_spin_lock_init(&t->hrt_lock);
2574                 cfs_waitq_init(&t->hrt_wait);
2575                 CFS_INIT_LIST_HEAD(&t->hrt_queue);
2576                 cfs_init_completion(&t->hrt_completion);
2577         }
2578         hr->hr_n_threads = n_cpus;
2579         hr->hr_size = size;
2580         ptlrpc_hr = hr;
2581
2582         rc = ptlrpc_start_hr_threads(hr);
2583         if (rc) {
2584                 OBD_FREE(hr, hr->hr_size);
2585                 ptlrpc_hr = NULL;
2586         }
2587         RETURN(rc);
2588 }
2589
2590 void ptlrpc_hr_fini(void)
2591 {
2592         if (ptlrpc_hr != NULL) {
2593                 ptlrpc_stop_hr_threads(ptlrpc_hr);
2594                 OBD_FREE(ptlrpc_hr, ptlrpc_hr->hr_size);
2595                 ptlrpc_hr = NULL;
2596         }
2597 }
2598
2599 #endif /* __KERNEL__ */
2600
2601 /**
2602  * Wait until all already scheduled replies are processed.
2603  */
2604 static void ptlrpc_wait_replies(struct ptlrpc_service *svc)
2605 {
2606         while (1) {
2607                 int rc;
2608                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(10),
2609                                                      NULL, NULL);
2610                 rc = l_wait_event(svc->srv_waitq,
2611                          cfs_atomic_read(&svc->srv_n_difficult_replies) == 0,
2612                                   &lwi);
2613                 if (rc == 0)
2614                         break;
2615                 CWARN("Unexpectedly long timeout %p\n", svc);
2616         }
2617 }
2618
2619 int ptlrpc_unregister_service(struct ptlrpc_service *service)
2620 {
2621         int                   rc;
2622         struct l_wait_info    lwi;
2623         cfs_list_t           *tmp;
2624         struct ptlrpc_reply_state *rs, *t;
2625         struct ptlrpc_at_array *array = &service->srv_at_array;
2626         ENTRY;
2627
2628         service->srv_is_stopping = 1;
2629         cfs_timer_disarm(&service->srv_at_timer);
2630
2631         ptlrpc_stop_all_threads(service);
2632         LASSERT(cfs_list_empty(&service->srv_threads));
2633
2634         cfs_spin_lock (&ptlrpc_all_services_lock);
2635         cfs_list_del_init (&service->srv_list);
2636         cfs_spin_unlock (&ptlrpc_all_services_lock);
2637
2638         ptlrpc_lprocfs_unregister_service(service);
2639
2640         /* All history will be culled when the next request buffer is
2641          * freed */
2642         service->srv_max_history_rqbds = 0;
2643
2644         CDEBUG(D_NET, "%s: tearing down\n", service->srv_name);
2645
2646         rc = LNetClearLazyPortal(service->srv_req_portal);
2647         LASSERT (rc == 0);
2648
2649         /* Unlink all the request buffers.  This forces a 'final' event with
2650          * its 'unlink' flag set for each posted rqbd */
2651         cfs_list_for_each(tmp, &service->srv_active_rqbds) {
2652                 struct ptlrpc_request_buffer_desc *rqbd =
2653                         cfs_list_entry(tmp, struct ptlrpc_request_buffer_desc,
2654                                        rqbd_list);
2655
2656                 rc = LNetMDUnlink(rqbd->rqbd_md_h);
2657                 LASSERT (rc == 0 || rc == -ENOENT);
2658         }
2659
2660         /* Wait for the network to release any buffers it's currently
2661          * filling */
2662         for (;;) {
2663                 cfs_spin_lock(&service->srv_lock);
2664                 rc = service->srv_nrqbd_receiving;
2665                 cfs_spin_unlock(&service->srv_lock);
2666
2667                 if (rc == 0)
2668                         break;
2669
2670                 /* Network access will complete in finite time but the HUGE
2671                  * timeout lets us CWARN for visibility of sluggish NALs */
2672                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
2673                                            cfs_time_seconds(1), NULL, NULL);
2674                 rc = l_wait_event(service->srv_waitq,
2675                                   service->srv_nrqbd_receiving == 0,
2676                                   &lwi);
2677                 if (rc == -ETIMEDOUT)
2678                         CWARN("Service %s waiting for request buffers\n",
2679                               service->srv_name);
2680         }
2681
2682         /* schedule all outstanding replies to terminate them */
2683         cfs_spin_lock(&service->srv_rs_lock);
2684         while (!cfs_list_empty(&service->srv_active_replies)) {
2685                 struct ptlrpc_reply_state *rs =
2686                         cfs_list_entry(service->srv_active_replies.next,
2687                                        struct ptlrpc_reply_state, rs_list);
2688                 cfs_spin_lock(&rs->rs_lock);
2689                 ptlrpc_schedule_difficult_reply(rs);
2690                 cfs_spin_unlock(&rs->rs_lock);
2691         }
2692         cfs_spin_unlock(&service->srv_rs_lock);
2693
2694         /* purge the request queue.  NB No new replies (rqbds all unlinked)
2695          * and no service threads, so I'm the only thread noodling the
2696          * request queue now */
2697         while (!cfs_list_empty(&service->srv_req_in_queue)) {
2698                 struct ptlrpc_request *req =
2699                         cfs_list_entry(service->srv_req_in_queue.next,
2700                                        struct ptlrpc_request,
2701                                        rq_list);
2702
2703                 cfs_list_del(&req->rq_list);
2704                 service->srv_n_queued_reqs--;
2705                 service->srv_n_active_reqs++;
2706                 ptlrpc_server_finish_request(service, req);
2707         }
2708         while (ptlrpc_server_request_pending(service, 1)) {
2709                 struct ptlrpc_request *req;
2710
2711                 req = ptlrpc_server_request_get(service, 1);
2712                 cfs_list_del(&req->rq_list);
2713                 service->srv_n_active_reqs++;
2714                 ptlrpc_server_finish_request(service, req);
2715         }
2716         LASSERT(service->srv_n_queued_reqs == 0);
2717         LASSERT(service->srv_n_active_reqs == 0);
2718         LASSERT(service->srv_n_history_rqbds == 0);
2719         LASSERT(cfs_list_empty(&service->srv_active_rqbds));
2720
2721         /* Now free all the request buffers since nothing references them
2722          * any more... */
2723         while (!cfs_list_empty(&service->srv_idle_rqbds)) {
2724                 struct ptlrpc_request_buffer_desc *rqbd =
2725                         cfs_list_entry(service->srv_idle_rqbds.next,
2726                                        struct ptlrpc_request_buffer_desc,
2727                                        rqbd_list);
2728
2729                 ptlrpc_free_rqbd(rqbd);
2730         }
2731
2732         ptlrpc_wait_replies(service);
2733
2734         cfs_list_for_each_entry_safe(rs, t, &service->srv_free_rs_list,
2735                                      rs_list) {
2736                 cfs_list_del(&rs->rs_list);
2737                 OBD_FREE_LARGE(rs, service->srv_max_reply_size);
2738         }
2739
2740         /* In case somebody rearmed this in the meantime */
2741         cfs_timer_disarm(&service->srv_at_timer);
2742
2743         if (array->paa_reqs_array != NULL) {
2744                 OBD_FREE(array->paa_reqs_array,
2745                          sizeof(cfs_list_t) * array->paa_size);
2746                 array->paa_reqs_array = NULL;
2747         }
2748
2749         if (array->paa_reqs_count != NULL) {
2750                 OBD_FREE(array->paa_reqs_count,
2751                          sizeof(__u32) * array->paa_size);
2752                 array->paa_reqs_count = NULL;
2753         }
2754
2755         OBD_FREE_PTR(service);
2756         RETURN(0);
2757 }
2758
2759 /**
2760  * Returns 0 if the service is healthy.
2761  *
2762  * Right now, it just checks to make sure that requests aren't languishing
2763  * in the queue.  We'll use this health check to govern whether a node needs
2764  * to be shot, so it's intentionally non-aggressive. */
2765 int ptlrpc_service_health_check(struct ptlrpc_service *svc)
2766 {
2767         struct ptlrpc_request *request;
2768         struct timeval         right_now;
2769         long                   timediff;
2770
2771         if (svc == NULL)
2772                 return 0;
2773
2774         cfs_gettimeofday(&right_now);
2775
2776         cfs_spin_lock(&svc->srv_rq_lock);
2777         if (!ptlrpc_server_request_pending(svc, 1)) {
2778                 cfs_spin_unlock(&svc->srv_rq_lock);
2779                 return 0;
2780         }
2781
2782         /* How long has the next entry been waiting? */
2783         if (cfs_list_empty(&svc->srv_request_queue))
2784                 request = cfs_list_entry(svc->srv_request_hpq.next,
2785                                          struct ptlrpc_request, rq_list);
2786         else
2787                 request = cfs_list_entry(svc->srv_request_queue.next,
2788                                          struct ptlrpc_request, rq_list);
2789         timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
2790         cfs_spin_unlock(&svc->srv_rq_lock);
2791
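        /* The wait time of the oldest queued request decides health: with
         * adaptive timeouts disabled up to 1.5 * obd_timeout of queueing is
         * tolerated, otherwise up to at_max seconds. */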
2792         if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
2793                                         at_max)) {
2794                 CERROR("%s: unhealthy - request has been waiting %lds\n",
2795                        svc->srv_name, timediff / ONE_MILLION);
2796                 return (-1);
2797         }
2798
2799         return 0;
2800 }