Whamcloud - gitweb
file llobdstat.pl was initially added on branch b_devel.
[fs/lustre-release.git] / lustre / ptlrpc / recovd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
 *  lustre/ptlrpc/recovd.c
5  *
6  *  Lustre High Availability Daemon
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  */
16
17 #define DEBUG_SUBSYSTEM S_RPC
18 #ifndef __KERNEL__
19 #include <liblustre.h>
20 #include <linux/obd.h>
21 #include <linux/obd_class.h>
22 #else 
23 #include <linux/lustre_lite.h>
24 #endif
25
26 #include <linux/lustre_ha.h>
27 #include <linux/obd_support.h>
28
29 /* dump_connection_list, but shorter for nicer debugging logs */
30 static void d_c_l(struct list_head *head)
31 {
32         struct list_head *tmp;
33
34         list_for_each(tmp, head) {
35                 struct ptlrpc_connection *conn =
36                         list_entry(tmp, struct ptlrpc_connection,
37                                    c_recovd_data.rd_managed_chain);
38                 CDEBUG(D_HA, "   %p = %s (%d/%d)\n", conn, 
39                        conn->c_remote_uuid.uuid,
40                        conn->c_recovd_data.rd_phase,
41                        conn->c_recovd_data.rd_next_phase);
42         }
43 }
44
/* Log both of @recovd's connection lists (managed and troubled) at
 * D_HA level.  Every caller in this file holds recovd->recovd_lock. */
static void dump_lists(struct recovd_obd *recovd)
{
        CDEBUG(D_HA, "managed: \n");
        d_c_l(&recovd->recovd_managed_items);
        CDEBUG(D_HA, "troubled: \n");
        d_c_l(&recovd->recovd_troubled_items);
}
52
/*
 * Register @conn with @recovd so that @recover is invoked to drive its
 * recovery.  A no-op if the connection is already managed with the same
 * (recovd, recover) pair; a different pair detaches the connection from
 * its previous recovd and re-homes it on @recovd's managed list with
 * its phase reset to RD_IDLE / RD_TROUBLED.
 */
void recovd_conn_manage(struct ptlrpc_connection *conn,
                        struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover)
{
        struct recovd_data *rd = &conn->c_recovd_data;
        ENTRY;
        /* Both a daemon and a callback are required; otherwise bail. */
        if (!recovd || !recover) {
                EXIT;
                return;
        }

        /* NOTE(review): rd_managed_chain is tested before any lock is
         * taken -- presumably callers serialize manage/unmanage for a
         * given conn; confirm against the call sites. */
        if (!list_empty(&rd->rd_managed_chain)) {
                if (rd->rd_recovd == recovd && rd->rd_recover == recover) {
                        CDEBUG(D_HA, "conn %p/%s already setup for recovery\n",
                               conn, conn->c_remote_uuid.uuid);
                        EXIT;
                        return;
                }
                CDEBUG(D_HA,
                       "conn %p/%s has recovery items %p/%p, making %p/%p\n",
                       conn, conn->c_remote_uuid.uuid, rd->rd_recovd, rd->rd_recover,
                       recovd, recover);
                /* Detach from the recovd that previously managed it,
                 * under that recovd's own lock. */
                spin_lock(&rd->rd_recovd->recovd_lock);
                list_del_init(&rd->rd_managed_chain);
                spin_unlock(&rd->rd_recovd->recovd_lock);
        }

        /* Fresh (re)registration: idle now, troubled is the next phase
         * the state machine can move to. */
        rd->rd_recovd = recovd;
        rd->rd_recover = recover;
        rd->rd_phase = RD_IDLE;
        rd->rd_next_phase = RD_TROUBLED;

        spin_lock(&recovd->recovd_lock);
        list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
        dump_lists(recovd);
        spin_unlock(&recovd->recovd_lock);

        EXIT;
}
91
92 void recovd_conn_unmanage(struct ptlrpc_connection *conn)
93 {
94         struct recovd_data *rd = &conn->c_recovd_data;
95         struct recovd_obd *recovd = rd->rd_recovd;
96         ENTRY;
97
98         if (recovd) {
99                 spin_lock(&recovd->recovd_lock);
100                 list_del_init(&rd->rd_managed_chain);
101                 rd->rd_recovd = NULL;
102                 spin_unlock(&recovd->recovd_lock);
103         }
104         /* should be safe enough, right? */
105         rd->rd_recover = NULL;
106         rd->rd_next_phase = RD_IDLE;
107         rd->rd_next_phase = RD_TROUBLED;
108 }
109
/*
 * Report @conn as failed: move it from wherever it sits onto its
 * recovd's troubled list, mark it RD_TROUBLED, and wake the daemon.
 * A connection already TROUBLED or PREPARING is left untouched.
 */
void recovd_conn_fail(struct ptlrpc_connection *conn)
{
        struct recovd_data *rd = &conn->c_recovd_data;
        struct recovd_obd *recovd = rd->rd_recovd;
        ENTRY;

        if (!recovd) {
                CERROR("no recovd for connection %p\n", conn);
                EXIT;
                return;
        }

        spin_lock(&recovd->recovd_lock);
        if (rd->rd_phase == RD_TROUBLED || rd->rd_phase == RD_PREPARING) {
                /* Already queued for (or starting) recovery: no-op. */
                CDEBUG(D_HA, "connection %p to %s already in recovery\n",
                       conn, conn->c_remote_uuid.uuid);
                spin_unlock(&recovd->recovd_lock);
                EXIT;
                return;
        }

        CERROR("connection %p to %s nid "LPX64" on %s failed\n", conn,
               conn->c_remote_uuid.uuid, conn->c_peer.peer_nid,
               conn->c_peer.peer_ni->pni_name);
        /* Re-home the item onto the troubled list. */
        list_del(&rd->rd_managed_chain);
        list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
        if (rd->rd_phase != RD_IDLE) {
                /* Failure while recovery was in flight: restart it. */
                CDEBUG(D_HA,
                       "connection %p to %s failed in recovery: restarting\n",
                       conn, conn->c_remote_uuid.uuid);
                /* XXX call callback with PHASE_FAILED? */
                rd->rd_next_phase = RD_TROUBLED;
        }
        rd->rd_phase = RD_TROUBLED;
        dump_lists(recovd);
        spin_unlock(&recovd->recovd_lock);

        /* Kick recovd_main() out of its wait_event(). */
        wake_up(&recovd->recovd_waitq);

        EXIT;
}
151
152 void recovd_conn_fixed(struct ptlrpc_connection *conn)
153 {
154         struct recovd_data *rd = &conn->c_recovd_data;
155         ENTRY;
156
157         CDEBUG(D_HA, "connection %p (now to %s) fixed\n",
158                conn, conn->c_remote_uuid.uuid);
159         spin_lock(&rd->rd_recovd->recovd_lock);
160         list_del(&rd->rd_managed_chain);
161         rd->rd_phase = RD_IDLE;
162         rd->rd_next_phase = RD_TROUBLED;
163         list_add(&rd->rd_managed_chain, &rd->rd_recovd->recovd_managed_items);
164         dump_lists(rd->rd_recovd);
165         spin_unlock(&rd->rd_recovd->recovd_lock);
166
167         EXIT;
168 }
169
/*
 * Wait-queue predicate for recovd_main(): returns non-zero when the
 * daemon should wake -- either it has been asked to stop, or some
 * troubled item is ready for its next phase (or has failed).
 */
static int recovd_check_event(struct recovd_obd *recovd)
{
        int rc = 0;
        struct list_head *tmp;

        ENTRY;

        spin_lock(&recovd->recovd_lock);

        if (recovd->recovd_state == RECOVD_STOPPING)
                GOTO(out, rc = 1);

        list_for_each(tmp, &recovd->recovd_troubled_items) {

                struct recovd_data *rd = list_entry(tmp, struct recovd_data,
                                                    rd_managed_chain);

                /* rd_phase == rd_next_phase means the last phase's
                 * callback finished and the item is ready for handling
                 * (see the same test in recovd_handle_event()). */
                if (rd->rd_phase == rd->rd_next_phase ||
                    rd->rd_phase == RD_FAILED)
                        GOTO(out, rc = 1);
        }

 out:
        spin_unlock(&recovd->recovd_lock);
        RETURN(rc);
}
196
/*
 * Advance the recovery state machine for every ready item on the
 * troubled list.  recovd_lock is dropped around each rd_recover()
 * callback, so the list may change underneath us (hence _safe
 * iteration).  Always returns 0; a failing callback is handled by
 * jumping to the RD_FAILED path rather than via the return value.
 */
static int recovd_handle_event(struct recovd_obd *recovd)
{
        struct list_head *tmp, *n;
        int rc = 0;
        ENTRY;

        spin_lock(&recovd->recovd_lock);

        dump_lists(recovd);

        /*
         * We use _safe here because one of the callbacks, especially
         * FAILURE or PREPARED, could move list items around.
         */
        list_for_each_safe(tmp, n, &recovd->recovd_troubled_items) {
                struct recovd_data *rd = list_entry(tmp, struct recovd_data,
                                                    rd_managed_chain);

                /* Skip items whose previous phase callback is still in
                 * flight (phase has not yet caught up to next_phase). */
                if (rd->rd_phase != RD_FAILED &&
                    rd->rd_phase != rd->rd_next_phase)
                        continue;

                switch (rd->rd_phase) {
                    case RD_FAILED:
                cb_failed: /* must always reach here with recovd_lock held! */
                        CERROR("recovery FAILED for rd %p (conn %p): %d\n",
                               rd, class_rd2conn(rd), rc);

                        /* Callbacks run unlocked; result ignored here. */
                        spin_unlock(&recovd->recovd_lock);
                        (void)rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_FAILURE);
                        spin_lock(&recovd->recovd_lock);
                        break;

                    case RD_TROUBLED:
                        if (!rd->rd_recover) {
                                CERROR("no rd_recover for rd %p (conn %p)\n",
                                       rd, class_rd2conn(rd));
                                rc = -EINVAL;
                                break;
                        }
                        CERROR("starting recovery for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        /* PREPARING now; PREPARED once the callback is
                         * done and reports completion. */
                        rd->rd_phase = RD_PREPARING;
                        rd->rd_next_phase = RD_PREPARED;

                        spin_unlock(&recovd->recovd_lock);
                        rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_PREPARE);
                        spin_lock(&recovd->recovd_lock);
                        if (rc)
                                goto cb_failed;

                        break;

                    case RD_PREPARED:

                        CERROR("recovery prepared for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        rd->rd_phase = RD_RECOVERING;
                        rd->rd_next_phase = RD_RECOVERED;

                        spin_unlock(&recovd->recovd_lock);
                        rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_RECOVER);
                        spin_lock(&recovd->recovd_lock);
                        if (rc)
                                goto cb_failed;

                        break;

                    case RD_RECOVERED:
                        /* Done: back to the freshly-managed state. */
                        rd->rd_phase = RD_IDLE;
                        rd->rd_next_phase = RD_TROUBLED;

                        CERROR("recovery complete for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        break;

                    default:
                        break;
                }
        }
        spin_unlock(&recovd->recovd_lock);
        RETURN(0);
}
280
281 #ifdef __KERNEL__
282 static int recovd_main(void *arg)
283 {
284         struct recovd_obd *recovd = (struct recovd_obd *)arg;
285         unsigned long flags;
286         ENTRY;
287
288         lock_kernel();
289         daemonize();
290
291 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
292         sigfillset(&current->blocked);
293         recalc_sigpending();
294 #else
295         spin_lock_irqsave(&current->sigmask_lock, flags);
296         sigfillset(&current->blocked);
297         recalc_sigpending(current);
298         spin_unlock_irqrestore(&current->sigmask_lock, flags);
299 #endif
300
301         sprintf(current->comm, "lustre_recovd");
302         unlock_kernel();
303
304         /* Signal that the thread is running. */
305         recovd->recovd_thread = current;
306         recovd->recovd_state = RECOVD_READY;
307         wake_up(&recovd->recovd_ctl_waitq);
308
309         /* And now, loop forever on requests. */
310         while (1) {
311                 wait_event(recovd->recovd_waitq, recovd_check_event(recovd));
312                 if (recovd->recovd_state == RECOVD_STOPPING)
313                         break;
314                 recovd_handle_event(recovd);
315         }
316
317         recovd->recovd_thread = NULL;
318         recovd->recovd_state = RECOVD_STOPPED;
319         wake_up(&recovd->recovd_ctl_waitq);
320         CDEBUG(D_HA, "mgr exiting process %d\n", current->pid);
321         RETURN(0);
322 }
323
324 int recovd_setup(struct recovd_obd *recovd)
325 {
326         int rc = 0; /* initialize for Liblustre */
327
328         ENTRY;
329
330         INIT_LIST_HEAD(&recovd->recovd_managed_items);
331         INIT_LIST_HEAD(&recovd->recovd_troubled_items);
332         spin_lock_init(&recovd->recovd_lock);
333
334         init_waitqueue_head(&recovd->recovd_waitq);
335         init_waitqueue_head(&recovd->recovd_recovery_waitq);
336         init_waitqueue_head(&recovd->recovd_ctl_waitq);
337
338         rc = kernel_thread(recovd_main, (void *)recovd,
339                            CLONE_VM | CLONE_FS | CLONE_FILES);
340         if (rc < 0) {
341                 CERROR("cannot start thread\n");
342                 RETURN(-EINVAL);
343         }
344         wait_event(recovd->recovd_ctl_waitq,
345                    recovd->recovd_state == RECOVD_READY);
346
347         ptlrpc_recovd = recovd;
348         class_signal_connection_failure = recovd_conn_fail;
349
350         RETURN(0);
351 }
352 #else 
/* Liblustre (userspace) build: there is no daemon thread to start. */
int recovd_setup(struct recovd_obd *recovd)
{
        return 0;
}
357 #endif
358
/*
 * Ask the recovery daemon to stop, then wait until recovd_main() has
 * marked itself RECOVD_STOPPED.  Always returns 0.
 */
int recovd_cleanup(struct recovd_obd *recovd)
{
        ENTRY;
        spin_lock(&recovd->recovd_lock);
        recovd->recovd_state = RECOVD_STOPPING;
        /* Wake the daemon; recovd_check_event() sees STOPPING. */
        wake_up(&recovd->recovd_waitq);
        spin_unlock(&recovd->recovd_lock);

        wait_event(recovd->recovd_ctl_waitq,
                   (recovd->recovd_state == RECOVD_STOPPED));
        RETURN(0);
}
371
/* The system-wide recovery daemon, installed by recovd_setup(). */
struct recovd_obd *ptlrpc_recovd;