Whamcloud - gitweb
Put back lustre_msg.version (in a different spot, though, so out-of-date clients
[fs/lustre-release.git] / lustre / ptlrpc / recovd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  obd/rpc/recovd.c
5  *
6  *  Lustre High Availability Daemon
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  */
16
17 #define DEBUG_SUBSYSTEM S_RPC
18
19 #include <linux/lustre_lite.h>
20 #include <linux/lustre_ha.h>
21 #include <linux/obd_support.h>
22
23 /* dump_connection_list, but shorter for nicer debugging logs */
24 static void d_c_l(struct list_head *head)
25 {
26         int sanity = 0;
27         struct list_head *tmp;
28
29         list_for_each(tmp, head) {
30                 struct ptlrpc_connection *conn =
31                         list_entry(tmp, struct ptlrpc_connection,
32                                    c_recovd_data.rd_managed_chain);
33                 CDEBUG(D_HA, "   %p = %s (%d/%d)\n", conn, conn->c_remote_uuid,
34                        conn->c_recovd_data.rd_phase,
35                        conn->c_recovd_data.rd_next_phase);
36                 if (sanity++ > 1000)
37                         LBUG();
38         }
39 }
40
41 static void dump_lists(struct recovd_obd *recovd)
42 {
43         CDEBUG(D_HA, "managed: \n");
44         d_c_l(&recovd->recovd_managed_items);
45         CDEBUG(D_HA, "troubled: \n");
46         d_c_l(&recovd->recovd_troubled_items);
47 }
48
/*
 * Register @conn with @recovd so that @recover is invoked as the
 * connection moves through recovery.  If the connection is already
 * managed by a different daemon/callback pair it is first detached
 * from the old daemon.
 */
void recovd_conn_manage(struct ptlrpc_connection *conn,
                        struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover)
{
        struct recovd_data *rd = &conn->c_recovd_data;
        ENTRY;
        /* Both a daemon and a callback are required; silently ignore
         * incomplete registrations. */
        if (!recovd || !recover) {
                EXIT;
                return;
        }

        /* Non-empty chain means this rd is already on some daemon's list. */
        if (!list_empty(&rd->rd_managed_chain)) {
                if (rd->rd_recovd == recovd && rd->rd_recover == recover) {
                        CDEBUG(D_HA, "conn %p/%s already setup for recovery\n",
                               conn, conn->c_remote_uuid);
                        EXIT;
                        return;
                }
                CDEBUG(D_HA,
                       "conn %p/%s has recovery items %p/%p, making %p/%p\n",
                       conn, conn->c_remote_uuid, rd->rd_recovd, rd->rd_recover,
                       recovd, recover);
                /* Unhook from the previous daemon under that daemon's lock. */
                spin_lock(&rd->rd_recovd->recovd_lock);
                list_del(&rd->rd_managed_chain);
                spin_unlock(&rd->rd_recovd->recovd_lock);
        }

        /* Install the new daemon/callback and reset the state machine to
         * its initial (idle, awaiting-trouble) state. */
        rd->rd_recovd = recovd;
        rd->rd_recover = recover;
        rd->rd_phase = RD_IDLE;
        rd->rd_next_phase = RD_TROUBLED;

        spin_lock(&recovd->recovd_lock);
        list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
        dump_lists(recovd);
        spin_unlock(&recovd->recovd_lock);

        EXIT;
}
87
88 void recovd_conn_unmanage(struct ptlrpc_connection *conn)
89 {
90         struct recovd_data *rd = &conn->c_recovd_data;
91         struct recovd_obd *recovd = rd->rd_recovd;
92         ENTRY;
93
94         if (recovd) {
95                 spin_lock(&recovd->recovd_lock);
96                 list_del(&rd->rd_managed_chain);
97                 spin_unlock(&recovd->recovd_lock);
98                 rd->rd_recovd = NULL;
99         }
100         /* should be safe enough, right? */
101         rd->rd_recover = NULL;
102         rd->rd_next_phase = RD_IDLE;
103         rd->rd_next_phase = RD_TROUBLED;
104 }
105
/*
 * Mark @conn as failed: move it from its daemon's managed list onto the
 * troubled list and wake the daemon.  Logs an error and does nothing if
 * the connection is unmanaged or already in recovery.
 */
void recovd_conn_fail(struct ptlrpc_connection *conn)
{
        struct recovd_data *rd = &conn->c_recovd_data;
        struct recovd_obd *recovd = rd->rd_recovd;
        ENTRY;

        if (!recovd) {
                CERROR("no recovd for connection %p\n", conn);
                EXIT;
                return;
        }

        spin_lock(&recovd->recovd_lock);
        /* Only an idle connection can newly enter recovery. */
        if (rd->rd_phase != RD_IDLE) {
                CERROR("connection %p to %s already in recovery\n",
                       conn, conn->c_remote_uuid);
                /* XXX need to distinguish from failure-in-recovery */
                spin_unlock(&recovd->recovd_lock);
                EXIT;
                return;
        }

        CERROR("connection %p to %s failed\n", conn, conn->c_remote_uuid);
        list_del(&rd->rd_managed_chain);
        list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
        rd->rd_phase = RD_TROUBLED;
        dump_lists(recovd);
        spin_unlock(&recovd->recovd_lock);

        /* recovd_check_event() will notice the RD_TROUBLED entry. */
        wake_up(&recovd->recovd_waitq);

        EXIT;
}
139
140 void recovd_conn_fixed(struct ptlrpc_connection *conn)
141 {
142         struct recovd_data *rd = &conn->c_recovd_data;
143         ENTRY;
144
145         CDEBUG(D_HA, "connection %p (now to %s) fixed\n",
146                conn, conn->c_remote_uuid);
147         spin_lock(&rd->rd_recovd->recovd_lock);
148         list_del(&rd->rd_managed_chain);
149         rd->rd_phase = RD_IDLE;
150         rd->rd_next_phase = RD_TROUBLED;
151         list_add(&rd->rd_managed_chain, &rd->rd_recovd->recovd_managed_items);
152         dump_lists(rd->rd_recovd);
153         spin_unlock(&rd->rd_recovd->recovd_lock);
154
155         EXIT;
156 }
157
/*
 * Wakeup predicate for the recovd thread.  Returns nonzero when the
 * daemon should stop, or when any troubled connection is ready for its
 * next recovery callback (rd_phase caught up to rd_next_phase) or has
 * failed outright.
 */
static int recovd_check_event(struct recovd_obd *recovd)
{
        int rc = 0;
        struct list_head *tmp;

        ENTRY;

        spin_lock(&recovd->recovd_lock);

        if (recovd->recovd_state == RECOVD_STOPPING)
                GOTO(out, rc = 1);

        list_for_each(tmp, &recovd->recovd_troubled_items) {

                struct recovd_data *rd = list_entry(tmp, struct recovd_data,
                                                    rd_managed_chain);

                if (rd->rd_phase == rd->rd_next_phase ||
                    rd->rd_phase == RD_FAILED)
                        GOTO(out, rc = 1);
        }

 out:
        spin_unlock(&recovd->recovd_lock);
        RETURN(rc);
}
184
/*
 * Run the recovery state machine once over every troubled connection
 * that is ready for its next callback.  Called from recovd_main() after
 * recovd_check_event() fires.
 *
 * recovd_lock is dropped around each rd_recover() callback and re-taken
 * afterwards, so the troubled list can change underneath us.
 */
static int recovd_handle_event(struct recovd_obd *recovd)
{
        struct list_head *tmp, *n;
        int rc = 0;
        ENTRY;

        spin_lock(&recovd->recovd_lock);

        dump_lists(recovd);

        /*
         * We use _safe here because one of the callbacks, especially
         * FAILURE or PREPARED, could move list items around.
         */
        list_for_each_safe(tmp, n, &recovd->recovd_troubled_items) {
                struct recovd_data *rd = list_entry(tmp, struct recovd_data,
                                                    rd_managed_chain);

                /* Skip entries whose current callback has not finished. */
                if (rd->rd_phase != RD_FAILED &&
                    rd->rd_phase != rd->rd_next_phase)
                        continue;

                switch (rd->rd_phase) {
                    case RD_FAILED:
                cb_failed: /* must always reach here with recovd_lock held! */
                        CERROR("recovery FAILED for rd %p (conn %p): %d\n",
                               rd, class_rd2conn(rd), rc);

                        spin_unlock(&recovd->recovd_lock);
                        (void)rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_FAILURE);
                        spin_lock(&recovd->recovd_lock);
                        break;

                    case RD_TROUBLED:
                        if (!rd->rd_recover) {
                                CERROR("no rd_recover for rd %p (conn %p)\n",
                                       rd, class_rd2conn(rd));
                                rc = -EINVAL;
                                break;
                        }
                        CERROR("starting recovery for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        rd->rd_phase = RD_PREPARING;
                        rd->rd_next_phase = RD_PREPARED;

                        spin_unlock(&recovd->recovd_lock);
                        rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_PREPARE);
                        spin_lock(&recovd->recovd_lock);
                        if (rc)
                                goto cb_failed;

                        break;

                    case RD_PREPARED:

                        CERROR("recovery prepared for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        rd->rd_phase = RD_RECOVERING;
                        rd->rd_next_phase = RD_RECOVERED;

                        spin_unlock(&recovd->recovd_lock);
                        rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_RECOVER);
                        spin_lock(&recovd->recovd_lock);
                        if (rc)
                                goto cb_failed;

                        break;

                    case RD_RECOVERED:
                        /* Back to idle; the next failure re-troubles it. */
                        rd->rd_phase = RD_IDLE;
                        rd->rd_next_phase = RD_TROUBLED;

                        CERROR("recovery complete for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        break;

                    default:
                        break;
                }
        }
        spin_unlock(&recovd->recovd_lock);
        /* NOTE(review): callback failures are routed through the RD_FAILED
         * path above, so this function always returns 0. */
        RETURN(0);
}
268
/*
 * Body of the recovery daemon kernel thread.  Detaches from user space,
 * blocks all signals, announces readiness on recovd_ctl_waitq, then
 * services recovery events until recovd_cleanup() asks it to stop.
 */
static int recovd_main(void *arg)
{
        struct recovd_obd *recovd = (struct recovd_obd *)arg;

        ENTRY;

        lock_kernel();
        daemonize();

        /* Block every signal; the thread is controlled solely via
         * recovd_state and the wait queues.  The signal API changed
         * between 2.4 and 2.5 kernels, hence the ifdef. */
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
        sigfillset(&current->blocked);
        recalc_sigpending();
#else
        spin_lock_irq(&current->sigmask_lock);
        sigfillset(&current->blocked);
        recalc_sigpending(current);
        spin_unlock_irq(&current->sigmask_lock);
#endif

        sprintf(current->comm, "lustre_recovd");
        unlock_kernel();

        /* Signal that the thread is running. */
        recovd->recovd_thread = current;
        recovd->recovd_state = RECOVD_READY;
        wake_up(&recovd->recovd_ctl_waitq);

        /* And now, loop forever on requests. */
        while (1) {
                wait_event(recovd->recovd_waitq, recovd_check_event(recovd));
                if (recovd->recovd_state == RECOVD_STOPPING)
                        break;
                recovd_handle_event(recovd);
        }

        /* Tell recovd_cleanup() we are gone. */
        recovd->recovd_thread = NULL;
        recovd->recovd_state = RECOVD_STOPPED;
        wake_up(&recovd->recovd_ctl_waitq);
        CDEBUG(D_HA, "mgr exiting process %d\n", current->pid);
        RETURN(0);
}
310
311 int recovd_setup(struct recovd_obd *recovd)
312 {
313         int rc;
314
315         ENTRY;
316
317         INIT_LIST_HEAD(&recovd->recovd_managed_items);
318         INIT_LIST_HEAD(&recovd->recovd_troubled_items);
319         spin_lock_init(&recovd->recovd_lock);
320
321         init_waitqueue_head(&recovd->recovd_waitq);
322         init_waitqueue_head(&recovd->recovd_recovery_waitq);
323         init_waitqueue_head(&recovd->recovd_ctl_waitq);
324
325         rc = kernel_thread(recovd_main, (void *)recovd,
326                            CLONE_VM | CLONE_FS | CLONE_FILES);
327         if (rc < 0) {
328                 CERROR("cannot start thread\n");
329                 RETURN(-EINVAL);
330         }
331         wait_event(recovd->recovd_ctl_waitq,
332                    recovd->recovd_state == RECOVD_READY);
333
334         ptlrpc_recovd = recovd;
335         class_signal_connection_failure = recovd_conn_fail;
336
337         RETURN(0);
338 }
339
/*
 * Ask the recovery thread to stop and wait until it has fully exited
 * (recovd_main() sets RECOVD_STOPPED just before returning).
 */
int recovd_cleanup(struct recovd_obd *recovd)
{
        ENTRY;
        spin_lock(&recovd->recovd_lock);
        recovd->recovd_state = RECOVD_STOPPING;
        wake_up(&recovd->recovd_waitq);
        spin_unlock(&recovd->recovd_lock);

        wait_event(recovd->recovd_ctl_waitq,
                   (recovd->recovd_state == RECOVD_STOPPED));
        RETURN(0);
}
352
/* The single system-wide recovery daemon; published by recovd_setup(). */
struct recovd_obd *ptlrpc_recovd;