/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ptlrpc/ptlrpcd.c
 */

/** \defgroup ptlrpcd PortalRPC daemon
 *
 * ptlrpcd is a special thread with its own request set, to which other
 * users may add requests when they do not want to wait for their
 * completion.
 * ptlrpcd takes care of sending such requests and then processing their
 * replies and calling completion callbacks as necessary.
 * The callbacks are called directly from ptlrpcd context.
 * It is important never to block significantly (especially on RPCs!)
 * within such a completion handler, or a deadlock may occur: ptlrpcd
 * enters a callback that sends another RPC and waits for it to return,
 * and during that time ptlrpcd is completely blocked.  For example, if
 * an import fails, recovery cannot progress, because connection requests
 * are also sent by ptlrpcd.
 *
 * @{
 */
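
/*
 * Illustrative usage sketch (added commentary, not part of the original
 * file): a caller that does not want to block typically assigns an
 * interpret callback to a prepared request and hands it to ptlrpcd.
 * "my_interpret" is a hypothetical name used only for this example, and
 * the reply hook is assumed to be the usual rq_interpret_reply field:
 *
 *      req->rq_interpret_reply = my_interpret;
 *      ptlrpcd_add_req(req, PSCOPE_OTHER);
 *
 * ptlrpcd_add_req() queues the request and returns at once.  Because
 * my_interpret() executes directly on the ptlrpcd thread, it must not
 * send further RPCs and wait on them, or ptlrpcd can deadlock as
 * described above.
 */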

#define DEBUG_SUBSYSTEM S_RPC

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else /* __KERNEL__ */
# include <liblustre.h>
# include <ctype.h>
#endif

#include <lustre_net.h>
#include <lustre_lib.h>

#include <lustre_ha.h>
#include <obd_class.h>   /* for obd_zombie */
#include <obd_support.h> /* for OBD_FAIL_CHECK */
#include <cl_object.h>   /* cl_env_{get,put}() */
#include <lprocfs_status.h>

enum pscope_thread {
        PT_NORMAL,
        PT_RECOVERY,
        PT_NR
};

struct ptlrpcd_scope_ctl {
        struct ptlrpcd_thread {
                const char        *pt_name;
                struct ptlrpcd_ctl pt_ctl;
        } pscope_thread[PT_NR];
};

static struct ptlrpcd_scope_ctl ptlrpcd_scopes[PSCOPE_NR] = {
        [PSCOPE_BRW] = {
                .pscope_thread = {
                        [PT_NORMAL] = {
                                .pt_name = "ptlrpcd-brw"
                        },
                        [PT_RECOVERY] = {
                                .pt_name = "ptlrpcd-brw-rcv"
                        }
                }
        },
        [PSCOPE_OTHER] = {
                .pscope_thread = {
                        [PT_NORMAL] = {
                                .pt_name = "ptlrpcd"
                        },
                        [PT_RECOVERY] = {
                                .pt_name = "ptlrpcd-rcv"
                        }
                }
        }
};

cfs_semaphore_t ptlrpcd_sem;
static int ptlrpcd_users = 0;

void ptlrpcd_wake(struct ptlrpc_request *req)
{
        struct ptlrpc_request_set *rq_set = req->rq_set;

        LASSERT(rq_set != NULL);

        cfs_waitq_signal(&rq_set->set_waitq);
}

/**
 * Move all requests from an existing request set to the ptlrpcd queue.
 * All requests in the set must be in phase RQ_PHASE_NEW.
 */
void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
{
        cfs_list_t *tmp, *pos;

        cfs_list_for_each_safe(pos, tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        cfs_list_entry(pos, struct ptlrpc_request,
                                       rq_set_chain);

                LASSERT(req->rq_phase == RQ_PHASE_NEW);
                cfs_list_del_init(&req->rq_set_chain);
                req->rq_set = NULL;
                ptlrpcd_add_req(req, PSCOPE_OTHER);
                cfs_atomic_dec(&set->set_remaining);
        }
        LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
}
EXPORT_SYMBOL(ptlrpcd_add_rqset);
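
/*
 * Illustrative usage sketch (added commentary, not part of the original
 * file): draining a private set into ptlrpcd instead of waiting on it,
 * assuming every request in the set is still in RQ_PHASE_NEW as required
 * above:
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      ...fill the set with new requests via ptlrpc_set_add_req()...
 *      ptlrpcd_add_rqset(set);
 *      ptlrpc_set_destroy(set);
 *
 * After the call the requests belong to the PSCOPE_OTHER ptlrpcd thread,
 * and the now-empty set can be destroyed by its owner.
 */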

/**
 * Requests that are added to the ptlrpcd queue are sent via
 * ptlrpcd_check->ptlrpc_check_set().
 */
int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
{
        struct ptlrpcd_ctl *pc;
        enum pscope_thread  pt;
        int rc;

        LASSERT(scope < PSCOPE_NR);

        cfs_spin_lock(&req->rq_lock);
        if (req->rq_invalid_rqset) {
                cfs_duration_t timeout;
                struct l_wait_info lwi;

                req->rq_invalid_rqset = 0;
                cfs_spin_unlock(&req->rq_lock);

                timeout = cfs_time_seconds(5);
                lwi = LWI_TIMEOUT(timeout, back_to_sleep, NULL);
                l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
        } else if (req->rq_set) {
                LASSERT(req->rq_phase == RQ_PHASE_NEW);
                LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);

                /* ptlrpc_check_set will decrease the count */
                cfs_atomic_inc(&req->rq_set->set_remaining);
                cfs_spin_unlock(&req->rq_lock);

                cfs_waitq_signal(&req->rq_set->set_waitq);
        } else {
                cfs_spin_unlock(&req->rq_lock);
        }

        pt = req->rq_send_state == LUSTRE_IMP_FULL ? PT_NORMAL : PT_RECOVERY;
        pc = &ptlrpcd_scopes[scope].pscope_thread[pt].pt_ctl;
        rc = ptlrpc_set_add_new_req(pc, req);
        /*
         * XXX disabled for CLIO: an environment is needed for the request
         *     interpreter, so the error path below is compiled out via
         *     "&& 0"; the assertion is temporary debugging to check rc.
         */
        LASSERTF(rc == 0, "ptlrpcd_add_req failed (rc = %d)\n", rc);
        if (rc && 0) {
                /*
                 * The thread is probably stopping now, so we need to
                 * kill this rpc as it was not added.  Call interpret
                 * for it, to let the higher levels know we are killing
                 * it so that they can free any associated resources.
                 */
                ptlrpc_req_interpret(NULL, req, -EBADR);
                req->rq_set = NULL;
                ptlrpc_req_finished(req);
        } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING) {
                /*
                 * The request is for recovery and should be sent ASAP.
                 */
                cfs_waitq_signal(&pc->pc_set->set_waitq);
        }

        return rc;
}
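
/*
 * Note (added commentary): ptlrpcd_add_req() above routes each request to
 * one of the static threads in ptlrpcd_scopes by scope and import state:
 * e.g. a PSCOPE_BRW request on a healthy import (LUSTRE_IMP_FULL) lands
 * on "ptlrpcd-brw", while the same request in any other state goes to
 * "ptlrpcd-brw-rcv", presumably so that recovery traffic is not queued
 * behind normal I/O.
 */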

/**
 * Check if there is more work to do on the ptlrpcd set.
 * Returns 1 if yes.
 */
static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
{
        cfs_list_t *tmp, *pos;
        struct ptlrpc_request *req;
        int rc = 0;
        ENTRY;

        cfs_spin_lock(&pc->pc_set->set_new_req_lock);
        cfs_list_for_each_safe(pos, tmp, &pc->pc_set->set_new_requests) {
                req = cfs_list_entry(pos, struct ptlrpc_request, rq_set_chain);
                cfs_list_del_init(&req->rq_set_chain);
                ptlrpc_set_add_req(pc->pc_set, req);
                /*
                 * Need to calculate its timeout.
                 */
                rc = 1;
        }
        cfs_spin_unlock(&pc->pc_set->set_new_req_lock);

        if (cfs_atomic_read(&pc->pc_set->set_remaining)) {
                rc = rc | ptlrpc_check_set(env, pc->pc_set);

                /*
                 * XXX: our set never completes, so we prune the completed
                 * reqs after each iteration. boy could this be smarter.
                 */
                cfs_list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) {
                        req = cfs_list_entry(pos, struct ptlrpc_request,
                                             rq_set_chain);
                        if (req->rq_phase != RQ_PHASE_COMPLETE)
                                continue;

                        cfs_list_del_init(&req->rq_set_chain);
                        req->rq_set = NULL;
                        ptlrpc_req_finished(req);
                }
        }

        if (rc == 0) {
                /*
                 * If new requests have been added, make sure to wake up.
                 */
                cfs_spin_lock(&pc->pc_set->set_new_req_lock);
                rc = !cfs_list_empty(&pc->pc_set->set_new_requests);
                cfs_spin_unlock(&pc->pc_set->set_new_req_lock);
        }

        RETURN(rc);
}
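
/*
 * Note (added commentary): the return value of ptlrpcd_check() doubles as
 * the wake-up condition for l_wait_event() in the main loop below, so any
 * non-zero result (new requests moved into the set, progress reported by
 * ptlrpc_check_set(), or a racing arrival on set_new_requests) keeps the
 * thread iterating instead of sleeping.
 */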

#ifdef __KERNEL__
/**
 * Main ptlrpcd thread.
 * ptlrpc's code paths like to execute in process context, so we have this
 * thread, which spins on a set that contains the rpcs and sends them.
 */
static int ptlrpcd(void *arg)
{
        struct ptlrpcd_ctl *pc = arg;
        struct lu_env env = { .le_ses = NULL };
        int rc, exit = 0;
        ENTRY;

        rc = cfs_daemonize_ctxt(pc->pc_name);
        if (rc == 0) {
                /*
                 * XXX So far only the "client" ptlrpcd uses an environment.
                 * In the future, the ptlrpcd thread (or a thread-set) has
                 * to be given an argument describing its "scope".
                 */
                rc = lu_context_init(&env.le_ctx,
                                     LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
        }

        cfs_complete(&pc->pc_starting);

        if (rc != 0)
                RETURN(rc);
        env.le_ctx.lc_cookie = 0x7;

        /*
         * This mainloop strongly resembles ptlrpc_set_wait() except that our
         * set never completes.  ptlrpcd_check() calls ptlrpc_check_set() when
         * there are requests in the set. New requests come in on the set's
         * new_req_list and ptlrpcd_check() moves them into the set.
         */
        do {
                struct l_wait_info lwi;
                int timeout;

                rc = lu_env_refill(&env);
                if (rc != 0) {
                        /*
                         * XXX This is a very awkward situation, because
                         * execution can neither continue (request
                         * interpreters assume that env is set up), nor repeat
                         * the loop (as this potentially results in a tight
                         * loop of -ENOMEM's).
                         *
                         * Fortunately, refill only ever does something when
                         * new modules are loaded, i.e., early during boot up.
                         */
                        CERROR("Failure to refill session: %d\n", rc);
                        continue;
                }

                timeout = ptlrpc_set_next_timeout(pc->pc_set);
                lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
                                  ptlrpc_expired_set, pc->pc_set);

                lu_context_enter(&env.le_ctx);
                l_wait_event(pc->pc_set->set_waitq,
                             ptlrpcd_check(&env, pc), &lwi);
                lu_context_exit(&env.le_ctx);

                /*
                 * Abort in-flight rpcs in the forced-stop case.
                 */
                if (cfs_test_bit(LIOD_STOP, &pc->pc_flags)) {
                        if (cfs_test_bit(LIOD_FORCE, &pc->pc_flags))
                                ptlrpc_abort_set(pc->pc_set);
                        exit++;
                }

                /*
                 * Make one more loop to ensure that ptlrpcd_check() copied
                 * all raced new rpcs into the set, so we can kill them.
                 */
        } while (exit < 2);

        /*
         * Wait for in-flight requests to drain.
         */
        if (!cfs_list_empty(&pc->pc_set->set_requests))
                ptlrpc_set_wait(pc->pc_set);
        lu_context_fini(&env.le_ctx);
        cfs_complete(&pc->pc_finishing);

        cfs_clear_bit(LIOD_START, &pc->pc_flags);
        cfs_clear_bit(LIOD_STOP, &pc->pc_flags);
        cfs_clear_bit(LIOD_FORCE, &pc->pc_flags);
        return 0;
}
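
/*
 * Note (added commentary): shutdown above is a two-pass handshake.  On
 * the first pass after LIOD_STOP is observed, ptlrpcd_check() absorbs any
 * new requests that raced in (and ptlrpc_abort_set() kills them when
 * LIOD_FORCE is also set); the second pass exits the loop, remaining
 * in-flight requests are drained with ptlrpc_set_wait(), and pc_finishing
 * is completed for ptlrpcd_stop().
 */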

#else /* !__KERNEL__ */

/**
 * In liblustre we do not have separate threads, so this function
 * is called from time to time all across the common code to see
 * if something needs to be processed on the ptlrpcd set.
 */
int ptlrpcd_check_async_rpcs(void *arg)
{
        struct ptlrpcd_ctl *pc = arg;
        int                 rc = 0;

        /*
         * Single threaded!!
         */
        pc->pc_recurred++;

        if (pc->pc_recurred == 1) {
                rc = lu_env_refill(&pc->pc_env);
                if (rc == 0) {
                        lu_context_enter(&pc->pc_env.le_ctx);
                        rc = ptlrpcd_check(&pc->pc_env, pc);
                        lu_context_exit(&pc->pc_env.le_ctx);
                        if (!rc)
                                ptlrpc_expired_set(pc->pc_set);
                        /*
                         * XXX: send replay requests.
                         */
                        if (cfs_test_bit(LIOD_RECOVERY, &pc->pc_flags))
                                rc = ptlrpcd_check(&pc->pc_env, pc);
                }
        }

        pc->pc_recurred--;
        return rc;
}

int ptlrpcd_idle(void *arg)
{
        struct ptlrpcd_ctl *pc = arg;

        return (cfs_list_empty(&pc->pc_set->set_new_requests) &&
                cfs_atomic_read(&pc->pc_set->set_remaining) == 0);
}

#endif

int ptlrpcd_start(const char *name, struct ptlrpcd_ctl *pc)
{
        int rc;
        ENTRY;

        /*
         * Do not allow starting a second thread for the same pc.
         */
        if (cfs_test_and_set_bit(LIOD_START, &pc->pc_flags)) {
                CERROR("Starting second thread (%s) for same pc %p\n",
                       name, pc);
                RETURN(-EALREADY);
        }

        cfs_init_completion(&pc->pc_starting);
        cfs_init_completion(&pc->pc_finishing);
        cfs_spin_lock_init(&pc->pc_lock);
        strncpy(pc->pc_name, name, sizeof(pc->pc_name) - 1);
        pc->pc_set = ptlrpc_prep_set();
        if (pc->pc_set == NULL)
                GOTO(out, rc = -ENOMEM);
        /*
         * So far only the "client" ptlrpcd uses an environment. In the
         * future, the ptlrpcd thread (or a thread-set) has to be given an
         * argument describing its "scope".
         */
        rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER);
        if (rc != 0) {
                ptlrpc_set_destroy(pc->pc_set);
                GOTO(out, rc);
        }

#ifdef __KERNEL__
        rc = cfs_create_thread(ptlrpcd, pc, 0);
        if (rc < 0) {
                lu_context_fini(&pc->pc_env.le_ctx);
                ptlrpc_set_destroy(pc->pc_set);
                GOTO(out, rc);
        }
        rc = 0;
        cfs_wait_for_completion(&pc->pc_starting);
#else
        pc->pc_wait_callback =
                liblustre_register_wait_callback("ptlrpcd_check_async_rpcs",
                                                 &ptlrpcd_check_async_rpcs, pc);
        pc->pc_idle_callback =
                liblustre_register_idle_callback("ptlrpcd_check_idle_rpcs",
                                                 &ptlrpcd_idle, pc);
#endif
out:
        if (rc)
                cfs_clear_bit(LIOD_START, &pc->pc_flags);
        RETURN(rc);
}
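
/*
 * Illustrative usage sketch (added commentary, not part of the original
 * file): starting and stopping a private ptlrpcd thread, where "my_pc"
 * is a hypothetical zero-initialized ptlrpcd_ctl:
 *
 *      static struct ptlrpcd_ctl my_pc;
 *
 *      if (ptlrpcd_start("my-ptlrpcd", &my_pc) == 0) {
 *              ...queue requests on my_pc.pc_set...
 *              ptlrpcd_stop(&my_pc, 0);
 *      }
 *
 * Passing force = 1 to ptlrpcd_stop() additionally sets LIOD_FORCE, so
 * the thread aborts in-flight rpcs instead of waiting for them to
 * complete.
 */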

void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
{
        if (!cfs_test_bit(LIOD_START, &pc->pc_flags)) {
                CERROR("Thread for pc %p was not started\n", pc);
                return;
        }

        cfs_set_bit(LIOD_STOP, &pc->pc_flags);
        if (force)
                cfs_set_bit(LIOD_FORCE, &pc->pc_flags);
        cfs_waitq_signal(&pc->pc_set->set_waitq);
#ifdef __KERNEL__
        cfs_wait_for_completion(&pc->pc_finishing);
#else
        liblustre_deregister_wait_callback(pc->pc_wait_callback);
        liblustre_deregister_idle_callback(pc->pc_idle_callback);
#endif
        lu_context_fini(&pc->pc_env.le_ctx);
        ptlrpc_set_destroy(pc->pc_set);
}

void ptlrpcd_fini(void)
{
        int i;
        int j;

        ENTRY;

        for (i = 0; i < PSCOPE_NR; ++i) {
                for (j = 0; j < PT_NR; ++j) {
                        struct ptlrpcd_ctl *pc;

                        pc = &ptlrpcd_scopes[i].pscope_thread[j].pt_ctl;

                        if (cfs_test_bit(LIOD_START, &pc->pc_flags))
                                ptlrpcd_stop(pc, 0);
                }
        }
        EXIT;
}

int ptlrpcd_addref(void)
{
        int rc = 0;
        int i;
        int j;
        ENTRY;

        cfs_mutex_down(&ptlrpcd_sem);
        if (++ptlrpcd_users == 1) {
                for (i = 0; rc == 0 && i < PSCOPE_NR; ++i) {
                        for (j = 0; rc == 0 && j < PT_NR; ++j) {
                                struct ptlrpcd_thread *pt;
                                struct ptlrpcd_ctl    *pc;

                                pt = &ptlrpcd_scopes[i].pscope_thread[j];
                                pc = &pt->pt_ctl;
                                if (j == PT_RECOVERY)
                                        cfs_set_bit(LIOD_RECOVERY,
                                                    &pc->pc_flags);
                                rc = ptlrpcd_start(pt->pt_name, pc);
                        }
                }
                if (rc != 0) {
                        --ptlrpcd_users;
                        ptlrpcd_fini();
                }
        }
        cfs_mutex_up(&ptlrpcd_sem);
        RETURN(rc);
}

void ptlrpcd_decref(void)
{
        cfs_mutex_down(&ptlrpcd_sem);
        if (--ptlrpcd_users == 0)
                ptlrpcd_fini();
        cfs_mutex_up(&ptlrpcd_sem);
}
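
/*
 * Illustrative usage sketch (added commentary, not part of the original
 * file): users bracket their lifetime with the reference counting above;
 * the first caller starts every scope/thread pair and the last one tears
 * them all down:
 *
 *      rc = ptlrpcd_addref();
 *      if (rc == 0) {
 *              ...use ptlrpcd_add_req() and friends...
 *              ptlrpcd_decref();
 *      }
 */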
/** @} ptlrpcd */