Whamcloud - gitweb
Branch b1_4_mountconf
[fs/lustre-release.git] / lustre / ptlrpc / pinger.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Portal-RPC reconnection and replay operations, for use in recovery.
5  *
6  *  Copyright (c) 2003 Cluster File Systems, Inc.
7  *   Authors: Phil Schwan <phil@clusterfs.com>
8  *            Mike Shaver <shaver@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef __KERNEL__
30 #include <liblustre.h>
31 #else
32 #include <linux/version.h>
33 #include <asm/semaphore.h>
34 #define DEBUG_SUBSYSTEM S_RPC
35 #endif
36
37 #include <linux/obd_support.h>
38 #include <linux/obd_class.h>
39 #include "ptlrpc_internal.h"
40
41 static DECLARE_MUTEX(pinger_sem);
42 static struct list_head pinger_imports = LIST_HEAD_INIT(pinger_imports);
43
44 int ptlrpc_ping(struct obd_import *imp)
45 {
46         struct ptlrpc_request *req;
47         int rc = 0;
48         ENTRY;
49
50         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
51                               NULL);
52         if (req) {
53                 DEBUG_REQ(D_INFO, req, "pinging %s->%s",
54                           imp->imp_obd->obd_uuid.uuid,
55                           imp->imp_target_uuid.uuid);
56                 req->rq_no_resend = req->rq_no_delay = 1;
57                 req->rq_replen = lustre_msg_size(0, NULL);
58                 ptlrpcd_add_req(req);
59         } else {
60                 CERROR("OOM trying to ping %s->%s\n",
61                        imp->imp_obd->obd_uuid.uuid,
62                        imp->imp_target_uuid.uuid);
63                 rc = -ENOMEM;
64         }
65
66         RETURN(rc);
67 }
68
69 static inline void ptlrpc_update_next_ping(struct obd_import *imp)
70 {
71         imp->imp_next_ping = jiffies + HZ *
72                 (imp->imp_state == LUSTRE_IMP_DISCON ? 10 : PING_INTERVAL);
73 }
74
75 void ptlrpc_ping_import_soon(struct obd_import *imp)
76 {
77         imp->imp_next_ping = jiffies;
78 }
79
80 #ifdef __KERNEL__
81 static int ptlrpc_pinger_main(void *arg)
82 {
83         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
84         struct ptlrpc_thread *thread = data->thread;
85         unsigned long flags;
86         ENTRY;
87
88         lock_kernel();
89         ptlrpc_daemonize();
90
91         SIGNAL_MASK_LOCK(current, flags);
92         sigfillset(&current->blocked);
93         RECALC_SIGPENDING;
94         SIGNAL_MASK_UNLOCK(current, flags);
95
96         LASSERTF(strlen(data->name) < sizeof(current->comm),
97                  "name %d > len %d\n",
98                  (int)strlen(data->name), (int)sizeof(current->comm));
99         THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);
100         unlock_kernel();
101
102         /* Record that the thread is running */
103         thread->t_flags = SVC_RUNNING;
104         wake_up(&thread->t_ctl_waitq);
105
106         /* And now, loop forever, pinging as needed. */
107         while (1) {
108                 unsigned long this_ping = jiffies;
109                 long time_to_next_ping;
110                 struct l_wait_info lwi = LWI_TIMEOUT(PING_INTERVAL * HZ,
111                                                      NULL, NULL);
112                 struct list_head *iter;
113
114                 down(&pinger_sem);
115                 list_for_each(iter, &pinger_imports) {
116                         struct obd_import *imp =
117                                 list_entry(iter, struct obd_import,
118                                            imp_pinger_chain);
119                         int force, level;
120                         unsigned long flags;
121
122
123                         spin_lock_irqsave(&imp->imp_lock, flags);
124                         level = imp->imp_state;
125                         force = imp->imp_force_verify;
126                         imp->imp_force_verify = 0;
127                         spin_unlock_irqrestore(&imp->imp_lock, flags);
128                         CDEBUG(level == LUSTRE_IMP_FULL ? D_INFO : D_HA,
129                                "level %s/%u force %u deactive %u pingable %u\n",
130                                ptlrpc_import_state_name(level), level,
131                                force, imp->imp_deactive, imp->imp_pingable);
132
133                         if (force ||
134                             /* if the next ping is within, say, 5 jiffies from
135                                now, go ahead and ping. See note below. */
136                             time_after_eq(this_ping, imp->imp_next_ping - 5)) {
137                                 if (level == LUSTRE_IMP_DISCON &&
138                                     !imp->imp_deactive) {
139                                         /* wait at least a timeout before
140                                            trying recovery again. */
141                                         imp->imp_next_ping = jiffies +
142                                                 obd_timeout * HZ;
143                                         ptlrpc_initiate_recovery(imp);
144                                 } else if (level != LUSTRE_IMP_FULL ||
145                                          imp->imp_obd->obd_no_recov ||
146                                          imp->imp_deactive) {
147                                         CDEBUG(D_HA, "not pinging %s "
148                                                "(in recovery: %s or recovery "
149                                                "disabled: %u/%u)\n",
150                                                imp->imp_target_uuid.uuid,
151                                                ptlrpc_import_state_name(level),
152                                                imp->imp_deactive,
153                                                imp->imp_obd->obd_no_recov);
154                                 } else if (imp->imp_pingable || force) {
155                                         ptlrpc_ping(imp);
156                                 }
157                         } else {
158                                 if (!imp->imp_pingable)
159                                         continue;
160                                 CDEBUG(D_INFO,
161                                        "don't need to ping %s (%lu > %lu)\n",
162                                        imp->imp_target_uuid.uuid,
163                                        imp->imp_next_ping, this_ping);
164                         }
165
166                         /* obd_timeout might have changed */
167                         if (time_after(imp->imp_next_ping,
168                                        this_ping + PING_INTERVAL * HZ))
169                                 ptlrpc_update_next_ping(imp);
170                 }
171                 up(&pinger_sem);
172
173                 /* Wait until the next ping time, or until we're stopped. */
174                 time_to_next_ping = this_ping + (PING_INTERVAL * HZ) - jiffies;
175                 /* The ping sent by ptlrpc_send_rpc may get sent out
176                    say .01 second after this.
177                    ptlrpc_pinger_sending_on_import will then set the
178                    next ping time to next_ping + .01 sec, which means
179                    we will SKIP the next ping at next_ping, and the
180                    ping will get sent 2 timeouts from now!  Beware. */
181                 CDEBUG(D_INFO, "next ping in %lu (%lu)\n", time_to_next_ping,
182                        this_ping + PING_INTERVAL * HZ);
183                 if (time_to_next_ping > 0) {
184                         lwi = LWI_TIMEOUT(max_t(long, time_to_next_ping, HZ),
185                                           NULL, NULL);
186                         l_wait_event(thread->t_ctl_waitq,
187                                      thread->t_flags & (SVC_STOPPING|SVC_EVENT),
188                                      &lwi);
189                         if (thread->t_flags & SVC_STOPPING) {
190                                 thread->t_flags &= ~SVC_STOPPING;
191                                 EXIT;
192                                 break;
193                         } else if (thread->t_flags & SVC_EVENT) {
194                                 /* woken after adding import to reset timer */
195                                 thread->t_flags &= ~SVC_EVENT;
196                         }
197                 }
198         }
199
200         thread->t_flags = SVC_STOPPED;
201         wake_up(&thread->t_ctl_waitq);
202
203         CDEBUG(D_NET, "pinger thread exiting, process %d\n", current->pid);
204         return 0;
205 }
206
207 static struct ptlrpc_thread *pinger_thread = NULL;
208
209 int ptlrpc_start_pinger(void)
210 {
211         struct l_wait_info lwi = { 0 };
212         struct ptlrpc_svc_data d;
213         int rc;
214 #ifndef ENABLE_PINGER
215         return 0;
216 #endif
217         ENTRY;
218
219         if (pinger_thread != NULL)
220                 RETURN(-EALREADY);
221
222         OBD_ALLOC(pinger_thread, sizeof(*pinger_thread));
223         if (pinger_thread == NULL)
224                 RETURN(-ENOMEM);
225         init_waitqueue_head(&pinger_thread->t_ctl_waitq);
226
227         d.name = "ll_ping";
228         d.thread = pinger_thread;
229
230         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
231          * just drop the VM and FILES in ptlrpc_daemonize() right away. */
232         rc = kernel_thread(ptlrpc_pinger_main, &d, CLONE_VM | CLONE_FILES);
233         if (rc < 0) {
234                 CERROR("cannot start thread: %d\n", rc);
235                 OBD_FREE(pinger_thread, sizeof(*pinger_thread));
236                 RETURN(rc);
237         }
238         l_wait_event(pinger_thread->t_ctl_waitq,
239                      pinger_thread->t_flags & SVC_RUNNING, &lwi);
240
241         RETURN(rc);
242 }
243
244 int ptlrpc_stop_pinger(void)
245 {
246         struct l_wait_info lwi = { 0 };
247         int rc = 0;
248 #ifndef ENABLE_PINGER
249         return 0;
250 #endif
251         ENTRY;
252
253         if (pinger_thread == NULL)
254                 RETURN(-EALREADY);
255         down(&pinger_sem);
256         pinger_thread->t_flags = SVC_STOPPING;
257         wake_up(&pinger_thread->t_ctl_waitq);
258         up(&pinger_sem);
259
260         l_wait_event(pinger_thread->t_ctl_waitq,
261                      (pinger_thread->t_flags & SVC_STOPPED), &lwi);
262
263         OBD_FREE(pinger_thread, sizeof(*pinger_thread));
264         pinger_thread = NULL;
265         RETURN(rc);
266 }
267
/*
 * Kernel variant: recompute the next-ping time of @imp via
 * ptlrpc_update_next_ping().  Takes no lock itself.
 */
void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
{
        ptlrpc_update_next_ping(imp);
}
272
273 int ptlrpc_pinger_add_import(struct obd_import *imp)
274 {
275         ENTRY;
276         if (!list_empty(&imp->imp_pinger_chain))
277                 RETURN(-EALREADY);
278
279         down(&pinger_sem);
280         CDEBUG(D_HA, "adding pingable import %s->%s\n",
281                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
282         ptlrpc_update_next_ping(imp);
283         /* XXX sort, blah blah */
284         list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
285         class_import_get(imp);
286
287         ptlrpc_pinger_wake_up();
288         up(&pinger_sem);
289
290         RETURN(0);
291 }
292
293 int ptlrpc_pinger_del_import(struct obd_import *imp)
294 {
295         ENTRY;
296         if (list_empty(&imp->imp_pinger_chain))
297                 RETURN(-ENOENT);
298
299         down(&pinger_sem);
300         list_del_init(&imp->imp_pinger_chain);
301         CDEBUG(D_HA, "removing pingable import %s->%s\n",
302                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
303         class_import_put(imp);
304         up(&pinger_sem);
305         RETURN(0);
306 }
307
/*
 * Kick the kernel pinger thread: set SVC_EVENT in its flags and wake its
 * control wait queue so it starts a new pass.  Compiles to nothing when
 * ENABLE_PINGER is not defined.
 *
 * NOTE(review): pinger_thread is dereferenced without a NULL check, so
 * this relies on the pinger having been started - confirm with callers.
 */
void ptlrpc_pinger_wake_up(void)
{
#ifdef ENABLE_PINGER
        pinger_thread->t_flags |= SVC_EVENT;
        wake_up(&pinger_thread->t_ctl_waitq);
#endif
}
315
316 #else /* !__KERNEL__ */
317
318 /* XXX
319  * the current implementation of pinger in liblustre is not optimized
320  */
321
/* Bookkeeping for the liblustre pinger callback (pinger_check_rpcs). */
struct pinger_data {
        /* non-zero while pinger_check_rpcs() is executing */
        int pd_recursion;
        /* jiffies at which the current ping round began */
        unsigned long pd_this_ping;
        /* jiffies at which the next ping round is due */
        unsigned long pd_next_ping;
        /* request set of the round in flight, NULL between rounds */
        struct ptlrpc_request_set *pd_set;
};

static struct pinger_data pinger_args;
328
329 static int pinger_check_rpcs(void *arg)
330 {
331         unsigned long curtime = jiffies;
332         struct ptlrpc_request *req;
333         struct ptlrpc_request_set *set;
334         struct list_head *iter;
335         struct pinger_data *pd = &pinger_args;
336         int rc;
337
338         /* prevent recursion */
339         if (pd->pd_recursion++) {
340                 CDEBUG(D_HA, "pinger: recursion! quit\n");
341                 LASSERT(pd->pd_set);
342                 pd->pd_recursion--;
343                 return 0;
344         }
345
346         /* have we reached ping point? */
347         if (!pd->pd_set && time_before(curtime, pd->pd_next_ping)) {
348                 pd->pd_recursion--;
349                 return 0;
350         }
351
352         /* if we have rpc_set already, continue processing it */
353         if (pd->pd_set) {
354                 LASSERT(pd->pd_this_ping);
355                 set = pd->pd_set;
356                 goto do_check_set;
357         }
358
359         pd->pd_this_ping = curtime;
360         pd->pd_set = ptlrpc_prep_set();
361         if (pd->pd_set == NULL)
362                 goto out;
363         set = pd->pd_set;
364
365         /* add rpcs into set */
366         down(&pinger_sem);
367         list_for_each(iter, &pinger_imports) {
368                 struct obd_import *imp =
369                         list_entry(iter, struct obd_import, imp_pinger_chain);
370                 int generation, level;
371                 unsigned long flags;
372
373                 if (time_after_eq(pd->pd_this_ping, imp->imp_next_ping - 5)) {
374                         /* Add a ping. */
375                         spin_lock_irqsave(&imp->imp_lock, flags);
376                         generation = imp->imp_generation;
377                         level = imp->imp_state;
378                         spin_unlock_irqrestore(&imp->imp_lock, flags);
379
380                         if (level != LUSTRE_IMP_FULL) {
381                                 CDEBUG(D_HA,
382                                        "not pinging %s (in recovery)\n",
383                                        imp->imp_target_uuid.uuid);
384                                 continue;
385                         }
386
387                         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
388                                               NULL);
389                         if (!req) {
390                                 CERROR("out of memory\n");
391                                 break;
392                         }
393                         req->rq_no_resend = 1;
394                         req->rq_replen = lustre_msg_size(0, NULL);
395                         req->rq_send_state = LUSTRE_IMP_FULL;
396                         req->rq_phase = RQ_PHASE_RPC;
397                         req->rq_import_generation = generation;
398                         ptlrpc_set_add_req(set, req);
399                 } else {
400                         CDEBUG(D_HA, "don't need to ping %s (%lu > "
401                                "%lu)\n", imp->imp_target_uuid.uuid,
402                                imp->imp_next_ping, pd->pd_this_ping);
403                 }
404         }
405         pd->pd_this_ping = curtime;
406         up(&pinger_sem);
407
408         /* Might be empty, that's OK. */
409         if (set->set_remaining == 0)
410                 CDEBUG(D_HA, "nothing to ping\n");
411
412         list_for_each(iter, &set->set_requests) {
413                 struct ptlrpc_request *req =
414                         list_entry(iter, struct ptlrpc_request,
415                                    rq_set_chain);
416                 DEBUG_REQ(D_HA, req, "pinging %s->%s",
417                           req->rq_import->imp_obd->obd_uuid.uuid,
418                           req->rq_import->imp_target_uuid.uuid);
419                 (void)ptl_send_rpc(req);
420         }
421
422 do_check_set:
423         rc = ptlrpc_check_set(set);
424
425         /* not finished, and we are not expired, simply return */
426         if (!rc && time_before(curtime, pd->pd_this_ping + PING_INTERVAL * HZ)) {
427                 CDEBUG(D_HA, "not finished, but also not expired\n");
428                 pd->pd_recursion--;
429                 return 0;
430         }
431
432         /* Expire all the requests that didn't come back. */
433         down(&pinger_sem);
434         list_for_each(iter, &set->set_requests) {
435                 req = list_entry(iter, struct ptlrpc_request,
436                                  rq_set_chain);
437
438                 if (req->rq_replied)
439                         continue;
440
441                 req->rq_phase = RQ_PHASE_COMPLETE;
442                 set->set_remaining--;
443                 /* If it was disconnected, don't sweat it. */
444                 if (list_empty(&req->rq_import->imp_pinger_chain)) {
445                         ptlrpc_unregister_reply(req);
446                         continue;
447                 }
448
449                 CDEBUG(D_HA, "pinger initiate expire_one_request\n");
450                 ptlrpc_expire_one_request(req);
451         }
452         up(&pinger_sem);
453
454         ptlrpc_set_destroy(set);
455         pd->pd_set = NULL;
456
457 out:
458         pd->pd_next_ping = pd->pd_this_ping + PING_INTERVAL * HZ;
459         pd->pd_this_ping = 0; /* XXX for debug */
460
461         CDEBUG(D_HA, "finished a round ping\n");
462         pd->pd_recursion--;
463         return 0;
464 }
465
466 static void *pinger_callback = NULL;
467
468 int ptlrpc_start_pinger(void)
469 {
470         memset(&pinger_args, 0, sizeof(pinger_args));
471 #ifdef ENABLE_PINGER
472         pinger_callback =
473                 liblustre_register_wait_callback(&pinger_check_rpcs, &pinger_args);
474 #endif
475         return 0;
476 }
477
/*
 * liblustre variant: deregister the pinger wait callback, if one was
 * registered.  Without ENABLE_PINGER this does nothing.  Always
 * returns 0.
 */
int ptlrpc_stop_pinger(void)
{
#ifdef ENABLE_PINGER
        if (pinger_callback) {
                liblustre_deregister_wait_callback(pinger_callback);
                /* Drop the handle so a repeated stop cannot deregister it
                 * again (presumably the handle is invalid after
                 * deregistration - verify against liblustre). */
                pinger_callback = NULL;
        }
#endif
        return 0;
}
486
487 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
488 {
489         down(&pinger_sem);
490         ptlrpc_update_next_ping(imp);
491         if (pinger_args.pd_set == NULL &&
492             time_before(imp->imp_next_ping, pinger_args.pd_next_ping)) {
493                 CDEBUG(D_HA, "set next ping to %ld(cur %ld)\n",
494                         imp->imp_next_ping, jiffies);
495                 pinger_args.pd_next_ping = imp->imp_next_ping;
496         }
497         up(&pinger_sem);
498 }
499
500 int ptlrpc_pinger_add_import(struct obd_import *imp)
501 {
502         ENTRY;
503         if (!list_empty(&imp->imp_pinger_chain))
504                 RETURN(-EALREADY);
505
506         CDEBUG(D_HA, "adding pingable import %s->%s\n",
507                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
508         ptlrpc_pinger_sending_on_import(imp);
509
510         down(&pinger_sem);
511         list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
512         class_import_get(imp);
513         up(&pinger_sem);
514
515         RETURN(0);
516 }
517
518 int ptlrpc_pinger_del_import(struct obd_import *imp)
519 {
520         ENTRY;
521         if (list_empty(&imp->imp_pinger_chain))
522                 RETURN(-ENOENT);
523
524         down(&pinger_sem);
525         list_del_init(&imp->imp_pinger_chain);
526         CDEBUG(D_HA, "removing pingable import %s->%s\n",
527                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
528         class_import_put(imp);
529         up(&pinger_sem);
530         RETURN(0);
531 }
532
/*
 * liblustre variant: placeholder with no effect; the body is empty
 * whether or not ENABLE_PINGER is defined.
 */
void ptlrpc_pinger_wake_up(void)
{
#ifdef ENABLE_PINGER
        /* XXX force pinger to run, if needed */
#endif
}
539 #endif /* !__KERNEL__ */