Whamcloud - gitweb
Land b1_2 onto HEAD (20040304_171022)
[fs/lustre-release.git] / lustre / ptlrpc / pinger.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Portal-RPC reconnection and replay operations, for use in recovery.
5  *
6  *  Copyright (c) 2003 Cluster File Systems, Inc.
7  *   Authors: Phil Schwan <phil@clusterfs.com>
8  *            Mike Shaver <shaver@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 #include <liblustre.h>
28 #else
29 #include <linux/version.h>
30 #include <asm/semaphore.h>
31 #define DEBUG_SUBSYSTEM S_RPC
32 #endif
33
34 #include <linux/obd_support.h>
35 #include <linux/obd_class.h>
36 #include "ptlrpc_internal.h"
37
/* Held while pinger_imports is walked or modified and while an import's
 * imp_next_ping is rescheduled by the functions in this file. */
static DECLARE_MUTEX(pinger_sem);
/* Imports registered with the pinger, linked through imp_pinger_chain. */
static struct list_head pinger_imports = LIST_HEAD_INIT(pinger_imports);
40
41 #ifdef __KERNEL__
/* Control block of the kernel pinger thread: allocated by
 * ptlrpc_start_pinger(), freed and reset to NULL by ptlrpc_stop_pinger(). */
static struct ptlrpc_thread *pinger_thread = NULL;
43
44 int ptlrpc_ping(struct obd_import *imp) 
45 {
46         struct ptlrpc_request *req;
47         int rc = 0;
48         ENTRY;
49
50         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
51                               NULL);
52         if (req) {
53                 DEBUG_REQ(D_HA, req, "pinging %s->%s",
54                           imp->imp_obd->obd_uuid.uuid,
55                           imp->imp_target_uuid.uuid);
56                 req->rq_no_resend = req->rq_no_delay = 1;
57                 req->rq_replen = lustre_msg_size(0, 
58                                                  NULL);
59                 ptlrpcd_add_req(req);
60         } else {
61                 CERROR("OOM trying to ping %s->%s\n",
62                           imp->imp_obd->obd_uuid.uuid,
63                           imp->imp_target_uuid.uuid);
64                 rc = -ENOMEM;
65         }
66
67         RETURN(rc);
68 }
69
70 static int ptlrpc_pinger_main(void *arg)
71 {
72         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
73         struct ptlrpc_thread *thread = data->thread;
74         unsigned long flags;
75         ENTRY;
76
77         lock_kernel();
78         ptlrpc_daemonize();
79
80         SIGNAL_MASK_LOCK(current, flags);
81         sigfillset(&current->blocked);
82         RECALC_SIGPENDING;
83         SIGNAL_MASK_UNLOCK(current, flags);
84
85         THREAD_NAME(current->comm, "%s", data->name);
86         unlock_kernel();
87
88         /* Record that the thread is running */
89         thread->t_flags = SVC_RUNNING;
90         wake_up(&thread->t_ctl_waitq);
91
92         /* And now, loop forever, pinging as needed. */
93         while (1) {
94                 unsigned long this_ping = jiffies;
95                 long time_to_next_ping;
96                 struct l_wait_info lwi = LWI_TIMEOUT(obd_timeout * HZ,
97                                                      NULL, NULL);
98                 struct list_head *iter;
99
100                 down(&pinger_sem);
101                 list_for_each(iter, &pinger_imports) {
102                         struct obd_import *imp =
103                                 list_entry(iter, struct obd_import,
104                                            imp_pinger_chain);
105                         int force, level;
106                         unsigned long flags;
107
108
109                         spin_lock_irqsave(&imp->imp_lock, flags);
110                         level = imp->imp_state;
111                         force = imp->imp_force_verify;
112                         if (force)
113                                 imp->imp_force_verify = 0;
114                         spin_unlock_irqrestore(&imp->imp_lock, flags);
115
116                         if (imp->imp_next_ping <= this_ping || force) {
117                                 if (level == LUSTRE_IMP_DISCON) {
118                                         /* wait at least a timeout before 
119                                            trying recovery again. */
120                                         imp->imp_next_ping = jiffies + 
121                                                 (obd_timeout * HZ);
122                                         ptlrpc_initiate_recovery(imp);
123                                 } 
124                                 else if (level != LUSTRE_IMP_FULL ||
125                                          imp->imp_obd->obd_no_recov) {
126                                         CDEBUG(D_HA, 
127                                                "not pinging %s (in recovery "
128                                                " or recovery disabled: %s)\n",
129                                                imp->imp_target_uuid.uuid,
130                                                ptlrpc_import_state_name(level));
131                                 } 
132                                 else if (imp->imp_pingable || force) {
133                                         ptlrpc_ping(imp);
134                                 }
135
136                         } else {
137                                 if (imp->imp_pingable)
138                                         CDEBUG(D_HA, "don't need to ping %s "
139                                                "(%lu > %lu)\n", 
140                                                imp->imp_target_uuid.uuid,
141                                                imp->imp_next_ping, this_ping);
142                         }
143                 }
144                 up(&pinger_sem);
145
146                 /* Wait until the next ping time, or until we're stopped. */
147                 time_to_next_ping = this_ping + (obd_timeout * HZ) - jiffies;
148                 CDEBUG(D_HA, "next ping in %lu (%lu)\n", time_to_next_ping,
149                        this_ping + (obd_timeout * HZ));
150                 if (time_to_next_ping > 0) {
151                         lwi = LWI_TIMEOUT(time_to_next_ping, NULL, NULL);
152                         l_wait_event(thread->t_ctl_waitq,
153                                      thread->t_flags & (SVC_STOPPING|SVC_EVENT),
154                                      &lwi);
155                         if (thread->t_flags & SVC_STOPPING) {
156                                 thread->t_flags &= ~SVC_STOPPING;
157                                 EXIT;
158                                 break;
159                         } else if (thread->t_flags & SVC_EVENT) {
160                                 /* woken after adding import to reset timer */
161                                 thread->t_flags &= ~SVC_EVENT;
162                         }
163                 }
164         }
165
166         thread->t_flags = SVC_STOPPED;
167         wake_up(&thread->t_ctl_waitq);
168
169         CDEBUG(D_NET, "pinger thread exiting, process %d\n", current->pid);
170         return 0;
171 }
172
173 int ptlrpc_start_pinger(void)
174 {
175         struct l_wait_info lwi = { 0 };
176         struct ptlrpc_svc_data d;
177         int rc;
178 #ifndef ENABLE_PINGER
179         return 0;
180 #endif
181         ENTRY;
182
183         if (pinger_thread != NULL)
184                 RETURN(-EALREADY);
185
186         OBD_ALLOC(pinger_thread, sizeof(*pinger_thread));
187         if (pinger_thread == NULL)
188                 RETURN(-ENOMEM);
189         init_waitqueue_head(&pinger_thread->t_ctl_waitq);
190
191         d.name = "ll_ping";
192         d.thread = pinger_thread;
193
194         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
195          * just drop the VM and FILES in ptlrpc_daemonize() right away. */
196         rc = kernel_thread(ptlrpc_pinger_main, &d, CLONE_VM | CLONE_FILES);
197         if (rc < 0) {
198                 CERROR("cannot start thread: %d\n", rc);
199                 OBD_FREE(pinger_thread, sizeof(*pinger_thread));
200                 RETURN(rc);
201         }
202         l_wait_event(pinger_thread->t_ctl_waitq,
203                      pinger_thread->t_flags & SVC_RUNNING, &lwi);
204
205         RETURN(rc);
206 }
207
208 int ptlrpc_stop_pinger(void)
209 {
210         struct l_wait_info lwi = { 0 };
211         int rc = 0;
212 #ifndef ENABLE_PINGER
213         return 0;
214 #endif
215         ENTRY;
216
217         if (pinger_thread == NULL)
218                 RETURN(-EALREADY);
219         down(&pinger_sem);
220         pinger_thread->t_flags = SVC_STOPPING;
221         wake_up(&pinger_thread->t_ctl_waitq);
222         up(&pinger_sem);
223
224         l_wait_event(pinger_thread->t_ctl_waitq,
225                      (pinger_thread->t_flags & SVC_STOPPED), &lwi);
226
227         OBD_FREE(pinger_thread, sizeof(*pinger_thread));
228         pinger_thread = NULL;
229         RETURN(rc);
230 }
231
232 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
233 {
234         down(&pinger_sem);
235         imp->imp_next_ping = jiffies + (obd_timeout * HZ);
236         up(&pinger_sem);
237 }
238
239 int ptlrpc_pinger_add_import(struct obd_import *imp)
240 {
241         ENTRY;
242         if (!list_empty(&imp->imp_pinger_chain))
243                 RETURN(-EALREADY);
244
245         down(&pinger_sem);
246         CDEBUG(D_HA, "adding pingable import %s->%s\n",
247                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
248         imp->imp_next_ping = jiffies + (obd_timeout * HZ);
249         /* XXX sort, blah blah */
250         list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
251         class_import_get(imp);
252
253         ptlrpc_pinger_wake_up();
254         up(&pinger_sem);
255
256         RETURN(0);
257 }
258
259 int ptlrpc_pinger_del_import(struct obd_import *imp)
260 {
261         ENTRY;
262         if (list_empty(&imp->imp_pinger_chain))
263                 RETURN(-ENOENT);
264
265         down(&pinger_sem);
266         list_del_init(&imp->imp_pinger_chain);
267         CDEBUG(D_HA, "removing pingable import %s->%s\n",
268                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
269         class_import_put(imp);
270         up(&pinger_sem);
271         RETURN(0);
272 }
273
/*
 * Raise SVC_EVENT on the pinger thread and wake it so that it
 * re-evaluates its timer.  Does nothing when ENABLE_PINGER is not defined
 * or when no pinger thread has been set up.
 */
void ptlrpc_pinger_wake_up(void)
{
#ifdef ENABLE_PINGER
        /* No control block exists until ptlrpc_start_pinger() succeeds. */
        if (pinger_thread == NULL)
                return;

        pinger_thread->t_flags |= SVC_EVENT;
        wake_up(&pinger_thread->t_ctl_waitq);
#endif
}
281
282 #else
283 /* XXX
284  * the current implementation of pinger in liblustre is not optimized
285  */
286
/* State of the liblustre pinger, kept across calls to pinger_check_rpcs(). */
static struct pinger_data {
        /* non-zero while pinger_check_rpcs() is executing; guards re-entry */
        int             pd_recursion;
        /* time(NULL) value at which the current round started; reset to 0
         * when a round finishes */
        unsigned long   pd_this_ping;
        /* time (in seconds) before which no new round is started */
        unsigned long   pd_next_ping;
        /* request set of the round in progress, NULL between rounds */
        struct ptlrpc_request_set *pd_set;
} pinger_args;
293
294 static int pinger_check_rpcs(void *arg)
295 {
296         unsigned long curtime = time(NULL);
297         struct ptlrpc_request *req;
298         struct ptlrpc_request_set *set;
299         struct list_head *iter;
300         struct pinger_data *pd = &pinger_args;
301         int rc;
302
303         /* prevent recursion */
304         if (pd->pd_recursion++) {
305                 CDEBUG(D_HA, "pinger: recursion! quit\n");
306                 LASSERT(pd->pd_set);
307                 pd->pd_recursion--;
308                 return 0;
309         }
310
311         /* have we reached ping point? */
312         if (!pd->pd_set && pd->pd_next_ping > curtime) {
313                 pd->pd_recursion--;
314                 return 0;
315         }
316
317         /* if we have rpc_set already, continue processing it */
318         if (pd->pd_set) {
319                 LASSERT(pd->pd_this_ping);
320                 set = pd->pd_set;
321                 goto do_check_set;
322         }
323
324         pd->pd_this_ping = curtime;
325         pd->pd_set = ptlrpc_prep_set();
326         set = pd->pd_set;
327
328         /* add rpcs into set */
329         down(&pinger_sem);
330         list_for_each(iter, &pinger_imports) {
331                 struct obd_import *imp =
332                         list_entry(iter, struct obd_import,
333                                    imp_pinger_chain);
334                 int generation, level;
335                 unsigned long flags;
336
337                 if (imp->imp_next_ping <= pd->pd_this_ping) {
338                         /* Add a ping. */
339                         spin_lock_irqsave(&imp->imp_lock, flags);
340                         generation = imp->imp_generation;
341                         level = imp->imp_state;
342                         spin_unlock_irqrestore(&imp->imp_lock, flags);
343
344                         if (level != LUSTRE_IMP_FULL) {
345                                 CDEBUG(D_HA,
346                                        "not pinging %s (in recovery)\n",
347                                        imp->imp_target_uuid.uuid);
348                                 continue;
349                         }
350
351                         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL,
352                                               NULL);
353                         if (!req) {
354                                 CERROR("out of memory\n");
355                                 break;
356                         }
357                         req->rq_no_resend = 1;
358                         req->rq_replen = lustre_msg_size(0, NULL);
359                         req->rq_send_state = LUSTRE_IMP_FULL;
360                         req->rq_phase = RQ_PHASE_RPC;
361                         req->rq_import_generation = generation;
362                         ptlrpc_set_add_req(set, req);
363                 } else {
364                         CDEBUG(D_HA, "don't need to ping %s (%lu > "
365                                "%lu)\n", imp->imp_target_uuid.uuid,
366                                imp->imp_next_ping, pd->pd_this_ping);
367                 }
368         }
369         pd->pd_this_ping = curtime;
370         up(&pinger_sem);
371
372         /* Might be empty, that's OK. */
373         if (set->set_remaining == 0)
374                 CDEBUG(D_HA, "nothing to ping\n");
375
376         list_for_each(iter, &set->set_requests) {
377                 struct ptlrpc_request *req =
378                         list_entry(iter, struct ptlrpc_request,
379                                    rq_set_chain);
380                 DEBUG_REQ(D_HA, req, "pinging %s->%s",
381                           req->rq_import->imp_obd->obd_uuid.uuid,
382                           req->rq_import->imp_target_uuid.uuid);
383                 (void)ptl_send_rpc(req);
384         }
385
386 do_check_set:
387         rc = ptlrpc_check_set(set);
388
389         /* not finished, and we are not expired, simply return */
390         if (!rc && curtime < pd->pd_this_ping + obd_timeout) {
391                 CDEBUG(D_HA, "not finished, but also not expired\n");
392                 pd->pd_recursion--;
393                 return 0;
394         }
395
396         /* Expire all the requests that didn't come back. */
397         down(&pinger_sem);
398         list_for_each(iter, &set->set_requests) {
399                 req = list_entry(iter, struct ptlrpc_request,
400                                  rq_set_chain);
401
402                 if (req->rq_replied)
403                         continue;
404
405                 req->rq_phase = RQ_PHASE_COMPLETE;
406                 set->set_remaining--;
407                 /* If it was disconnected, don't sweat it. */
408                 if (list_empty(&req->rq_import->imp_pinger_chain)) {
409                         ptlrpc_unregister_reply(req);
410                         continue;
411                 }
412
413                 CDEBUG(D_HA, "pinger initiate expire_one_request\n");
414                 ptlrpc_expire_one_request(req);
415         }
416         up(&pinger_sem);
417
418         ptlrpc_set_destroy(set);
419         pd->pd_set = NULL;
420
421         pd->pd_next_ping = pd->pd_this_ping + obd_timeout;
422         pd->pd_this_ping = 0; /* XXX for debug */
423
424         CDEBUG(D_HA, "finished a round ping\n");
425         pd->pd_recursion--;
426         return 0;
427 }
428
429 static void *pinger_callback = NULL;
430
431 int ptlrpc_start_pinger(void)
432 {
433         memset(&pinger_args, 0, sizeof(pinger_args));
434 #ifdef ENABLE_PINGER
435         pinger_callback =
436                 liblustre_register_wait_callback(&pinger_check_rpcs, &pinger_args);
437 #endif
438         return 0;
439 }
440
/*
 * Deregister the liblustre pinger callback, if one was registered.
 * Calling this more than once is harmless.
 *
 * Always returns 0.
 */
int ptlrpc_stop_pinger(void)
{
#ifdef ENABLE_PINGER
        if (pinger_callback != NULL) {
                liblustre_deregister_wait_callback(pinger_callback);
                /* Forget the handle so that a repeated stop does not
                 * deregister it a second time.
                 * NOTE(review): presumably the handle is invalid once
                 * deregistered - confirm in liblustre. */
                pinger_callback = NULL;
        }
#endif
        return 0;
}
449
450 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
451 {
452         down(&pinger_sem);
453         imp->imp_next_ping = time(NULL) + obd_timeout;
454         if (pinger_args.pd_set == NULL &&
455             pinger_args.pd_next_ping > imp->imp_next_ping) {
456                 CDEBUG(D_HA, "set next ping to %ld(cur %ld)\n",
457                         imp->imp_next_ping, time(NULL));
458                 pinger_args.pd_next_ping = imp->imp_next_ping;
459         }
460         up(&pinger_sem);
461 }
462
463 int ptlrpc_pinger_add_import(struct obd_import *imp)
464 {
465         ENTRY;
466         if (!list_empty(&imp->imp_pinger_chain))
467                 RETURN(-EALREADY);
468
469         CDEBUG(D_HA, "adding pingable import %s->%s\n",
470                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
471         ptlrpc_pinger_sending_on_import(imp);
472
473         down(&pinger_sem);
474         list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
475         class_import_get(imp);
476         up(&pinger_sem);
477
478         RETURN(0);
479 }
480
481 int ptlrpc_pinger_del_import(struct obd_import *imp)
482 {
483         ENTRY;
484         if (list_empty(&imp->imp_pinger_chain))
485                 RETURN(-ENOENT);
486
487         down(&pinger_sem);
488         list_del_init(&imp->imp_pinger_chain);
489         CDEBUG(D_HA, "removing pingable import %s->%s\n",
490                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
491         class_import_put(imp);
492         up(&pinger_sem);
493         RETURN(0);
494 }
495
/*
 * liblustre counterpart of the kernel wake-up hook.  It currently does
 * nothing: there is no pinger thread to wake in this build.
 */
void ptlrpc_pinger_wake_up(void)
{
#ifdef ENABLE_PINGER
        /* XXX force pinger to run, if needed */
#endif
}
502 #endif /* !__KERNEL__ */