Whamcloud - gitweb
* Added lonal (loopback NAL)
[fs/lustre-release.git] / lustre / ptlrpc / pinger.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Portal-RPC reconnection and replay operations, for use in recovery.
5  *
6  *  Copyright (c) 2003 Cluster File Systems, Inc.
7  *   Authors: Phil Schwan <phil@clusterfs.com>
8  *            Mike Shaver <shaver@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 #include <liblustre.h>
28 #else
29 #include <linux/version.h>
30 #include <asm/semaphore.h>
31 #define DEBUG_SUBSYSTEM S_RPC
32 #endif
33
34 #include <linux/obd_support.h>
35 #include <linux/obd_class.h>
36 #include "ptlrpc_internal.h"
37
/* Held in this file whenever pinger_imports is walked or modified. */
static DECLARE_MUTEX(pinger_sem);
/* List of imports registered with the pinger, linked through each import's
 * imp_pinger_chain member. */
static struct list_head pinger_imports = LIST_HEAD_INIT(pinger_imports);
40
41 int ptlrpc_ping(struct obd_import *imp) 
42 {
43         struct ptlrpc_request *req;
44         int rc = 0;
45         ENTRY;
46
47         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
48                               NULL);
49         if (req) {
50                 DEBUG_REQ(D_HA, req, "pinging %s->%s",
51                           imp->imp_obd->obd_uuid.uuid,
52                           imp->imp_target_uuid.uuid);
53                 req->rq_no_resend = req->rq_no_delay = 1;
54                 req->rq_replen = lustre_msg_size(0, 
55                                                  NULL);
56                 ptlrpcd_add_req(req);
57         } else {
58                 CERROR("OOM trying to ping %s->%s\n",
59                           imp->imp_obd->obd_uuid.uuid,
60                           imp->imp_target_uuid.uuid);
61                 rc = -ENOMEM;
62         }
63
64         RETURN(rc);
65 }
66
67 #ifdef __KERNEL__
68 static int ptlrpc_pinger_main(void *arg)
69 {
70         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
71         struct ptlrpc_thread *thread = data->thread;
72         unsigned long flags;
73         ENTRY;
74
75         lock_kernel();
76         ptlrpc_daemonize();
77
78         SIGNAL_MASK_LOCK(current, flags);
79         sigfillset(&current->blocked);
80         RECALC_SIGPENDING;
81         SIGNAL_MASK_UNLOCK(current, flags);
82
83         LASSERTF(strlen(data->name) < sizeof(current->comm),
84                  "name %d > len %d\n",
85                  (int)strlen(data->name), (int)sizeof(current->comm));
86         THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);
87         unlock_kernel();
88
89         /* Record that the thread is running */
90         thread->t_flags = SVC_RUNNING;
91         wake_up(&thread->t_ctl_waitq);
92
93         /* And now, loop forever, pinging as needed. */
94         while (1) {
95                 unsigned long this_ping = jiffies;
96                 long time_to_next_ping;
97                 struct l_wait_info lwi = LWI_TIMEOUT(obd_timeout * HZ,
98                                                      NULL, NULL);
99                 struct list_head *iter;
100
101                 down(&pinger_sem);
102                 list_for_each(iter, &pinger_imports) {
103                         struct obd_import *imp =
104                                 list_entry(iter, struct obd_import,
105                                            imp_pinger_chain);
106                         int force, level;
107                         unsigned long flags;
108
109
110                         spin_lock_irqsave(&imp->imp_lock, flags);
111                         level = imp->imp_state;
112                         force = imp->imp_force_verify;
113                         if (force)
114                                 imp->imp_force_verify = 0;
115                         spin_unlock_irqrestore(&imp->imp_lock, flags);
116
117                         if (imp->imp_next_ping <= this_ping || force) {
118                                 if (level == LUSTRE_IMP_DISCON && 
119                                     !imp->imp_deactive) {
120                                         /* wait at least a timeout before 
121                                            trying recovery again. */
122                                         imp->imp_next_ping = jiffies + 
123                                                 (obd_timeout * HZ);
124                                         ptlrpc_initiate_recovery(imp);
125                                 } 
126                                 else if (level != LUSTRE_IMP_FULL ||
127                                          imp->imp_obd->obd_no_recov) {
128                                         CDEBUG(D_HA, 
129                                                "not pinging %s (in recovery "
130                                                " or recovery disabled: %s)\n",
131                                                imp->imp_target_uuid.uuid,
132                                                ptlrpc_import_state_name(level));
133                                 } 
134                                 else if (imp->imp_pingable || force) {
135                                         ptlrpc_ping(imp);
136                                 }
137
138                         } else {
139                                 if (imp->imp_pingable)
140                                         CDEBUG(D_HA, "don't need to ping %s "
141                                                "(%lu > %lu)\n", 
142                                                imp->imp_target_uuid.uuid,
143                                                imp->imp_next_ping, this_ping);
144                         }
145                 }
146                 up(&pinger_sem);
147
148                 /* Wait until the next ping time, or until we're stopped. */
149                 time_to_next_ping = this_ping + (obd_timeout * HZ) - jiffies;
150                 CDEBUG(D_HA, "next ping in %lu (%lu)\n", time_to_next_ping,
151                        this_ping + (obd_timeout * HZ));
152                 if (time_to_next_ping > 0) {
153                         lwi = LWI_TIMEOUT(time_to_next_ping, NULL, NULL);
154                         l_wait_event(thread->t_ctl_waitq,
155                                      thread->t_flags & (SVC_STOPPING|SVC_EVENT),
156                                      &lwi);
157                         if (thread->t_flags & SVC_STOPPING) {
158                                 thread->t_flags &= ~SVC_STOPPING;
159                                 EXIT;
160                                 break;
161                         } else if (thread->t_flags & SVC_EVENT) {
162                                 /* woken after adding import to reset timer */
163                                 thread->t_flags &= ~SVC_EVENT;
164                         }
165                 }
166         }
167
168         thread->t_flags = SVC_STOPPED;
169         wake_up(&thread->t_ctl_waitq);
170
171         CDEBUG(D_NET, "pinger thread exiting, process %d\n", current->pid);
172         return 0;
173 }
174
/* Control block of the pinger thread.  Allocated by ptlrpc_start_pinger();
 * freed and reset to NULL by ptlrpc_stop_pinger().  Starts out NULL through
 * static zero-initialisation. */
static struct ptlrpc_thread *pinger_thread;
176
177 int ptlrpc_start_pinger(void)
178 {
179         struct l_wait_info lwi = { 0 };
180         struct ptlrpc_svc_data d;
181         int rc;
182 #ifndef ENABLE_PINGER
183         return 0;
184 #endif
185         ENTRY;
186
187         if (pinger_thread != NULL)
188                 RETURN(-EALREADY);
189
190         OBD_ALLOC(pinger_thread, sizeof(*pinger_thread));
191         if (pinger_thread == NULL)
192                 RETURN(-ENOMEM);
193         init_waitqueue_head(&pinger_thread->t_ctl_waitq);
194
195         d.name = "ll_ping";
196         d.thread = pinger_thread;
197
198         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
199          * just drop the VM and FILES in ptlrpc_daemonize() right away. */
200         rc = kernel_thread(ptlrpc_pinger_main, &d, CLONE_VM | CLONE_FILES);
201         if (rc < 0) {
202                 CERROR("cannot start thread: %d\n", rc);
203                 OBD_FREE(pinger_thread, sizeof(*pinger_thread));
204                 RETURN(rc);
205         }
206         l_wait_event(pinger_thread->t_ctl_waitq,
207                      pinger_thread->t_flags & SVC_RUNNING, &lwi);
208
209         RETURN(rc);
210 }
211
212 int ptlrpc_stop_pinger(void)
213 {
214         struct l_wait_info lwi = { 0 };
215         int rc = 0;
216 #ifndef ENABLE_PINGER
217         return 0;
218 #endif
219         ENTRY;
220
221         if (pinger_thread == NULL)
222                 RETURN(-EALREADY);
223         down(&pinger_sem);
224         pinger_thread->t_flags = SVC_STOPPING;
225         wake_up(&pinger_thread->t_ctl_waitq);
226         up(&pinger_sem);
227
228         l_wait_event(pinger_thread->t_ctl_waitq,
229                      (pinger_thread->t_flags & SVC_STOPPED), &lwi);
230
231         OBD_FREE(pinger_thread, sizeof(*pinger_thread));
232         pinger_thread = NULL;
233         RETURN(rc);
234 }
235
236 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
237 {
238         down(&pinger_sem);
239         imp->imp_next_ping = jiffies + (obd_timeout * HZ);
240         up(&pinger_sem);
241 }
242
243 int ptlrpc_pinger_add_import(struct obd_import *imp)
244 {
245         ENTRY;
246         if (!list_empty(&imp->imp_pinger_chain))
247                 RETURN(-EALREADY);
248
249         down(&pinger_sem);
250         CDEBUG(D_HA, "adding pingable import %s->%s\n",
251                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
252         imp->imp_next_ping = jiffies + (obd_timeout * HZ);
253         /* XXX sort, blah blah */
254         list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
255         class_import_get(imp);
256
257         ptlrpc_pinger_wake_up();
258         up(&pinger_sem);
259
260         RETURN(0);
261 }
262
263 int ptlrpc_pinger_del_import(struct obd_import *imp)
264 {
265         ENTRY;
266         if (list_empty(&imp->imp_pinger_chain))
267                 RETURN(-ENOENT);
268
269         down(&pinger_sem);
270         list_del_init(&imp->imp_pinger_chain);
271         CDEBUG(D_HA, "removing pingable import %s->%s\n",
272                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
273         class_import_put(imp);
274         up(&pinger_sem);
275         RETURN(0);
276 }
277
/* Wake the pinger thread so it re-evaluates its import list.
 *
 * Sets SVC_EVENT on the thread and wakes its control wait queue.  Does
 * nothing when ENABLE_PINGER is not defined or when no pinger thread
 * exists. */
void ptlrpc_pinger_wake_up()
{
#ifdef ENABLE_PINGER
        /* pinger_thread is NULL before a successful start and after a
         * stop; there is nothing to wake in that case. */
        if (pinger_thread == NULL)
                return;

        pinger_thread->t_flags |= SVC_EVENT;
        wake_up(&pinger_thread->t_ctl_waitq);
#endif
}
285
286 #else /* !__KERNEL__ */
287
288 /* XXX
289  * the current implementation of pinger in liblustre is not optimized
290  */
291
/* State of the liblustre pinger, kept across calls to pinger_check_rpcs(). */
static struct pinger_data {
        /* Incremented on entry to pinger_check_rpcs() and decremented on
         * exit; non-zero on entry means the call is a recursive one. */
        int             pd_recursion;
        /* time(NULL) value at which the round in progress started; reset to
         * 0 when a round finishes. */
        unsigned long   pd_this_ping;
        /* A new round is not started while this is later than the current
         * time. */
        unsigned long   pd_next_ping;
        /* Request set of the round in progress, NULL between rounds. */
        struct ptlrpc_request_set *pd_set;
} pinger_args;
298
299 static int pinger_check_rpcs(void *arg)
300 {
301         unsigned long curtime = time(NULL);
302         struct ptlrpc_request *req;
303         struct ptlrpc_request_set *set;
304         struct list_head *iter;
305         struct pinger_data *pd = &pinger_args;
306         int rc;
307
308         /* prevent recursion */
309         if (pd->pd_recursion++) {
310                 CDEBUG(D_HA, "pinger: recursion! quit\n");
311                 LASSERT(pd->pd_set);
312                 pd->pd_recursion--;
313                 return 0;
314         }
315
316         /* have we reached ping point? */
317         if (!pd->pd_set && pd->pd_next_ping > curtime) {
318                 pd->pd_recursion--;
319                 return 0;
320         }
321
322         /* if we have rpc_set already, continue processing it */
323         if (pd->pd_set) {
324                 LASSERT(pd->pd_this_ping);
325                 set = pd->pd_set;
326                 goto do_check_set;
327         }
328
329         pd->pd_this_ping = curtime;
330         pd->pd_set = ptlrpc_prep_set();
331         set = pd->pd_set;
332
333         /* add rpcs into set */
334         down(&pinger_sem);
335         list_for_each(iter, &pinger_imports) {
336                 struct obd_import *imp =
337                         list_entry(iter, struct obd_import,
338                                    imp_pinger_chain);
339                 int generation, level;
340                 unsigned long flags;
341
342                 if (imp->imp_next_ping <= pd->pd_this_ping) {
343                         /* Add a ping. */
344                         spin_lock_irqsave(&imp->imp_lock, flags);
345                         generation = imp->imp_generation;
346                         level = imp->imp_state;
347                         spin_unlock_irqrestore(&imp->imp_lock, flags);
348
349                         if (level != LUSTRE_IMP_FULL) {
350                                 CDEBUG(D_HA,
351                                        "not pinging %s (in recovery)\n",
352                                        imp->imp_target_uuid.uuid);
353                                 continue;
354                         }
355
356                         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
357                                               NULL);
358                         if (!req) {
359                                 CERROR("out of memory\n");
360                                 break;
361                         }
362                         req->rq_no_resend = 1;
363                         req->rq_replen = lustre_msg_size(0, NULL);
364                         req->rq_send_state = LUSTRE_IMP_FULL;
365                         req->rq_phase = RQ_PHASE_RPC;
366                         req->rq_import_generation = generation;
367                         ptlrpc_set_add_req(set, req);
368                 } else {
369                         CDEBUG(D_HA, "don't need to ping %s (%lu > "
370                                "%lu)\n", imp->imp_target_uuid.uuid,
371                                imp->imp_next_ping, pd->pd_this_ping);
372                 }
373         }
374         pd->pd_this_ping = curtime;
375         up(&pinger_sem);
376
377         /* Might be empty, that's OK. */
378         if (set->set_remaining == 0)
379                 CDEBUG(D_HA, "nothing to ping\n");
380
381         list_for_each(iter, &set->set_requests) {
382                 struct ptlrpc_request *req =
383                         list_entry(iter, struct ptlrpc_request,
384                                    rq_set_chain);
385                 DEBUG_REQ(D_HA, req, "pinging %s->%s",
386                           req->rq_import->imp_obd->obd_uuid.uuid,
387                           req->rq_import->imp_target_uuid.uuid);
388                 (void)ptl_send_rpc(req);
389         }
390
391 do_check_set:
392         rc = ptlrpc_check_set(set);
393
394         /* not finished, and we are not expired, simply return */
395         if (!rc && curtime < pd->pd_this_ping + obd_timeout) {
396                 CDEBUG(D_HA, "not finished, but also not expired\n");
397                 pd->pd_recursion--;
398                 return 0;
399         }
400
401         /* Expire all the requests that didn't come back. */
402         down(&pinger_sem);
403         list_for_each(iter, &set->set_requests) {
404                 req = list_entry(iter, struct ptlrpc_request,
405                                  rq_set_chain);
406
407                 if (req->rq_replied)
408                         continue;
409
410                 req->rq_phase = RQ_PHASE_COMPLETE;
411                 set->set_remaining--;
412                 /* If it was disconnected, don't sweat it. */
413                 if (list_empty(&req->rq_import->imp_pinger_chain)) {
414                         ptlrpc_unregister_reply(req);
415                         continue;
416                 }
417
418                 CDEBUG(D_HA, "pinger initiate expire_one_request\n");
419                 ptlrpc_expire_one_request(req);
420         }
421         up(&pinger_sem);
422
423         ptlrpc_set_destroy(set);
424         pd->pd_set = NULL;
425
426         pd->pd_next_ping = pd->pd_this_ping + obd_timeout;
427         pd->pd_this_ping = 0; /* XXX for debug */
428
429         CDEBUG(D_HA, "finished a round ping\n");
430         pd->pd_recursion--;
431         return 0;
432 }
433
/* Handle returned by liblustre_register_wait_callback() for
 * pinger_check_rpcs(); NULL (static zero-initialisation) until one is
 * registered. */
static void *pinger_callback;
435
436 int ptlrpc_start_pinger(void)
437 {
438         memset(&pinger_args, 0, sizeof(pinger_args));
439 #ifdef ENABLE_PINGER
440         pinger_callback =
441                 liblustre_register_wait_callback(&pinger_check_rpcs, &pinger_args);
442 #endif
443         return 0;
444 }
445
/* Deregister the liblustre pinger wait callback, if one is registered.
 * Does nothing when ENABLE_PINGER is not defined.  Always returns 0. */
int ptlrpc_stop_pinger(void)
{
#ifdef ENABLE_PINGER
        if (pinger_callback) {
                liblustre_deregister_wait_callback(pinger_callback);
                /* Forget the handle so a repeated stop does not
                 * deregister it a second time. */
                pinger_callback = NULL;
        }
#endif
        return 0;
}
454
455 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
456 {
457         down(&pinger_sem);
458         imp->imp_next_ping = time(NULL) + obd_timeout;
459         if (pinger_args.pd_set == NULL &&
460             pinger_args.pd_next_ping > imp->imp_next_ping) {
461                 CDEBUG(D_HA, "set next ping to %ld(cur %ld)\n",
462                         imp->imp_next_ping, time(NULL));
463                 pinger_args.pd_next_ping = imp->imp_next_ping;
464         }
465         up(&pinger_sem);
466 }
467
468 int ptlrpc_pinger_add_import(struct obd_import *imp)
469 {
470         ENTRY;
471         if (!list_empty(&imp->imp_pinger_chain))
472                 RETURN(-EALREADY);
473
474         CDEBUG(D_HA, "adding pingable import %s->%s\n",
475                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
476         ptlrpc_pinger_sending_on_import(imp);
477
478         down(&pinger_sem);
479         list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
480         class_import_get(imp);
481         up(&pinger_sem);
482
483         RETURN(0);
484 }
485
486 int ptlrpc_pinger_del_import(struct obd_import *imp)
487 {
488         ENTRY;
489         if (list_empty(&imp->imp_pinger_chain))
490                 RETURN(-ENOENT);
491
492         down(&pinger_sem);
493         list_del_init(&imp->imp_pinger_chain);
494         CDEBUG(D_HA, "removing pingable import %s->%s\n",
495                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
496         class_import_put(imp);
497         up(&pinger_sem);
498         RETURN(0);
499 }
500
/* liblustre counterpart of the kernel pinger wake-up.  Currently a
 * placeholder: the body is empty whether or not ENABLE_PINGER is defined. */
void ptlrpc_pinger_wake_up()
{
#ifdef ENABLE_PINGER
        /* XXX force pinger to run, if needed */
#endif
}
507 #endif /* !__KERNEL__ */