1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Portal-RPC reconnection and replay operations, for use in recovery.
6 * Copyright (c) 2003 Cluster File Systems, Inc.
7 * Authors: Phil Schwan <phil@clusterfs.com>
8 * Mike Shaver <shaver@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #include <liblustre.h>
29 #include <linux/version.h>
30 #include <asm/semaphore.h>
31 #define DEBUG_SUBSYSTEM S_RPC
34 #include <linux/obd_support.h>
35 #include <linux/obd_class.h>
36 #include "ptlrpc_internal.h"
/* Pinger configuration and shared state: the ping-rate divisor, a mutex,
 * and the list head that pingable imports are chained onto through
 * imp_pinger_chain (see ptlrpc_pinger_add_import()).
 * NOTE(review): pinger_sem presumably serializes access to pinger_imports;
 * no lock/unlock call on it is visible in this listing -- confirm. */
38 #define PINGER_RATE 3 /* how many pings we'll do in obd_timeout period */
40 static DECLARE_MUTEX(pinger_sem);
41 static struct list_head pinger_imports = LIST_HEAD_INIT(pinger_imports);
/* Reply interpreter for ping requests; ptlrpc_ping() installs it as
 * rq_interpret_reply.  It logs the reply, reports at D_ERROR when the
 * import was not waiting for a ping reply or when the request xid differs
 * from imp_last_ping_xid, and then resets imp_last_ping_xid to 0.
 * NOTE(review): the remaining parameters and the return statement are not
 * visible in this listing. */
43 static int ptlrpc_ping_interpret(struct ptlrpc_request *req,
46 struct obd_import *imp = req->rq_import;
47 DEBUG_REQ(D_HA, req, "ping reply");
/* A reply arrived although no ping was recorded as outstanding. */
48 if (imp->imp_waiting_ping_reply == 0)
49 DEBUG_REQ(D_ERROR, req, "late ping reply?");
/* The reply does not belong to the most recently sent ping. */
50 if (imp->imp_last_ping_xid != req->rq_xid)
51 DEBUG_REQ(D_ERROR, req, "uh, wrong ping reply on x%lx",
52 imp->imp_last_ping_xid);
54 imp->imp_last_ping_xid = 0;
56 /* if ping reply is an error, don't drop "replied" flag
57 * on import, so pinger will invalidate it */
/* NOTE(review): the statement guarded by this test is not visible here;
 * going by the comment above it presumably leaves before the flag below
 * is cleared -- confirm. */
58 if (ptlrpc_client_replied(req) && req->rq_repmsg->type == PTL_RPC_MSG_ERR)
61 imp->imp_waiting_ping_reply = 0;
/* Prepare an OBD_PING request on import @imp and record on the import that
 * a ping reply is now awaited.
 * NOTE(review): the NULL check on req, the call that actually queues the
 * request and the return value are not visible in this listing. */
65 int ptlrpc_ping(struct obd_import *imp)
67 struct ptlrpc_request *req;
71 req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 0, NULL, NULL);
73 DEBUG_REQ(D_HA, req, "pinging %s->%s",
74 imp->imp_obd->obd_uuid.uuid,
75 imp->imp_target_uuid.uuid);
/* Both the no-resend and the no-delay flags are set on the request. */
76 req->rq_no_resend = req->rq_no_delay = 1;
77 req->rq_replen = lustre_msg_size(0, NULL);
/* ptlrpc_ping_interpret() is registered to process the reply. */
78 req->rq_interpret_reply = ptlrpc_ping_interpret;
/* The request timeout is one pinger interval, not the full obd_timeout. */
79 req->rq_timeout = obd_timeout / PINGER_RATE;
/* Remember the outstanding ping and its xid so the interpreter and
 * ptlrpc_pinger_process_import() can detect late or missing replies. */
80 imp->imp_waiting_ping_reply = 1;
81 imp->imp_last_ping_xid = req->rq_xid;
/* Error message for the case where no request could be prepared. */
84 CERROR("OOM trying to ping %s->%s\n",
85 imp->imp_obd->obd_uuid.uuid,
86 imp->imp_target_uuid.uuid);
/* Return the jiffies value at which the next ping is due: now plus
 * (obd_timeout / PINGER_RATE) seconds scaled by HZ.  The division is
 * evaluated before the multiplication.  @imp is not used by the visible
 * expression.
 * NOTE(review): the return type is int although the value is derived from
 * jiffies and callers store it in imp_next_ping (printed with %lu in
 * ptlrpc_pinger_main()); this may truncate on 64-bit -- confirm. */
94 static inline int ptlrpc_next_ping(struct obd_import *imp)
96 return jiffies + (obd_timeout / PINGER_RATE * HZ);
/* Return the jiffies value before which recovery should not be retried:
 * now plus half of obd_timeout when imp_server_timeout is set on @imp,
 * otherwise now plus the full obd_timeout (both scaled by HZ).
 * NOTE(review): like ptlrpc_next_ping(), this returns int for a
 * jiffies-based value -- confirm the intended width. */
99 static inline int ptlrpc_next_reconnect(struct obd_import *imp)
101 if (imp->imp_server_timeout)
102 return jiffies + (obd_timeout / 2 * HZ);
104 return jiffies + (obd_timeout * HZ);
/* suspend_timeouts counts outstanding ptlrpc_deactivate_timeouts() calls;
 * suspend_timeouts_waitq is woken by ptlrpc_activate_timeouts() when the
 * count drops back to zero.  The wait queue is initialized in the kernel
 * ptlrpc_start_pinger(). */
107 static atomic_t suspend_timeouts = ATOMIC_INIT(0);
108 static wait_queue_head_t suspend_timeouts_waitq;
/* Suspend timeouts: log at D_HA and increment the suspend_timeouts counter.
 * Each call is undone by one ptlrpc_activate_timeouts() call. */
110 void ptlrpc_deactivate_timeouts(void)
112 CDEBUG(D_HA, "deactivate timeouts\n");
113 atomic_inc(&suspend_timeouts);
/* Undo one ptlrpc_deactivate_timeouts(): asserts that the counter is
 * positive, decrements it, and wakes everyone sleeping on
 * suspend_timeouts_waitq once the counter reaches zero. */
116 void ptlrpc_activate_timeouts(void)
118 CDEBUG(D_HA, "activate timeouts\n");
119 LASSERT(atomic_read(&suspend_timeouts) > 0);
/* Only the call that brings the count to zero wakes the waiters. */
120 if (atomic_dec_and_test(&suspend_timeouts))
121 wake_up(&suspend_timeouts_waitq);
/* Test whether timeouts are currently suspended, i.e. whether the
 * suspend_timeouts counter is non-zero.
 * NOTE(review): the values returned for either case are not visible in
 * this listing. */
124 int ptlrpc_check_suspend(void)
126 if (atomic_read(&suspend_timeouts))
/* If timeouts are suspended, sleep on suspend_timeouts_waitq until the
 * suspend_timeouts counter returns to zero, logging against @req before
 * and after the wait.  @req is used only for the debug messages.
 * NOTE(review): the return values are not visible in this listing. */
131 int ptlrpc_check_and_wait_suspend(struct ptlrpc_request *req)
133 struct l_wait_info lwi;
135 if (atomic_read(&suspend_timeouts)) {
136 DEBUG_REQ(D_NET, req, "-- suspend %d regular timeout",
137 atomic_read(&suspend_timeouts));
/* Wait descriptor built with LWI_INTR and no callbacks. */
138 lwi = LWI_INTR(NULL, NULL);
139 l_wait_event(suspend_timeouts_waitq,
140 atomic_read(&suspend_timeouts) == 0, &lwi);
141 DEBUG_REQ(D_NET, req, "-- recharge regular timeout");
/* Decide what the pinger should do for one import during the pass that
 * started at jiffies value @this_ping:
 *   - disconnected and not deactivated: schedule the next attempt and
 *     start recovery;
 *   - not in LUSTRE_IMP_FULL state, or recovery disabled on the obd:
 *     only log;
 *   - FULL, still waiting for a ping reply and pingable: fail the import;
 *   - otherwise, if pingable or a verify was forced: schedule the next
 *     ping.
 * NOTE(review): local declarations and the statements of some branches
 * (e.g. the early exit and the actual ping call) are not visible in this
 * listing. */
147 static void ptlrpc_pinger_process_import(struct obd_import *imp,
148 unsigned long this_ping)
/* Snapshot state and the force flag under imp_lock and consume the
 * force-verify request. */
153 spin_lock_irqsave(&imp->imp_lock, flags);
154 level = imp->imp_state;
155 force = imp->imp_force_verify;
157 imp->imp_force_verify = 0;
158 spin_unlock_irqrestore(&imp->imp_lock, flags);
/* Not yet due and nothing forced.
 * NOTE(review): plain '>' on jiffies values is not wrap-safe -- confirm. */
160 if (imp->imp_next_ping > this_ping && force == 0)
163 if (level == LUSTRE_IMP_DISCON && !imp->imp_deactive) {
164 /* wait at least a timeout before trying recovery again */
165 imp->imp_next_ping = ptlrpc_next_reconnect(imp);
166 ptlrpc_initiate_recovery(imp);
167 } else if (level != LUSTRE_IMP_FULL || imp->imp_obd->obd_no_recov) {
168 CDEBUG(D_HA, "not pinging %s (in recovery "
169 " or recovery disabled: %s)\n",
170 imp->imp_target_uuid.uuid,
171 ptlrpc_import_state_name(level));
/* The previous ping is still unanswered (imp_waiting_ping_reply was set by
 * ptlrpc_ping()): report it at both D_HA and D_ERROR and fail the import. */
172 } else if (level == LUSTRE_IMP_FULL && imp->imp_waiting_ping_reply &&
173 imp->imp_next_ping >= this_ping && imp->imp_pingable) {
/* NOTE(review): the xid is printed with %lu here but with %lx in
 * ptlrpc_ping_interpret() -- confirm which is intended. */
174 CDEBUG(D_HA, "%s: %s hasn't respond on ping x%lu\n",
175 imp->imp_obd->obd_uuid.uuid,
176 imp->imp_target_uuid.uuid, imp->imp_last_ping_xid);
177 CDEBUG(D_ERROR, "%s: %s hasn't respond on ping x%lu\n",
178 imp->imp_obd->obd_uuid.uuid,
179 imp->imp_target_uuid.uuid, imp->imp_last_ping_xid);
180 ptlrpc_fail_import(imp, 0);
181 } else if (imp->imp_pingable || force) {
/* Schedule the following ping one pinger interval from now. */
182 imp->imp_next_ping = ptlrpc_next_ping(imp);
/* Body of the pinger kernel thread.  @arg is a struct ptlrpc_svc_data that
 * supplies the thread descriptor and the thread name.  After setting up
 * signals and the thread name it marks itself SVC_RUNNING, then repeatedly
 * walks pinger_imports, processes each import, and sleeps until the
 * earliest pending ping or until it is woken with SVC_STOPPING or
 * SVC_EVENT.  On exit it sets SVC_STOPPED and wakes the control queue.
 * NOTE(review): loop delimiters, locking and the return statement are not
 * visible in this listing. */
187 static int ptlrpc_pinger_main(void *arg)
189 struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
190 struct ptlrpc_thread *thread = data->thread;
/* Fill the thread's blocked-signal set while holding the signal mask lock.
 * NOTE(review): the sigfillset() argument looks like a mis-encoded
 * "&current->blocked" (an "&curren" HTML entity collapsed into one
 * character) -- confirm against the upstream file. */
197 SIGNAL_MASK_LOCK(current, flags);
198 sigfillset(¤t->blocked);
200 SIGNAL_MASK_UNLOCK(current, flags);
/* The requested name must fit into current->comm. */
202 LASSERTF(strlen(data->name) < sizeof(current->comm),
203 "name %d > len %d\n",
204 (int)strlen(data->name), (int)sizeof(current->comm));
205 THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);
208 /* Record that the thread is running */
209 thread->t_flags = SVC_RUNNING;
210 wake_up(&thread->t_ctl_waitq);
212 /* And now, loop forever, pinging as needed. */
214 unsigned long this_ping = jiffies;
215 long time_to_next_ping;
216 struct l_wait_info lwi = LWI_TIMEOUT(obd_timeout * HZ,
218 struct list_head *iter;
/* Start from an upper bound of one full obd_timeout after this_ping. */
220 time_to_next_ping = this_ping + (obd_timeout * HZ) - jiffies;
222 list_for_each(iter, &pinger_imports) {
223 struct obd_import *imp =
224 list_entry(iter, struct obd_import,
227 ptlrpc_pinger_process_import(imp, this_ping);
229 CDEBUG(D_OTHER, "%s: pingable %d, next_ping %lu(%lu)\n",
230 imp->imp_target_uuid.uuid,
231 imp->imp_pingable, imp->imp_next_ping, jiffies);
/* Shorten the sleep to the nearest imp_next_ping that is set, still in the
 * future, and belongs to a pingable import. */
233 if (imp->imp_pingable && imp->imp_next_ping &&
234 imp->imp_next_ping - jiffies < time_to_next_ping &&
235 imp->imp_next_ping > jiffies)
236 time_to_next_ping = imp->imp_next_ping - jiffies;
240 /* Wait until the next ping time, or until we're stopped. */
/* NOTE(review): time_to_next_ping is declared long but printed with %lu. */
241 CDEBUG(D_HA, "next ping in %lu (%lu)\n", time_to_next_ping,
242 this_ping + (obd_timeout * HZ));
/* A non-positive interval skips the sleep entirely. */
243 if (time_to_next_ping > 0) {
244 lwi = LWI_TIMEOUT(time_to_next_ping, NULL, NULL);
245 l_wait_event(thread->t_ctl_waitq,
246 thread->t_flags & (SVC_STOPPING|SVC_EVENT),
/* SVC_STOPPING is set by ptlrpc_stop_pinger(). */
248 if (thread->t_flags & SVC_STOPPING) {
249 thread->t_flags &= ~SVC_STOPPING;
252 } else if (thread->t_flags & SVC_EVENT) {
253 /* woken after adding import to reset timer */
254 thread->t_flags &= ~SVC_EVENT;
/* Tell ptlrpc_stop_pinger(), which waits for SVC_STOPPED, that we are done. */
259 thread->t_flags = SVC_STOPPED;
260 wake_up(&thread->t_ctl_waitq);
262 CDEBUG(D_NET, "pinger thread exiting, process %d\n", current->pid);
/* Descriptor of the single pinger thread; NULL while no pinger exists. */
266 static struct ptlrpc_thread *pinger_thread = NULL;
/* Start the pinger kernel thread: allocate its descriptor, initialize the
 * control and suspend wait queues, spawn ptlrpc_pinger_main() and wait
 * until the thread reports SVC_RUNNING.
 * NOTE(review): the code under "#ifndef ENABLE_PINGER", the early-exit
 * statements, the remaining initialization of d and the return values are
 * not visible in this listing. */
268 int ptlrpc_start_pinger(void)
270 struct l_wait_info lwi = { 0 };
271 struct ptlrpc_svc_data d;
273 #ifndef ENABLE_PINGER
/* obd_timeout / PINGER_RATE is used as the ping interval and request
 * timeout, so obd_timeout must exceed PINGER_RATE. */
278 LASSERT(obd_timeout > PINGER_RATE);
/* A descriptor already exists, i.e. a pinger was started before. */
280 if (pinger_thread != NULL)
283 OBD_ALLOC(pinger_thread, sizeof(*pinger_thread));
284 if (pinger_thread == NULL)
286 init_waitqueue_head(&pinger_thread->t_ctl_waitq);
287 init_waitqueue_head(&suspend_timeouts_waitq);
/* d lives on this stack frame; the new thread reads it before setting
 * SVC_RUNNING, which is what this function waits for below. */
290 d.thread = pinger_thread;
292 /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
293 * just drop the VM and FILES in ptlrpc_daemonize() right away. */
294 rc = kernel_thread(ptlrpc_pinger_main, &d, CLONE_VM | CLONE_FILES);
/* Failure path: report and release the descriptor.
 * NOTE(review): no reset of pinger_thread to NULL is visible after the
 * free -- confirm. */
296 CERROR("cannot start thread: %d\n", rc);
297 OBD_FREE(pinger_thread, sizeof(*pinger_thread));
300 l_wait_event(pinger_thread->t_ctl_waitq,
301 pinger_thread->t_flags & SVC_RUNNING, &lwi);
/* Stop the pinger kernel thread: set SVC_STOPPING, wake the thread, wait
 * until it reports SVC_STOPPED, then free the descriptor and reset
 * pinger_thread to NULL.
 * NOTE(review): the code under "#ifndef ENABLE_PINGER", the statement taken
 * when no pinger exists and the return values are not visible in this
 * listing. */
306 int ptlrpc_stop_pinger(void)
308 struct l_wait_info lwi = { 0 };
310 #ifndef ENABLE_PINGER
/* Nothing was started. */
315 if (pinger_thread == NULL)
/* This overwrites all other flags, including SVC_RUNNING and SVC_EVENT. */
318 pinger_thread->t_flags = SVC_STOPPING;
319 wake_up(&pinger_thread->t_ctl_waitq);
/* ptlrpc_pinger_main() sets SVC_STOPPED just before it exits. */
322 l_wait_event(pinger_thread->t_ctl_waitq,
323 (pinger_thread->t_flags & SVC_STOPPED), &lwi);
325 OBD_FREE(pinger_thread, sizeof(*pinger_thread));
326 pinger_thread = NULL;
/* Push the next ping of @imp one pinger interval into the future (see
 * ptlrpc_next_ping()).
 * NOTE(review): judging by the name this is called when other traffic is
 * sent on the import; the callers are not visible here -- confirm. */
330 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
333 imp->imp_next_ping = ptlrpc_next_ping(imp);
/* Register @imp with the pinger: schedule its first ping one obd_timeout
 * from now, append it to pinger_imports, take an import reference and wake
 * the pinger thread.
 * NOTE(review): the statement taken when the import is already chained, any
 * locking, and the return values are not visible in this listing. */
337 int ptlrpc_pinger_add_import(struct obd_import *imp)
/* A non-empty imp_pinger_chain means the import is already on a list. */
340 if (!list_empty(&imp->imp_pinger_chain))
344 CDEBUG(D_HA, "adding pingable import %s->%s\n",
345 imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
346 imp->imp_next_ping = jiffies + (obd_timeout * HZ);
347 /* XXX sort, blah blah */
348 list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
/* Reference dropped again in ptlrpc_pinger_del_import(). */
349 class_import_get(imp);
/* Let the pinger thread recompute its sleep interval. */
351 ptlrpc_pinger_wake_up();
/* Unregister @imp from the pinger: unlink it from the import list
 * (re-initializing imp_pinger_chain) and drop the reference taken in
 * ptlrpc_pinger_add_import().
 * NOTE(review): the statement taken when the import is not chained, any
 * locking, and the return values are not visible in this listing. */
357 int ptlrpc_pinger_del_import(struct obd_import *imp)
/* An empty imp_pinger_chain means the import is not registered. */
360 if (list_empty(&imp->imp_pinger_chain))
364 list_del_init(&imp->imp_pinger_chain);
365 CDEBUG(D_HA, "removing pingable import %s->%s\n",
366 imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
367 class_import_put(imp);
/* Wake the pinger thread early: set SVC_EVENT on its descriptor and wake
 * its control wait queue.  ptlrpc_pinger_main() clears SVC_EVENT after
 * waking.
 * NOTE(review): pinger_thread is dereferenced without a visible NULL check,
 * and it is NULL until ptlrpc_start_pinger() allocates it -- confirm that
 * lines missing from this listing guard this. */
372 void ptlrpc_pinger_wake_up()
375 pinger_thread->t_flags |= SVC_EVENT;
376 wake_up(&pinger_thread->t_ctl_waitq);
380 #else /* !__KERNEL__ */
383 * the current implementation of pinger in liblustre is not optimized
/* State of the liblustre (user-space) pinger; the instance used by the
 * functions below is pinger_args.
 * NOTE(review): only two fields are visible in this listing; the code below
 * also uses pd_recursion and pd_force_check. */
386 static struct pinger_data {
/* time(NULL) value sampled at the start of the current round. */
388 unsigned long pd_this_ping;
/* Earliest time at which pinger_check_rpcs() starts another round. */
389 unsigned long pd_next_ping;
/* liblustre wait callback (registered in ptlrpc_start_pinger()) that runs
 * one pinger round: if a round is due or forced, walk pinger_imports and,
 * for each import that is due or has a forced verify, either start
 * recovery, log that it is not being pinged, or handle the ping case.
 * The visible code ignores @arg and works on pinger_args directly.
 * NOTE(review): many lines of this function (declarations, the request set
 * handling, early exits, the recursion counter reset, the return) are not
 * visible in this listing. */
393 static int pinger_check_rpcs(void *arg)
395 unsigned long curtime = time(NULL);
396 struct list_head *iter;
397 struct pinger_data *pd = &pinger_args;
399 /* prevent recursion */
400 if (pd->pd_recursion++) {
401 CDEBUG(D_HA, "pinger: recursion! quit\n");
406 /* have we reached ping point? */
407 if (pd->pd_next_ping > curtime && !pd->pd_force_check) {
/* Consume a pending force request (set by ptlrpc_pinger_wake_up()). */
412 if (pd->pd_force_check)
413 pd->pd_force_check = 0;
415 pd->pd_this_ping = curtime;
417 /* add rpcs into set */
419 list_for_each(iter, &pinger_imports) {
420 struct obd_import *imp =
421 list_entry(iter, struct obd_import,
/* Snapshot state and the force flag under imp_lock and consume the
 * force-verify request, as the kernel variant does. */
427 spin_lock_irqsave(&imp->imp_lock, flags);
428 level = imp->imp_state;
429 force = imp->imp_force_verify;
431 imp->imp_force_verify = 0;
432 spin_unlock_irqrestore(&imp->imp_lock, flags);
/* The import is due in this round, or a verify was forced. */
434 if (imp->imp_next_ping <= pd->pd_this_ping || force) {
435 if (level == LUSTRE_IMP_DISCON) {
436 /* wait at least a timeout before
437 trying recovery again. */
438 unsigned long timeout = obd_timeout;
439 if (imp->imp_server_timeout)
440 timeout = obd_timeout / 2;
441 imp->imp_next_ping = time(NULL) +
443 ptlrpc_initiate_recovery(imp);
445 else if (level != LUSTRE_IMP_FULL ||
446 imp->imp_obd->obd_no_recov) {
448 "not pinging %s (in recovery "
449 " or recovery disabled: %s)\n",
450 imp->imp_target_uuid.uuid,
451 ptlrpc_import_state_name(level));
453 else if (imp->imp_pingable || force) {
458 if (imp->imp_pingable) {
459 CDEBUG(D_HA, "don't need to ping %s "
461 imp->imp_target_uuid.uuid,
462 imp->imp_next_ping, pd->pd_this_ping);
/* NOTE(review): pd_this_ping comes from time(NULL), while this adds
 * obd_timeout * HZ; ptlrpc_pinger_sending_on_import() below adds plain
 * obd_timeout.  The units agree only if HZ is 1 in this build -- confirm. */
469 pd->pd_next_ping = pd->pd_this_ping + (obd_timeout * HZ);
471 CDEBUG(D_HA, "finished a round ping\n");
/* Handle passed to liblustre_deregister_wait_callback() in
 * ptlrpc_stop_pinger(); NULL initially. */
476 static void *pinger_callback = NULL;
/* liblustre variant: zero pinger_args and register pinger_check_rpcs() as a
 * liblustre wait callback with pinger_args as its argument.
 * NOTE(review): the assignment that stores the registration result
 * (presumably into pinger_callback) and the return value are not visible in
 * this listing. */
478 int ptlrpc_start_pinger(void)
480 memset(&pinger_args, 0, sizeof(pinger_args));
483 liblustre_register_wait_callback(&pinger_check_rpcs, &pinger_args);
/* liblustre variant: deregister the wait callback identified by
 * pinger_callback.
 * NOTE(review): any guard around the call and the return value are not
 * visible in this listing. */
488 int ptlrpc_stop_pinger(void)
492 liblustre_deregister_wait_callback(pinger_callback);
/* liblustre variant: schedule the next ping of @imp obd_timeout seconds
 * from now (time(NULL) based) and, if the global pd_next_ping lies later
 * than that, pull it forward so the next round starts in time. */
497 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
500 imp->imp_next_ping = time(NULL) + obd_timeout;
501 if (pinger_args.pd_next_ping > imp->imp_next_ping) {
502 CDEBUG(D_HA, "set next ping to %ld(cur %ld)\n",
503 imp->imp_next_ping, time(NULL));
504 pinger_args.pd_next_ping = imp->imp_next_ping;
/* liblustre variant: register @imp with the pinger.  Schedules its first
 * ping through ptlrpc_pinger_sending_on_import(), appends it to
 * pinger_imports and takes an import reference.
 * NOTE(review): the statement taken when the import is already chained and
 * the return values are not visible in this listing. */
509 int ptlrpc_pinger_add_import(struct obd_import *imp)
/* A non-empty imp_pinger_chain means the import is already on a list. */
512 if (!list_empty(&imp->imp_pinger_chain))
515 CDEBUG(D_HA, "adding pingable import %s->%s\n",
516 imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
517 ptlrpc_pinger_sending_on_import(imp);
520 list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
/* Reference dropped again in ptlrpc_pinger_del_import(). */
521 class_import_get(imp);
/* liblustre variant: unregister @imp from the pinger by unlinking it from
 * the import list (re-initializing imp_pinger_chain) and dropping the
 * reference taken in ptlrpc_pinger_add_import().
 * NOTE(review): the statement taken when the import is not chained and the
 * return values are not visible in this listing. */
527 int ptlrpc_pinger_del_import(struct obd_import *imp)
/* An empty imp_pinger_chain means the import is not registered. */
530 if (list_empty(&imp->imp_pinger_chain))
534 list_del_init(&imp->imp_pinger_chain);
535 CDEBUG(D_HA, "removing pingable import %s->%s\n",
536 imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
537 class_import_put(imp);
/* liblustre variant: request a pinger round on the next
 * pinger_check_rpcs() call by setting pd_force_check, which that function
 * tests and then clears. */
542 void ptlrpc_pinger_wake_up()
544 pinger_args.pd_force_check = 1;
546 #endif /* !__KERNEL__ */