Whamcloud - gitweb
Merge b_md into HEAD
[fs/lustre-release.git] / lustre / ptlrpc / recovd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
 *  lustre/ptlrpc/recovd.c
5  *
6  *  Lustre High Availability Daemon
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  */
16
17 #define DEBUG_SUBSYSTEM S_RPC
18
19 #include <linux/lustre_lite.h>
20 #include <linux/lustre_ha.h>
21 #include <linux/obd_support.h>
22
23 /* dump_connection_list, but shorter for nicer debugging logs */
24 static void d_c_l(struct list_head *head)
25 {
26         struct list_head *tmp;
27
28         list_for_each(tmp, head) {
29                 struct ptlrpc_connection *conn =
30                         list_entry(tmp, struct ptlrpc_connection,
31                                    c_recovd_data.rd_managed_chain);
32                 CDEBUG(D_HA, "   %p = %s (%d/%d)\n", conn, 
33                        conn->c_remote_uuid.uuid,
34                        conn->c_recovd_data.rd_phase,
35                        conn->c_recovd_data.rd_next_phase);
36         }
37 }
38
39 static void dump_lists(struct recovd_obd *recovd)
40 {
41         CDEBUG(D_HA, "managed: \n");
42         d_c_l(&recovd->recovd_managed_items);
43         CDEBUG(D_HA, "troubled: \n");
44         d_c_l(&recovd->recovd_troubled_items);
45 }
46
/*
 * Place @conn under the management of @recovd, with @recover as the
 * callback invoked at each recovery phase.  If the connection is already
 * managed by some recovd, it is first unhooked from that daemon's list.
 */
void recovd_conn_manage(struct ptlrpc_connection *conn,
                        struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover)
{
        struct recovd_data *rd = &conn->c_recovd_data;
        ENTRY;
        if (!recovd || !recover) {
                EXIT;
                return;
        }

        /* Non-empty chain means this connection is already on some
         * recovd's list (rd_managed_chain is kept list_del_init'd when
         * unmanaged). */
        if (!list_empty(&rd->rd_managed_chain)) {
                if (rd->rd_recovd == recovd && rd->rd_recover == recover) {
                        CDEBUG(D_HA, "conn %p/%s already setup for recovery\n",
                               conn, conn->c_remote_uuid.uuid);
                        EXIT;
                        return;
                }
                CDEBUG(D_HA,
                       "conn %p/%s has recovery items %p/%p, making %p/%p\n",
                       conn, conn->c_remote_uuid.uuid, rd->rd_recovd, rd->rd_recover,
                       recovd, recover);
                /* Unhook from the OLD daemon under that daemon's lock. */
                spin_lock(&rd->rd_recovd->recovd_lock);
                list_del_init(&rd->rd_managed_chain);
                spin_unlock(&rd->rd_recovd->recovd_lock);
        }

        /* NOTE(review): these fields are written without holding the new
         * recovd's lock; presumably no other thread can see this rd until
         * the list_add below — confirm against callers. */
        rd->rd_recovd = recovd;
        rd->rd_recover = recover;
        rd->rd_phase = RD_IDLE;
        rd->rd_next_phase = RD_TROUBLED;

        /* Publish on the NEW daemon's managed list under its lock. */
        spin_lock(&recovd->recovd_lock);
        list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
        dump_lists(recovd);
        spin_unlock(&recovd->recovd_lock);

        EXIT;
}
85
86 void recovd_conn_unmanage(struct ptlrpc_connection *conn)
87 {
88         struct recovd_data *rd = &conn->c_recovd_data;
89         struct recovd_obd *recovd = rd->rd_recovd;
90         ENTRY;
91
92         if (recovd) {
93                 spin_lock(&recovd->recovd_lock);
94                 list_del_init(&rd->rd_managed_chain);
95                 rd->rd_recovd = NULL;
96                 spin_unlock(&recovd->recovd_lock);
97         }
98         /* should be safe enough, right? */
99         rd->rd_recover = NULL;
100         rd->rd_next_phase = RD_IDLE;
101         rd->rd_next_phase = RD_TROUBLED;
102 }
103
/*
 * Report @conn as failed: move it from the managed list to the troubled
 * list and wake the recovery daemon.  No-op (with a debug message) if the
 * connection is already being recovered.
 */
void recovd_conn_fail(struct ptlrpc_connection *conn)
{
        struct recovd_data *rd = &conn->c_recovd_data;
        struct recovd_obd *recovd = rd->rd_recovd;
        ENTRY;

        if (!recovd) {
                CERROR("no recovd for connection %p\n", conn);
                EXIT;
                return;
        }

        spin_lock(&recovd->recovd_lock);
        /* Already queued (TROUBLED) or mid-prepare: nothing more to do. */
        if (rd->rd_phase == RD_TROUBLED || rd->rd_phase == RD_PREPARING) {
                CDEBUG(D_HA, "connection %p to %s already in recovery\n",
                       conn, conn->c_remote_uuid.uuid);
                spin_unlock(&recovd->recovd_lock);
                EXIT;
                return;
        }

        CERROR("connection %p to %s (%08x %08lx %08lx) failed\n", conn,
               conn->c_remote_uuid.uuid, conn->c_peer.peer_nid,
               conn->c_peer.peer_ni.nal_idx, conn->c_peer.peer_ni.handle_idx);
        /* Move onto the troubled list (tail, so recovery is FIFO). */
        list_del(&rd->rd_managed_chain);
        list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
        /* A non-IDLE phase here means it failed again while a recovery was
         * in flight (e.g. RECOVERING) — restart from the beginning by
         * making phase == next_phase, which recovd_check_event() treats as
         * actionable. */
        if (rd->rd_phase != RD_IDLE) {
                CDEBUG(D_HA,
                       "connection %p to %s failed in recovery: restarting\n",
                       conn, conn->c_remote_uuid.uuid);
                /* XXX call callback with PHASE_FAILED? */
                rd->rd_next_phase = RD_TROUBLED;
        }
        rd->rd_phase = RD_TROUBLED;
        dump_lists(recovd);
        spin_unlock(&recovd->recovd_lock);

        /* Kick recovd_main() out of its wait_event(). */
        wake_up(&recovd->recovd_waitq);

        EXIT;
}
145
/*
 * Mark @conn as recovered: move it back from the troubled list to the
 * managed list and reset its phase machine to idle.
 *
 * NOTE(review): unlike recovd_conn_fail(), this dereferences
 * rd->rd_recovd without a NULL check — presumably callers only invoke
 * this for managed connections; confirm against call sites.
 */
void recovd_conn_fixed(struct ptlrpc_connection *conn)
{
        struct recovd_data *rd = &conn->c_recovd_data;
        ENTRY;

        CDEBUG(D_HA, "connection %p (now to %s) fixed\n",
               conn, conn->c_remote_uuid.uuid);
        spin_lock(&rd->rd_recovd->recovd_lock);
        list_del(&rd->rd_managed_chain);
        /* Back to the idle baseline, same as recovd_conn_manage(). */
        rd->rd_phase = RD_IDLE;
        rd->rd_next_phase = RD_TROUBLED;
        list_add(&rd->rd_managed_chain, &rd->rd_recovd->recovd_managed_items);
        dump_lists(rd->rd_recovd);
        spin_unlock(&rd->rd_recovd->recovd_lock);

        EXIT;
}
163
/*
 * Wait-queue predicate for recovd_main(): returns nonzero when the daemon
 * should wake up — either it is being stopped, or some troubled item has
 * reached an actionable state (phase caught up to next_phase, or FAILED).
 */
static int recovd_check_event(struct recovd_obd *recovd)
{
        int rc = 0;
        struct list_head *tmp;

        ENTRY;

        spin_lock(&recovd->recovd_lock);

        if (recovd->recovd_state == RECOVD_STOPPING)
                GOTO(out, rc = 1);

        list_for_each(tmp, &recovd->recovd_troubled_items) {

                struct recovd_data *rd = list_entry(tmp, struct recovd_data,
                                                    rd_managed_chain);

                /* phase == next_phase is the "ready for the next step"
                 * convention set up by recovd_conn_fail() and the handlers
                 * in recovd_handle_event(). */
                if (rd->rd_phase == rd->rd_next_phase ||
                    rd->rd_phase == RD_FAILED)
                        GOTO(out, rc = 1);
        }

 out:
        spin_unlock(&recovd->recovd_lock);
        RETURN(rc);
}
190
/*
 * Advance the recovery state machine for every actionable item on the
 * troubled list.  For each ready item, drop recovd_lock, invoke the
 * item's rd_recover callback for the current phase, and retake the lock.
 * A failing callback jumps to the FAILED handling (cb_failed).
 *
 * Phase progression: TROUBLED -> PREPARING/PREPARED -> RECOVERING/
 * RECOVERED -> IDLE.
 */
static int recovd_handle_event(struct recovd_obd *recovd)
{
        struct list_head *tmp, *n;
        int rc = 0;
        ENTRY;

        spin_lock(&recovd->recovd_lock);

        dump_lists(recovd);

        /*
         * We use _safe here because one of the callbacks, especially
         * FAILURE or PREPARED, could move list items around.
         *
         * NOTE(review): the lock is dropped around each callback, so even
         * the saved successor 'n' could in principle go stale while
         * unlocked — presumably callbacks only move the current item;
         * confirm against the callback implementations.
         */
        list_for_each_safe(tmp, n, &recovd->recovd_troubled_items) {
                struct recovd_data *rd = list_entry(tmp, struct recovd_data,
                                                    rd_managed_chain);

                /* Skip items that are mid-transition (phase hasn't caught
                 * up to next_phase) unless they outright FAILED. */
                if (rd->rd_phase != RD_FAILED &&
                    rd->rd_phase != rd->rd_next_phase)
                        continue;

                switch (rd->rd_phase) {
                    case RD_FAILED:
                cb_failed: /* must always reach here with recovd_lock held! */
                        CERROR("recovery FAILED for rd %p (conn %p): %d\n",
                               rd, class_rd2conn(rd), rc);

                        /* Drop the lock for the (possibly blocking)
                         * callback; same pattern below. */
                        spin_unlock(&recovd->recovd_lock);
                        (void)rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_FAILURE);
                        spin_lock(&recovd->recovd_lock);
                        break;

                    case RD_TROUBLED:
                        if (!rd->rd_recover) {
                                CERROR("no rd_recover for rd %p (conn %p)\n",
                                       rd, class_rd2conn(rd));
                                rc = -EINVAL;
                                break;
                        }
                        CERROR("starting recovery for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        rd->rd_phase = RD_PREPARING;
                        rd->rd_next_phase = RD_PREPARED;

                        spin_unlock(&recovd->recovd_lock);
                        rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_PREPARE);
                        spin_lock(&recovd->recovd_lock);
                        if (rc)
                                goto cb_failed;

                        break;

                    case RD_PREPARED:

                        CERROR("recovery prepared for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        rd->rd_phase = RD_RECOVERING;
                        rd->rd_next_phase = RD_RECOVERED;

                        spin_unlock(&recovd->recovd_lock);
                        rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_RECOVER);
                        spin_lock(&recovd->recovd_lock);
                        if (rc)
                                goto cb_failed;

                        break;

                    case RD_RECOVERED:
                        /* Done: back to the idle baseline. */
                        rd->rd_phase = RD_IDLE;
                        rd->rd_next_phase = RD_TROUBLED;

                        CERROR("recovery complete for rd %p (conn %p)\n",
                               rd, class_rd2conn(rd));
                        break;

                    default:
                        break;
                }
        }
        spin_unlock(&recovd->recovd_lock);
        /* Always returns 0; per-item failures were routed to cb_failed. */
        RETURN(0);
}
274
275 static int recovd_main(void *arg)
276 {
277         struct recovd_obd *recovd = (struct recovd_obd *)arg;
278         unsigned long flags;
279         ENTRY;
280
281         lock_kernel();
282         daemonize();
283
284 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
285         sigfillset(&current->blocked);
286         recalc_sigpending();
287 #else
288         spin_lock_irqsave(&current->sigmask_lock, flags);
289         sigfillset(&current->blocked);
290         recalc_sigpending(current);
291         spin_unlock_irqrestore(&current->sigmask_lock, flags);
292 #endif
293
294         sprintf(current->comm, "lustre_recovd");
295         unlock_kernel();
296
297         /* Signal that the thread is running. */
298         recovd->recovd_thread = current;
299         recovd->recovd_state = RECOVD_READY;
300         wake_up(&recovd->recovd_ctl_waitq);
301
302         /* And now, loop forever on requests. */
303         while (1) {
304                 wait_event(recovd->recovd_waitq, recovd_check_event(recovd));
305                 if (recovd->recovd_state == RECOVD_STOPPING)
306                         break;
307                 recovd_handle_event(recovd);
308         }
309
310         recovd->recovd_thread = NULL;
311         recovd->recovd_state = RECOVD_STOPPED;
312         wake_up(&recovd->recovd_ctl_waitq);
313         CDEBUG(D_HA, "mgr exiting process %d\n", current->pid);
314         RETURN(0);
315 }
316
317 int recovd_setup(struct recovd_obd *recovd)
318 {
319         int rc;
320
321         ENTRY;
322
323         INIT_LIST_HEAD(&recovd->recovd_managed_items);
324         INIT_LIST_HEAD(&recovd->recovd_troubled_items);
325         spin_lock_init(&recovd->recovd_lock);
326
327         init_waitqueue_head(&recovd->recovd_waitq);
328         init_waitqueue_head(&recovd->recovd_recovery_waitq);
329         init_waitqueue_head(&recovd->recovd_ctl_waitq);
330
331         rc = kernel_thread(recovd_main, (void *)recovd,
332                            CLONE_VM | CLONE_FS | CLONE_FILES);
333         if (rc < 0) {
334                 CERROR("cannot start thread\n");
335                 RETURN(-EINVAL);
336         }
337         wait_event(recovd->recovd_ctl_waitq,
338                    recovd->recovd_state == RECOVD_READY);
339
340         ptlrpc_recovd = recovd;
341         class_signal_connection_failure = recovd_conn_fail;
342
343         RETURN(0);
344 }
345
/*
 * Ask the recovery daemon thread to stop and wait until it has fully
 * exited (recovd_main() sets RECOVD_STOPPED on its way out).
 * Always returns 0.
 */
int recovd_cleanup(struct recovd_obd *recovd)
{
        ENTRY;
        spin_lock(&recovd->recovd_lock);
        /* Set the stop flag and wake the thread under the lock so the
         * state change is seen by recovd_check_event(). */
        recovd->recovd_state = RECOVD_STOPPING;
        wake_up(&recovd->recovd_waitq);
        spin_unlock(&recovd->recovd_lock);

        wait_event(recovd->recovd_ctl_waitq,
                   (recovd->recovd_state == RECOVD_STOPPED));
        RETURN(0);
}
358
/* The system-wide recovery daemon; published by recovd_setup(). */
struct recovd_obd *ptlrpc_recovd;