Whamcloud - gitweb
1520cf9aa590c3b45ea6385f896b297adb291b40
[fs/lustre-release.git] / lustre / ptlrpc / recovd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  obd/rpc/recovd.c
5  *
6  *  Lustre High Availability Daemon
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  */
16
17 #define DEBUG_SUBSYSTEM S_RPC
18
19 #include <linux/lustre_lite.h>
20 #include <linux/lustre_ha.h>
21 #include <linux/obd_support.h>
22
23 /* dump_connection_list, but shorter for nicer debugging logs */
24 static void d_c_l(struct list_head *head)
25 {
26         int sanity = 0;
27         struct list_head *tmp;
28
29         list_for_each(tmp, head) {
30                 struct ptlrpc_connection *conn =
31                         list_entry(tmp, struct ptlrpc_connection,
32                                    c_recovd_data.rd_managed_chain);
33                 CDEBUG(D_HA, "   %p = %s (%d/%d)\n", conn, conn->c_remote_uuid,
34                        conn->c_recovd_data.rd_phase,
35                        conn->c_recovd_data.rd_next_phase);
36                 if (sanity++ > 1000)
37                         LBUG();
38         }
39 }
40
41 static void dump_lists(struct recovd_obd *recovd)
42 {
43         CDEBUG(D_HA, "managed: \n");
44         d_c_l(&recovd->recovd_managed_items);
45         CDEBUG(D_HA, "troubled: \n");
46         d_c_l(&recovd->recovd_troubled_items);
47 }
48
/*
 * Place a connection under the management of a recovery daemon.
 *
 * Records @recovd and the @recover callback in the connection's embedded
 * recovd_data, resets the recovery phase to the idle baseline
 * (RD_IDLE / next RD_TROUBLED), and links the connection onto the
 * daemon's managed list.  If the connection is already managed by this
 * exact (recovd, recover) pair, this is a no-op; if it is managed by a
 * different pair, it is first unlinked from the old daemon's list.
 *
 * No-op if either @recovd or @recover is NULL.
 */
void recovd_conn_manage(struct ptlrpc_connection *conn,
                        struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover)
{
        struct recovd_data *rd = &conn->c_recovd_data;
        ENTRY;
        if (!recovd || !recover) {
                EXIT;
                return;
        }

        /* NOTE(review): rd_managed_chain is inspected here before any lock
         * is taken — presumably callers serialize manage/unmanage; confirm. */
        if (!list_empty(&rd->rd_managed_chain)) {
                if (rd->rd_recovd == recovd && rd->rd_recover == recover) {
                        CDEBUG(D_HA, "conn %p/%s already setup for recovery\n",
                               conn, conn->c_remote_uuid);
                        EXIT;
                        return;
                }
                CDEBUG(D_HA,
                       "conn %p/%s has recovery items %p/%p, making %p/%p\n",
                       conn, conn->c_remote_uuid, rd->rd_recovd, rd->rd_recover,
                       recovd, recover);
                /* Unlink from the OLD daemon under the old daemon's lock. */
                spin_lock(&rd->rd_recovd->recovd_lock);
                list_del_init(&rd->rd_managed_chain);
                spin_unlock(&rd->rd_recovd->recovd_lock);
        }

        rd->rd_recovd = recovd;
        rd->rd_recover = recover;
        rd->rd_phase = RD_IDLE;
        rd->rd_next_phase = RD_TROUBLED;

        /* Link onto the NEW daemon's managed list under its lock. */
        spin_lock(&recovd->recovd_lock);
        list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
        dump_lists(recovd);
        spin_unlock(&recovd->recovd_lock);

        EXIT;
}
87
88 void recovd_conn_unmanage(struct ptlrpc_connection *conn)
89 {
90         struct recovd_data *rd = &conn->c_recovd_data;
91         struct recovd_obd *recovd = rd->rd_recovd;
92         ENTRY;
93
94         if (recovd) {
95                 spin_lock(&recovd->recovd_lock);
96                 list_del_init(&rd->rd_managed_chain);
97                 rd->rd_recovd = NULL;
98                 spin_unlock(&recovd->recovd_lock);
99         }
100         /* should be safe enough, right? */
101         rd->rd_recover = NULL;
102         rd->rd_next_phase = RD_IDLE;
103         rd->rd_next_phase = RD_TROUBLED;
104 }
105
/*
 * Report a connection failure to its recovery daemon.
 *
 * Moves the connection from the managed list to the troubled list,
 * marks its phase RD_TROUBLED, and wakes the daemon thread so
 * recovd_check_event()/recovd_handle_event() pick it up.  If the
 * connection is already in an early recovery phase (RD_TROUBLED or
 * RD_PREPARING) the call is a no-op; if it fails in a later phase,
 * rd_next_phase is reset so recovery restarts from the beginning.
 */
void recovd_conn_fail(struct ptlrpc_connection *conn)
{
        struct recovd_data *rd = &conn->c_recovd_data;
        struct recovd_obd *recovd = rd->rd_recovd;
        ENTRY;

        if (!recovd) {
                CERROR("no recovd for connection %p\n", conn);
                EXIT;
                return;
        }

        spin_lock(&recovd->recovd_lock);
        /* Already queued or being prepared: nothing more to do. */
        if (rd->rd_phase == RD_TROUBLED || rd->rd_phase == RD_PREPARING) {
                CDEBUG(D_HA, "connection %p to %s already in recovery\n",
                       conn, conn->c_remote_uuid);
                spin_unlock(&recovd->recovd_lock);
                EXIT;
                return;
        }

        CERROR("connection %p to %s failed\n", conn, conn->c_remote_uuid);
        CERROR("peer is %08x %08lx %08lx\n", conn->c_peer.peer_nid,
               conn->c_peer.peer_ni.nal_idx, conn->c_peer.peer_ni.handle_idx);
        /* Move from whichever list it is on to the troubled list. */
        list_del(&rd->rd_managed_chain);
        list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
        if (rd->rd_phase != RD_IDLE) {
                /* Failed mid-recovery: restart the state machine. */
                CDEBUG(D_HA,
                       "connection %p to %s failed in recovery: restarting\n",
                       conn, conn->c_remote_uuid);
                /* XXX call callback with PHASE_FAILED? */
                rd->rd_next_phase = RD_TROUBLED;
        }
        rd->rd_phase = RD_TROUBLED;
        dump_lists(recovd);
        spin_unlock(&recovd->recovd_lock);

        /* Kick the daemon thread (recovd_main) to process the event. */
        wake_up(&recovd->recovd_waitq);

        EXIT;
}
147
148 void recovd_conn_fixed(struct ptlrpc_connection *conn)
149 {
150         struct recovd_data *rd = &conn->c_recovd_data;
151         ENTRY;
152
153         CDEBUG(D_HA, "connection %p (now to %s) fixed\n",
154                conn, conn->c_remote_uuid);
155         spin_lock(&rd->rd_recovd->recovd_lock);
156         list_del(&rd->rd_managed_chain);
157         rd->rd_phase = RD_IDLE;
158         rd->rd_next_phase = RD_TROUBLED;
159         list_add(&rd->rd_managed_chain, &rd->rd_recovd->recovd_managed_items);
160         dump_lists(rd->rd_recovd);
161         spin_unlock(&rd->rd_recovd->recovd_lock);
162
163         EXIT;
164 }
165
166 static int recovd_check_event(struct recovd_obd *recovd)
167 {
168         int rc = 0;
169         struct list_head *tmp;
170
171         ENTRY;
172
173         spin_lock(&recovd->recovd_lock);
174
175         if (recovd->recovd_state == RECOVD_STOPPING)
176                 GOTO(out, rc = 1);
177
178         list_for_each(tmp, &recovd->recovd_troubled_items) {
179
180                 struct recovd_data *rd = list_entry(tmp, struct recovd_data,
181                                                     rd_managed_chain);
182
183                 if (rd->rd_phase == rd->rd_next_phase ||
184                     rd->rd_phase == RD_FAILED)
185                         GOTO(out, rc = 1);
186         }
187
188  out:
189         spin_unlock(&recovd->recovd_lock);
190         RETURN(rc);
191 }
192
/*
 * Drive the recovery state machine for every troubled connection that
 * is ready to advance (rd_phase == rd_next_phase) or has failed.
 *
 * For each ready item the phase-appropriate callback is invoked with
 * recovd_lock DROPPED (the callback may sleep and may move items
 * between lists), then the lock is retaken.  A nonzero callback return
 * jumps to the cb_failed label inside the RD_FAILED case to report the
 * failure via the PHASE_FAILURE callback.
 *
 * Always returns 0; per-item callback errors are handled in-line and
 * not propagated to the caller (recovd_main).
 */
static int recovd_handle_event(struct recovd_obd *recovd)
{
        struct list_head *tmp, *n;
        int rc = 0;
        ENTRY;

        spin_lock(&recovd->recovd_lock);

        dump_lists(recovd);

        /*
         * We use _safe here because one of the callbacks, expecially
         * FAILURE or PREPARED, could move list items around.
         */
        list_for_each_safe(tmp, n, &recovd->recovd_troubled_items) {
                struct recovd_data *rd = list_entry(tmp, struct recovd_data,
                                                    rd_managed_chain);

                /* Skip items that are mid-phase and not failed. */
                if (rd->rd_phase != RD_FAILED &&
                    rd->rd_phase != rd->rd_next_phase)
                        continue;

                switch (rd->rd_phase) {
                    case RD_FAILED:
                cb_failed: /* must always reach here with recovd_lock held! */
                        CERROR("recovery FAILED for rd %p (conn %p): %d\n",
                               rd, class_rd2conn(rd), rc);

                        /* Callback runs unlocked; result deliberately
                         * ignored — there is no further fallback. */
                        spin_unlock(&recovd->recovd_lock);
                        (void)rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_FAILURE);
                        spin_lock(&recovd->recovd_lock);
                        break;

                    case RD_TROUBLED:
                        if (!rd->rd_recover) {
                                CERROR("no rd_recover for rd %p (conn %p)\n",
                                       rd, class_rd2conn(rd));
                                rc = -EINVAL;
                                break;
                        }
                        CERROR("starting recovery for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        /* Advance state BEFORE dropping the lock so a
                         * concurrent observer sees a consistent phase. */
                        rd->rd_phase = RD_PREPARING;
                        rd->rd_next_phase = RD_PREPARED;

                        spin_unlock(&recovd->recovd_lock);
                        rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_PREPARE);
                        spin_lock(&recovd->recovd_lock);
                        if (rc)
                                goto cb_failed;

                        break;

                    case RD_PREPARED:

                        CERROR("recovery prepared for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        rd->rd_phase = RD_RECOVERING;
                        rd->rd_next_phase = RD_RECOVERED;

                        spin_unlock(&recovd->recovd_lock);
                        rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_RECOVER);
                        spin_lock(&recovd->recovd_lock);
                        if (rc)
                                goto cb_failed;

                        break;

                    case RD_RECOVERED:
                        /* Done: back to the idle baseline state. */
                        rd->rd_phase = RD_IDLE;
                        rd->rd_next_phase = RD_TROUBLED;

                        CERROR("recovery complete for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        break;

                    default:
                        break;
                }
        }
        spin_unlock(&recovd->recovd_lock);
        RETURN(0);
}
276
/*
 * Body of the recovery daemon kernel thread (started by recovd_setup()).
 *
 * Daemonizes, blocks all signals, publishes itself via recovd_thread,
 * signals readiness on recovd_ctl_waitq, then loops handling recovery
 * events until recovd_state becomes RECOVD_STOPPING (set by
 * recovd_cleanup()), at which point it marks itself RECOVD_STOPPED and
 * exits.  Always returns 0.
 */
static int recovd_main(void *arg)
{
        struct recovd_obd *recovd = (struct recovd_obd *)arg;

        ENTRY;

        lock_kernel();
        daemonize();

        /* Signal-blocking API changed in 2.5; handle both. */
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
        sigfillset(&current->blocked);
        recalc_sigpending();
#else
        spin_lock_irq(&current->sigmask_lock);
        sigfillset(&current->blocked);
        recalc_sigpending(current);
        spin_unlock_irq(&current->sigmask_lock);
#endif

        sprintf(current->comm, "lustre_recovd");
        unlock_kernel();

        /* Signal that the thread is running. */
        recovd->recovd_thread = current;
        recovd->recovd_state = RECOVD_READY;
        wake_up(&recovd->recovd_ctl_waitq);

        /* And now, loop forever on requests. */
        while (1) {
                wait_event(recovd->recovd_waitq, recovd_check_event(recovd));
                if (recovd->recovd_state == RECOVD_STOPPING)
                        break;
                recovd_handle_event(recovd);
        }

        /* Tell recovd_cleanup() we are gone. */
        recovd->recovd_thread = NULL;
        recovd->recovd_state = RECOVD_STOPPED;
        wake_up(&recovd->recovd_ctl_waitq);
        CDEBUG(D_HA, "mgr exiting process %d\n", current->pid);
        RETURN(0);
}
318
319 int recovd_setup(struct recovd_obd *recovd)
320 {
321         int rc;
322
323         ENTRY;
324
325         INIT_LIST_HEAD(&recovd->recovd_managed_items);
326         INIT_LIST_HEAD(&recovd->recovd_troubled_items);
327         spin_lock_init(&recovd->recovd_lock);
328
329         init_waitqueue_head(&recovd->recovd_waitq);
330         init_waitqueue_head(&recovd->recovd_recovery_waitq);
331         init_waitqueue_head(&recovd->recovd_ctl_waitq);
332
333         rc = kernel_thread(recovd_main, (void *)recovd,
334                            CLONE_VM | CLONE_FS | CLONE_FILES);
335         if (rc < 0) {
336                 CERROR("cannot start thread\n");
337                 RETURN(-EINVAL);
338         }
339         wait_event(recovd->recovd_ctl_waitq,
340                    recovd->recovd_state == RECOVD_READY);
341
342         ptlrpc_recovd = recovd;
343         class_signal_connection_failure = recovd_conn_fail;
344
345         RETURN(0);
346 }
347
/*
 * Stop the recovery daemon thread: request RECOVD_STOPPING under the
 * lock, wake the thread, and block until recovd_main() acknowledges by
 * setting RECOVD_STOPPED.  Always returns 0.
 */
int recovd_cleanup(struct recovd_obd *recovd)
{
        ENTRY;
        spin_lock(&recovd->recovd_lock);
        recovd->recovd_state = RECOVD_STOPPING;
        wake_up(&recovd->recovd_waitq);
        spin_unlock(&recovd->recovd_lock);

        wait_event(recovd->recovd_ctl_waitq,
                   (recovd->recovd_state == RECOVD_STOPPED));
        RETURN(0);
}
360
/* The single global recovery daemon, installed by recovd_setup(). */
struct recovd_obd *ptlrpc_recovd;