Whamcloud - gitweb
Quiet print format warning on ia64.
[fs/lustre-release.git] / lustre / ptlrpc / pinger.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Portal-RPC reconnection and replay operations, for use in recovery.
5  *
6  *  Copyright (c) 2003 Cluster File Systems, Inc.
7  *   Authors: Phil Schwan <phil@clusterfs.com>
8  *            Mike Shaver <shaver@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 #include <liblustre.h>
28 #else
29 #include <linux/version.h>
30 #include <asm/semaphore.h>
31 #define DEBUG_SUBSYSTEM S_RPC
32 #endif
33
34 #include <linux/obd_support.h>
35 #include <linux/obd_class.h>
36 #include "ptlrpc_internal.h"
37
38 static DECLARE_MUTEX(pinger_sem);
39 static struct list_head pinger_imports = LIST_HEAD_INIT(pinger_imports);
40
41 int ptlrpc_ping(struct obd_import *imp) 
42 {
43         struct ptlrpc_request *req;
44         int rc = 0;
45         ENTRY;
46
47         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
48                               NULL);
49         if (req) {
50                 DEBUG_REQ(D_HA, req, "pinging %s->%s",
51                           imp->imp_obd->obd_uuid.uuid,
52                           imp->imp_target_uuid.uuid);
53                 req->rq_no_resend = req->rq_no_delay = 1;
54                 req->rq_replen = lustre_msg_size(0, 
55                                                  NULL);
56                 ptlrpcd_add_req(req);
57         } else {
58                 CERROR("OOM trying to ping %s->%s\n",
59                           imp->imp_obd->obd_uuid.uuid,
60                           imp->imp_target_uuid.uuid);
61                 rc = -ENOMEM;
62         }
63
64         RETURN(rc);
65 }
66
67 #ifdef __KERNEL__
68 static int ptlrpc_pinger_main(void *arg)
69 {
70         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
71         struct ptlrpc_thread *thread = data->thread;
72         unsigned long flags;
73         ENTRY;
74
75         lock_kernel();
76         ptlrpc_daemonize();
77
78         SIGNAL_MASK_LOCK(current, flags);
79         sigfillset(&current->blocked);
80         RECALC_SIGPENDING;
81         SIGNAL_MASK_UNLOCK(current, flags);
82
83         LASSERTF(strlen(data->name) < sizeof(current->comm),
84                  "name %d > len %d\n",
85                  (int)strlen(data->name), (int)sizeof(current->comm));
86         THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);
87         unlock_kernel();
88
89         /* Record that the thread is running */
90         thread->t_flags = SVC_RUNNING;
91         wake_up(&thread->t_ctl_waitq);
92
93         /* And now, loop forever, pinging as needed. */
94         while (1) {
95                 unsigned long this_ping = jiffies;
96                 long time_to_next_ping;
97                 struct l_wait_info lwi = LWI_TIMEOUT(obd_timeout * HZ,
98                                                      NULL, NULL);
99                 struct list_head *iter;
100
101                 down(&pinger_sem);
102                 list_for_each(iter, &pinger_imports) {
103                         struct obd_import *imp =
104                                 list_entry(iter, struct obd_import,
105                                            imp_pinger_chain);
106                         int force, level;
107                         unsigned long flags;
108
109
110                         spin_lock_irqsave(&imp->imp_lock, flags);
111                         level = imp->imp_state;
112                         force = imp->imp_force_verify;
113                         if (force)
114                                 imp->imp_force_verify = 0;
115                         spin_unlock_irqrestore(&imp->imp_lock, flags);
116
117                         if (imp->imp_next_ping <= this_ping || force) {
118                                 if (level == LUSTRE_IMP_DISCON) {
119                                         /* wait at least a timeout before 
120                                            trying recovery again. */
121                                         imp->imp_next_ping = jiffies + 
122                                                 (obd_timeout * HZ);
123                                         ptlrpc_initiate_recovery(imp);
124                                 } 
125                                 else if (level != LUSTRE_IMP_FULL ||
126                                          imp->imp_obd->obd_no_recov) {
127                                         CDEBUG(D_HA, 
128                                                "not pinging %s (in recovery "
129                                                " or recovery disabled: %s)\n",
130                                                imp->imp_target_uuid.uuid,
131                                                ptlrpc_import_state_name(level));
132                                 } 
133                                 else if (imp->imp_pingable || force) {
134                                         ptlrpc_ping(imp);
135                                 }
136
137                         } else {
138                                 if (imp->imp_pingable)
139                                         CDEBUG(D_HA, "don't need to ping %s "
140                                                "(%lu > %lu)\n", 
141                                                imp->imp_target_uuid.uuid,
142                                                imp->imp_next_ping, this_ping);
143                         }
144                 }
145                 up(&pinger_sem);
146
147                 /* Wait until the next ping time, or until we're stopped. */
148                 time_to_next_ping = this_ping + (obd_timeout * HZ) - jiffies;
149                 CDEBUG(D_HA, "next ping in %lu (%lu)\n", time_to_next_ping,
150                        this_ping + (obd_timeout * HZ));
151                 if (time_to_next_ping > 0) {
152                         lwi = LWI_TIMEOUT(time_to_next_ping, NULL, NULL);
153                         l_wait_event(thread->t_ctl_waitq,
154                                      thread->t_flags & (SVC_STOPPING|SVC_EVENT),
155                                      &lwi);
156                         if (thread->t_flags & SVC_STOPPING) {
157                                 thread->t_flags &= ~SVC_STOPPING;
158                                 EXIT;
159                                 break;
160                         } else if (thread->t_flags & SVC_EVENT) {
161                                 /* woken after adding import to reset timer */
162                                 thread->t_flags &= ~SVC_EVENT;
163                         }
164                 }
165         }
166
167         thread->t_flags = SVC_STOPPED;
168         wake_up(&thread->t_ctl_waitq);
169
170         CDEBUG(D_NET, "pinger thread exiting, process %d\n", current->pid);
171         return 0;
172 }
173
174 static struct ptlrpc_thread *pinger_thread = NULL;
175
176 int ptlrpc_start_pinger(void)
177 {
178         struct l_wait_info lwi = { 0 };
179         struct ptlrpc_svc_data d;
180         int rc;
181 #ifndef ENABLE_PINGER
182         return 0;
183 #endif
184         ENTRY;
185
186         if (pinger_thread != NULL)
187                 RETURN(-EALREADY);
188
189         OBD_ALLOC(pinger_thread, sizeof(*pinger_thread));
190         if (pinger_thread == NULL)
191                 RETURN(-ENOMEM);
192         init_waitqueue_head(&pinger_thread->t_ctl_waitq);
193
194         d.name = "ll_ping";
195         d.thread = pinger_thread;
196
197         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
198          * just drop the VM and FILES in ptlrpc_daemonize() right away. */
199         rc = kernel_thread(ptlrpc_pinger_main, &d, CLONE_VM | CLONE_FILES);
200         if (rc < 0) {
201                 CERROR("cannot start thread: %d\n", rc);
202                 OBD_FREE(pinger_thread, sizeof(*pinger_thread));
203                 RETURN(rc);
204         }
205         l_wait_event(pinger_thread->t_ctl_waitq,
206                      pinger_thread->t_flags & SVC_RUNNING, &lwi);
207
208         RETURN(rc);
209 }
210
211 int ptlrpc_stop_pinger(void)
212 {
213         struct l_wait_info lwi = { 0 };
214         int rc = 0;
215 #ifndef ENABLE_PINGER
216         return 0;
217 #endif
218         ENTRY;
219
220         if (pinger_thread == NULL)
221                 RETURN(-EALREADY);
222         down(&pinger_sem);
223         pinger_thread->t_flags = SVC_STOPPING;
224         wake_up(&pinger_thread->t_ctl_waitq);
225         up(&pinger_sem);
226
227         l_wait_event(pinger_thread->t_ctl_waitq,
228                      (pinger_thread->t_flags & SVC_STOPPED), &lwi);
229
230         OBD_FREE(pinger_thread, sizeof(*pinger_thread));
231         pinger_thread = NULL;
232         RETURN(rc);
233 }
234
235 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
236 {
237         down(&pinger_sem);
238         imp->imp_next_ping = jiffies + (obd_timeout * HZ);
239         up(&pinger_sem);
240 }
241
242 int ptlrpc_pinger_add_import(struct obd_import *imp)
243 {
244         ENTRY;
245         if (!list_empty(&imp->imp_pinger_chain))
246                 RETURN(-EALREADY);
247
248         down(&pinger_sem);
249         CDEBUG(D_HA, "adding pingable import %s->%s\n",
250                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
251         imp->imp_next_ping = jiffies + (obd_timeout * HZ);
252         /* XXX sort, blah blah */
253         list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
254         class_import_get(imp);
255
256         ptlrpc_pinger_wake_up();
257         up(&pinger_sem);
258
259         RETURN(0);
260 }
261
262 int ptlrpc_pinger_del_import(struct obd_import *imp)
263 {
264         ENTRY;
265         if (list_empty(&imp->imp_pinger_chain))
266                 RETURN(-ENOENT);
267
268         down(&pinger_sem);
269         list_del_init(&imp->imp_pinger_chain);
270         CDEBUG(D_HA, "removing pingable import %s->%s\n",
271                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
272         class_import_put(imp);
273         up(&pinger_sem);
274         RETURN(0);
275 }
276
/*
 * Wake the pinger thread early by raising SVC_EVENT on its control block.
 *
 * Does nothing when built without ENABLE_PINGER, or when no pinger thread
 * exists (pinger_thread is NULL).
 */
void ptlrpc_pinger_wake_up(void)
{
#ifdef ENABLE_PINGER
        /* Nothing to wake if the thread was never started or has already
         * been stopped. */
        if (pinger_thread == NULL)
                return;

        pinger_thread->t_flags |= SVC_EVENT;
        wake_up(&pinger_thread->t_ctl_waitq);
#endif
}
284
285 #else /* !__KERNEL__ */
286
287 /* XXX
288  * the current implementation of pinger in liblustre is not optimized
289  */
290
/* State kept between invocations of the liblustre pinger callback. */
struct pinger_data {
        /* Non-zero while pinger_check_rpcs() is executing. */
        int                        pd_recursion;
        /* time() value at which the current round started; reset to 0
         * when a round finishes. */
        unsigned long              pd_this_ping;
        /* Earliest time() value at which a new round may start. */
        unsigned long              pd_next_ping;
        /* Request set of the round in progress, NULL between rounds. */
        struct ptlrpc_request_set *pd_set;
};

static struct pinger_data pinger_args;
297
298 static int pinger_check_rpcs(void *arg)
299 {
300         unsigned long curtime = time(NULL);
301         struct ptlrpc_request *req;
302         struct ptlrpc_request_set *set;
303         struct list_head *iter;
304         struct pinger_data *pd = &pinger_args;
305         int rc;
306
307         /* prevent recursion */
308         if (pd->pd_recursion++) {
309                 CDEBUG(D_HA, "pinger: recursion! quit\n");
310                 LASSERT(pd->pd_set);
311                 pd->pd_recursion--;
312                 return 0;
313         }
314
315         /* have we reached ping point? */
316         if (!pd->pd_set && pd->pd_next_ping > curtime) {
317                 pd->pd_recursion--;
318                 return 0;
319         }
320
321         /* if we have rpc_set already, continue processing it */
322         if (pd->pd_set) {
323                 LASSERT(pd->pd_this_ping);
324                 set = pd->pd_set;
325                 goto do_check_set;
326         }
327
328         pd->pd_this_ping = curtime;
329         pd->pd_set = ptlrpc_prep_set();
330         set = pd->pd_set;
331
332         /* add rpcs into set */
333         down(&pinger_sem);
334         list_for_each(iter, &pinger_imports) {
335                 struct obd_import *imp =
336                         list_entry(iter, struct obd_import,
337                                    imp_pinger_chain);
338                 int generation, level;
339                 unsigned long flags;
340
341                 if (imp->imp_next_ping <= pd->pd_this_ping) {
342                         /* Add a ping. */
343                         spin_lock_irqsave(&imp->imp_lock, flags);
344                         generation = imp->imp_generation;
345                         level = imp->imp_state;
346                         spin_unlock_irqrestore(&imp->imp_lock, flags);
347
348                         if (level != LUSTRE_IMP_FULL) {
349                                 CDEBUG(D_HA,
350                                        "not pinging %s (in recovery)\n",
351                                        imp->imp_target_uuid.uuid);
352                                 continue;
353                         }
354
355                         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
356                                               NULL);
357                         if (!req) {
358                                 CERROR("out of memory\n");
359                                 break;
360                         }
361                         req->rq_no_resend = 1;
362                         req->rq_replen = lustre_msg_size(0, NULL);
363                         req->rq_send_state = LUSTRE_IMP_FULL;
364                         req->rq_phase = RQ_PHASE_RPC;
365                         req->rq_import_generation = generation;
366                         ptlrpc_set_add_req(set, req);
367                 } else {
368                         CDEBUG(D_HA, "don't need to ping %s (%lu > "
369                                "%lu)\n", imp->imp_target_uuid.uuid,
370                                imp->imp_next_ping, pd->pd_this_ping);
371                 }
372         }
373         pd->pd_this_ping = curtime;
374         up(&pinger_sem);
375
376         /* Might be empty, that's OK. */
377         if (set->set_remaining == 0)
378                 CDEBUG(D_HA, "nothing to ping\n");
379
380         list_for_each(iter, &set->set_requests) {
381                 struct ptlrpc_request *req =
382                         list_entry(iter, struct ptlrpc_request,
383                                    rq_set_chain);
384                 DEBUG_REQ(D_HA, req, "pinging %s->%s",
385                           req->rq_import->imp_obd->obd_uuid.uuid,
386                           req->rq_import->imp_target_uuid.uuid);
387                 (void)ptl_send_rpc(req);
388         }
389
390 do_check_set:
391         rc = ptlrpc_check_set(set);
392
393         /* not finished, and we are not expired, simply return */
394         if (!rc && curtime < pd->pd_this_ping + obd_timeout) {
395                 CDEBUG(D_HA, "not finished, but also not expired\n");
396                 pd->pd_recursion--;
397                 return 0;
398         }
399
400         /* Expire all the requests that didn't come back. */
401         down(&pinger_sem);
402         list_for_each(iter, &set->set_requests) {
403                 req = list_entry(iter, struct ptlrpc_request,
404                                  rq_set_chain);
405
406                 if (req->rq_replied)
407                         continue;
408
409                 req->rq_phase = RQ_PHASE_COMPLETE;
410                 set->set_remaining--;
411                 /* If it was disconnected, don't sweat it. */
412                 if (list_empty(&req->rq_import->imp_pinger_chain)) {
413                         ptlrpc_unregister_reply(req);
414                         continue;
415                 }
416
417                 CDEBUG(D_HA, "pinger initiate expire_one_request\n");
418                 ptlrpc_expire_one_request(req);
419         }
420         up(&pinger_sem);
421
422         ptlrpc_set_destroy(set);
423         pd->pd_set = NULL;
424
425         pd->pd_next_ping = pd->pd_this_ping + obd_timeout;
426         pd->pd_this_ping = 0; /* XXX for debug */
427
428         CDEBUG(D_HA, "finished a round ping\n");
429         pd->pd_recursion--;
430         return 0;
431 }
432
433 static void *pinger_callback = NULL;
434
435 int ptlrpc_start_pinger(void)
436 {
437         memset(&pinger_args, 0, sizeof(pinger_args));
438 #ifdef ENABLE_PINGER
439         pinger_callback =
440                 liblustre_register_wait_callback(&pinger_check_rpcs, &pinger_args);
441 #endif
442         return 0;
443 }
444
/*
 * Deregister the liblustre pinger wait callback, if one is registered.
 *
 * Built without ENABLE_PINGER this does nothing.
 *
 * Always returns 0.
 */
int ptlrpc_stop_pinger(void)
{
#ifdef ENABLE_PINGER
        if (pinger_callback) {
                liblustre_deregister_wait_callback(pinger_callback);
                /* Forget the handle so that a second stop does not
                 * deregister the same callback again. */
                pinger_callback = NULL;
        }
#endif
        return 0;
}
453
454 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
455 {
456         down(&pinger_sem);
457         imp->imp_next_ping = time(NULL) + obd_timeout;
458         if (pinger_args.pd_set == NULL &&
459             pinger_args.pd_next_ping > imp->imp_next_ping) {
460                 CDEBUG(D_HA, "set next ping to %ld(cur %ld)\n",
461                         imp->imp_next_ping, time(NULL));
462                 pinger_args.pd_next_ping = imp->imp_next_ping;
463         }
464         up(&pinger_sem);
465 }
466
467 int ptlrpc_pinger_add_import(struct obd_import *imp)
468 {
469         ENTRY;
470         if (!list_empty(&imp->imp_pinger_chain))
471                 RETURN(-EALREADY);
472
473         CDEBUG(D_HA, "adding pingable import %s->%s\n",
474                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
475         ptlrpc_pinger_sending_on_import(imp);
476
477         down(&pinger_sem);
478         list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
479         class_import_get(imp);
480         up(&pinger_sem);
481
482         RETURN(0);
483 }
484
485 int ptlrpc_pinger_del_import(struct obd_import *imp)
486 {
487         ENTRY;
488         if (list_empty(&imp->imp_pinger_chain))
489                 RETURN(-ENOENT);
490
491         down(&pinger_sem);
492         list_del_init(&imp->imp_pinger_chain);
493         CDEBUG(D_HA, "removing pingable import %s->%s\n",
494                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
495         class_import_put(imp);
496         up(&pinger_sem);
497         RETURN(0);
498 }
499
/*
 * Placeholder for forcing the liblustre pinger to run; it currently does
 * nothing.  Defined with a (void) prototype so that calls passing
 * arguments are rejected by the compiler.
 */
void ptlrpc_pinger_wake_up(void)
{
#ifdef ENABLE_PINGER
        /* XXX force pinger to run, if needed */
#endif
}
506 #endif /* !__KERNEL__ */