Whamcloud - gitweb
merge b_devel into HEAD (20030703)
[fs/lustre-release.git] / lustre / ptlrpc / pinger.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Portal-RPC reconnection and replay operations, for use in recovery.
5  *
6  *  Copyright (c) 2003 Cluster File Systems, Inc.
7  *   Authors: Phil Schwan <phil@clusterfs.com>
8  *            Mike Shaver <shaver@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include <linux/version.h>
27 #include <asm/semaphore.h>
28
29 #define DEBUG_SUBSYSTEM S_RPC
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include "ptlrpc_internal.h"
33
/* Pinger thread lifecycle entry points; both are defined later in this file. */
int ptlrpc_start_pinger(void);
int ptlrpc_stop_pinger(void);

/* Control block of the pinger thread, allocated by ptlrpc_start_pinger(). */
static struct ptlrpc_thread *pinger_thread;

/* Semaphore taken around accesses to pinger_thread, pinger_imports and
 * each import's imp_next_ping. */
static DECLARE_MUTEX(pinger_sem);

/* Imports to be pinged, linked through obd_import::imp_pinger_chain. */
static LIST_HEAD(pinger_imports);
40
41 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
42 {
43         down(&pinger_sem);
44         imp->imp_next_ping = jiffies + (obd_timeout * HZ);
45         up(&pinger_sem);
46 }
47
48 int ptlrpc_pinger_add_import(struct obd_import *imp)
49 {
50         int rc;
51         ENTRY;
52
53 #ifndef ENABLE_PINGER
54         RETURN(0);
55 #else
56         if (!list_empty(&imp->imp_pinger_chain))
57                 RETURN(-EALREADY);
58
59         down(&pinger_sem);
60         if (list_empty(&pinger_imports)) {
61                 up(&pinger_sem);
62                 rc = ptlrpc_start_pinger();
63                 if (rc < 0)
64                         RETURN(rc);
65                 down(&pinger_sem);
66         }
67                 
68         CDEBUG(D_HA, "adding pingable import %s->%s\n",
69                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
70         imp->imp_next_ping = jiffies + (obd_timeout * HZ);
71         list_add_tail(&imp->imp_pinger_chain, &pinger_imports); /* XXX sort, blah blah */
72         class_import_get(imp);
73         up(&pinger_sem);
74         RETURN(0);
75 #endif
76 }
77
78 int ptlrpc_pinger_del_import(struct obd_import *imp)
79 {
80         int rc;
81         ENTRY;
82
83 #ifndef ENABLE_PINGER
84         RETURN(0);
85 #else
86         if (list_empty(&imp->imp_pinger_chain))
87                 RETURN(-ENOENT);
88
89         down(&pinger_sem);
90         list_del_init(&imp->imp_pinger_chain);
91         CDEBUG(D_HA, "removing pingable import %s->%s\n",
92                imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
93         class_import_put(imp);
94         if (list_empty(&pinger_imports)) {
95                 up(&pinger_sem);
96                 rc = ptlrpc_stop_pinger();
97                 if (rc)
98                         RETURN(rc);
99                 down(&pinger_sem);
100         }
101         up(&pinger_sem);
102         RETURN(0);
103 #endif
104 }
105
106 static int ptlrpc_pinger_main(void *arg)
107 {
108         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
109         struct ptlrpc_thread *thread = data->thread;
110         unsigned long flags;
111         ENTRY;
112
113         lock_kernel();
114         ptlrpc_daemonize();
115
116         SIGNAL_MASK_LOCK(current, flags);
117         sigfillset(&current->blocked);
118         RECALC_SIGPENDING;
119         SIGNAL_MASK_UNLOCK(current, flags);
120
121 #if defined(__arch_um__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,4,20))
122         sprintf(current->comm, "%s|%d", data->name,current->thread.extern_pid);
123 #elif defined(__arch_um__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
124         sprintf(current->comm, "%s|%d", data->name,
125                 current->thread.mode.tt.extern_pid);
126 #else
127         strcpy(current->comm, data->name);
128 #endif
129         unlock_kernel();
130
131         /* Record that the thread is running */
132         thread->t_flags = SVC_RUNNING;
133         wake_up(&thread->t_ctl_waitq);
134
135         /* And now, loop forever, pinging as needed. */
136         while (1) {
137                 unsigned long this_ping = jiffies;
138                 long time_to_next_ping;
139                 struct l_wait_info lwi = LWI_TIMEOUT(10 * HZ, NULL, NULL);
140                 struct ptlrpc_request_set *set;
141                 struct ptlrpc_request *req;
142                 struct list_head *iter;
143                 wait_queue_t set_wait;
144                 int rc;
145
146                 set = ptlrpc_prep_set();
147                 down(&pinger_sem);
148                 list_for_each(iter, &pinger_imports) {
149                         struct obd_import *imp =
150                                 list_entry(iter, struct obd_import, imp_pinger_chain);
151                         int generation, level;
152                         unsigned long flags;
153
154                         if (imp->imp_next_ping <= this_ping) {
155                                 /* Add a ping. */
156                                 spin_lock_irqsave(&imp->imp_lock, flags);
157                                 generation = imp->imp_generation;
158                                 level = imp->imp_level;
159                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
160
161                                 if (level != LUSTRE_CONN_FULL) {
162                                         CDEBUG(D_HA, "not pinging %s (in recovery)\n",
163                                                imp->imp_target_uuid.uuid);
164                                         continue;
165                                 }
166
167                                 req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
168                                 if (!req) {
169                                         CERROR("OOM trying to ping\n");
170                                         break;
171                                 }
172                                 req->rq_replen = lustre_msg_size(0, NULL);
173                                 req->rq_level = LUSTRE_CONN_FULL;
174                                 req->rq_phase = RQ_PHASE_RPC;
175                                 req->rq_import_generation = generation;
176                                 ptlrpc_set_add_req(set, req);
177                         } else {
178                                 CDEBUG(D_HA, "don't need to ping %s (%lu > %lu)\n",
179                                        imp->imp_target_uuid.uuid, imp->imp_next_ping,
180                                        this_ping);
181                         }
182                 }
183                 up(&pinger_sem);
184
185                 /* Might be empty, that's OK. */
186                 if (set->set_remaining == 0)
187                         CDEBUG(D_HA, "nothing to ping\n");
188                 list_for_each(iter, &set->set_requests) {
189                         struct ptlrpc_request *req =
190                                 list_entry(iter, struct ptlrpc_request, rq_set_chain);
191                         DEBUG_REQ(D_HA, req, "pinging %s->%s",
192                                   req->rq_import->imp_obd->obd_uuid.uuid,
193                                   req->rq_import->imp_target_uuid.uuid);
194                         (void)ptl_send_rpc(req);
195                 }
196
197                 /* Have to wait on both the thread's queue and the set's. */
198                 init_waitqueue_entry(&set_wait, current);
199                 add_wait_queue(&set->set_waitq, &set_wait);
200                 rc = l_wait_event(thread->t_ctl_waitq,
201                                   thread->t_flags & SVC_STOPPING || ptlrpc_check_set(set),
202                                   &lwi);
203                 remove_wait_queue(&set->set_waitq, &set_wait);
204                 CDEBUG(D_HA, "ping complete (%lu)\n", jiffies);
205
206                 if (thread->t_flags & SVC_STOPPING) {
207                         thread->t_flags &= ~SVC_STOPPING;
208                         list_for_each(iter, &set->set_requests) {
209                                 req = list_entry(iter, struct ptlrpc_request,
210                                                  rq_set_chain);
211                                 if (!req->rq_replied)
212                                         ptlrpc_unregister_reply(req);
213                         }
214                         ptlrpc_set_destroy(set);
215                         EXIT;
216                         break;
217                 }
218
219                 /* Expire all the requests that didn't come back. */
220                 down(&pinger_sem);
221                 list_for_each(iter, &set->set_requests) {
222                         req = list_entry(iter, struct ptlrpc_request, rq_set_chain);
223
224                         if (req->rq_replied)
225                                 continue;
226
227                         req->rq_phase = RQ_PHASE_COMPLETE;
228                         set->set_remaining--;
229                         /* If it was disconnected, don't sweat it. */
230                         if (list_empty(&req->rq_import->imp_pinger_chain))
231                                 continue;
232
233                         ptlrpc_expire_one_request(req);
234                 }
235                 up(&pinger_sem);
236                 ptlrpc_set_destroy(set);
237
238                 /* Wait until the next ping time, or until we're stopped. */
239                 time_to_next_ping = this_ping + (obd_timeout * HZ) - jiffies;
240                 CDEBUG(D_HA, "next ping in %lu (%lu)\n", time_to_next_ping,
241                        this_ping + (obd_timeout * HZ));
242                 if (time_to_next_ping > 0) {
243                         lwi = LWI_TIMEOUT(time_to_next_ping, NULL, NULL);
244                         l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPING,
245                                      &lwi);
246                         if (thread->t_flags & SVC_STOPPING) {
247                                 thread->t_flags &= ~SVC_STOPPING;
248                                 EXIT;
249                                 break;
250                         }
251                 }
252         }
253
254         thread->t_flags = SVC_STOPPED;
255         wake_up(&thread->t_ctl_waitq);
256
257         CDEBUG(D_NET, "pinger thread exiting, process %d\n", current->pid);
258         return 0;
259 }
260
261 int ptlrpc_start_pinger(void)
262 {
263         struct l_wait_info lwi = { 0 };
264         struct ptlrpc_svc_data d;
265         int rc;
266         ENTRY;
267
268         down(&pinger_sem);
269         if (pinger_thread != NULL)
270                 GOTO(out, rc = -EALREADY);
271
272         OBD_ALLOC(pinger_thread, sizeof(*pinger_thread));
273         if (pinger_thread == NULL)
274                 GOTO(out, rc = -ENOMEM);
275         init_waitqueue_head(&pinger_thread->t_ctl_waitq);
276
277         d.name = "ll_ping";
278         d.thread = pinger_thread;
279
280         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
281          * just drop the VM and FILES in ptlrpc_daemonize() right away. */
282         rc = kernel_thread(ptlrpc_pinger_main, &d, CLONE_VM | CLONE_FILES);
283         if (rc < 0) {
284                 CERROR("cannot start thread: %d\n", rc);
285                 OBD_FREE(pinger_thread, sizeof(*pinger_thread));
286                 GOTO(out, rc);
287         }
288         l_wait_event(pinger_thread->t_ctl_waitq,
289                      pinger_thread->t_flags & SVC_RUNNING, &lwi);
290
291  out:
292         up(&pinger_sem);
293         RETURN(rc);
294 }
295
296 int ptlrpc_stop_pinger(void)
297 {
298         struct l_wait_info lwi = { 0 };
299         int rc = 0;
300         ENTRY;
301
302         down(&pinger_sem);
303         if (pinger_thread == NULL)
304                 GOTO(out, rc = -EALREADY);
305
306         pinger_thread->t_flags = SVC_STOPPING;
307         wake_up(&pinger_thread->t_ctl_waitq);
308         l_wait_event(pinger_thread->t_ctl_waitq,
309                      (pinger_thread->t_flags & SVC_STOPPED), &lwi);
310
311         OBD_FREE(pinger_thread, sizeof(*pinger_thread));
312
313  out:
314         up(&pinger_sem);
315         RETURN(rc);
316 }