Whamcloud - gitweb
- Add some more verbose logging of the cases that get clients into recovery.
[fs/lustre-release.git] / lustre / ptlrpc / recovd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  obd/rpc/recovd.c
5  *
6  *  Lustre High Availability Daemon
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  */
16
17 #define DEBUG_SUBSYSTEM S_RPC
18
19 #include <linux/lustre_lite.h>
20 #include <linux/lustre_ha.h>
21 #include <linux/obd_support.h>
22
23 void recovd_conn_manage(struct ptlrpc_connection *conn,
24                         struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover)
25 {
26         struct recovd_data *rd = &conn->c_recovd_data;
27         ENTRY;
28
29         rd->rd_recovd = recovd;
30         rd->rd_recover = recover;
31         rd->rd_phase = RD_IDLE;
32         rd->rd_next_phase = RD_TROUBLED;
33
34         spin_lock(&recovd->recovd_lock);
35         list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
36         spin_unlock(&recovd->recovd_lock);
37
38         EXIT;
39 }
40
41 void recovd_conn_fail(struct ptlrpc_connection *conn)
42 {
43         struct recovd_data *rd = &conn->c_recovd_data;
44         struct recovd_obd *recovd = rd->rd_recovd;
45         ENTRY;
46
47         if (!recovd) {
48                 CERROR("no recovd for connection %p\n", conn);
49                 EXIT;
50                 return;
51         }
52
53
54         spin_lock(&recovd->recovd_lock);
55         if (rd->rd_phase != RD_IDLE) {
56                 CERROR("connection %p to %s already in recovery\n",
57                        conn, conn->c_remote_uuid);
58                 /* XXX need to distinguish from failure-in-recovery */
59                 spin_unlock(&recovd->recovd_lock);
60                 EXIT;
61                 return;
62         }
63                 
64         CERROR("connection %p to %s failed\n", conn, conn->c_remote_uuid);
65         list_del(&rd->rd_managed_chain);
66         list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
67         rd->rd_phase = RD_TROUBLED;
68         spin_unlock(&recovd->recovd_lock);
69
70         wake_up(&recovd->recovd_waitq);
71
72         EXIT;
73 }
74
75 /* this function must be called with recovd->recovd_lock held */
76 void recovd_conn_fixed(struct ptlrpc_connection *conn)
77 {
78         struct recovd_data *rd = &conn->c_recovd_data;
79         ENTRY;
80
81         spin_lock(&rd->rd_recovd->recovd_lock);
82         list_del(&rd->rd_managed_chain);
83         rd->rd_phase = RD_IDLE;
84         rd->rd_next_phase = RD_TROUBLED;
85         list_add(&rd->rd_managed_chain, &rd->rd_recovd->recovd_managed_items);
86         spin_unlock(&rd->rd_recovd->recovd_lock);
87
88         EXIT;
89 }
90
91
92 static int recovd_check_event(struct recovd_obd *recovd)
93 {
94         int rc = 0;
95         struct list_head *tmp;
96
97         ENTRY;
98
99         spin_lock(&recovd->recovd_lock);
100
101         if (recovd->recovd_state == RECOVD_STOPPING)
102                 GOTO(out, rc = 1);
103
104         list_for_each(tmp, &recovd->recovd_troubled_items) {
105
106                 struct recovd_data *rd = list_entry(tmp, struct recovd_data,
107                                                     rd_managed_chain);
108
109                 if (rd->rd_phase == rd->rd_next_phase ||
110                     rd->rd_phase == RD_FAILED)
111                         GOTO(out, rc = 1);
112         }
113
114  out:
115         spin_unlock(&recovd->recovd_lock);
116         RETURN(rc);
117 }
118
119 static void dump_connection_list(struct list_head *head)
120 {
121         struct list_head *tmp;
122
123         list_for_each(tmp, head) {
124                 struct ptlrpc_connection *conn =
125                         list_entry(tmp, struct ptlrpc_connection,
126                                    c_recovd_data.rd_managed_chain);
127                 CERROR("   %p = %s (%d/%d)\n", conn, conn->c_remote_uuid,
128                        conn->c_recovd_data.rd_phase,
129                        conn->c_recovd_data.rd_next_phase);
130         }
131 }
132
133 static int recovd_handle_event(struct recovd_obd *recovd)
134 {
135         struct list_head *tmp, *n;
136         int rc = 0;
137         ENTRY;
138
139         spin_lock(&recovd->recovd_lock);
140
141         CERROR("managed: \n");
142         dump_connection_list(&recovd->recovd_managed_items);
143         CERROR("troubled: \n");
144         dump_connection_list(&recovd->recovd_troubled_items);
145
146         /*
147          * We use _safe here because one of the callbacks, expecially
148          * FAILURE or PREPARED, could move list items around.
149          */
150         list_for_each_safe(tmp, n, &recovd->recovd_troubled_items) {
151                 struct recovd_data *rd = list_entry(tmp, struct recovd_data,
152                                                     rd_managed_chain);
153
154                 if (rd->rd_phase != RD_FAILED &&
155                     rd->rd_phase != rd->rd_next_phase)
156                         continue;
157
158                 switch (rd->rd_phase) {
159                     case RD_FAILED:
160                 cb_failed: /* must always reach here with recovd_lock held! */
161                         CERROR("recovery FAILED for rd %p (conn %p): %d\n",
162                                rd, class_rd2conn(rd), rc);
163                         
164                         spin_unlock(&recovd->recovd_lock);
165                         (void)rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_FAILURE);
166                         spin_lock(&recovd->recovd_lock);
167                         break;
168                         
169                     case RD_TROUBLED:
170                         if (!rd->rd_recover) {
171                                 CERROR("no rd_recover for rd %p (conn %p)\n",
172                                        rd, class_rd2conn(rd));
173                                 rc = -EINVAL;
174                                 break;
175                         }
176                         CERROR("starting recovery for rd %p (conn %p)\n",
177                                rd, class_rd2conn(rd));
178                         rd->rd_phase = RD_PREPARING;
179                         
180                         spin_unlock(&recovd->recovd_lock);
181                         rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_PREPARE);
182                         spin_lock(&recovd->recovd_lock);
183                         if (rc)
184                                 goto cb_failed;
185                         
186                         rd->rd_next_phase = RD_PREPARED;
187                         break;
188                         
189                     case RD_PREPARED:
190                         rd->rd_phase = RD_RECOVERING;
191                         
192                         CERROR("recovery prepared for rd %p (conn %p)\n",
193                                rd, class_rd2conn(rd));
194                         
195                         spin_unlock(&recovd->recovd_lock);
196                         rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_RECOVER);
197                         spin_lock(&recovd->recovd_lock);
198                         if (rc)
199                                 goto cb_failed;
200                         
201                         rd->rd_next_phase = RD_RECOVERED;
202                         break;
203                         
204                     case RD_RECOVERED:
205                         rd->rd_phase = RD_IDLE;
206                         rd->rd_next_phase = RD_TROUBLED;
207                         
208                         CERROR("recovery complete for rd %p (conn %p)\n",
209                                rd, class_rd2conn(rd));
210                         break;
211                         
212                     default:
213                         break;
214                 }
215         }
216         spin_unlock(&recovd->recovd_lock);
217         RETURN(0);
218 }
219
220 static int recovd_main(void *arg)
221 {
222         struct recovd_obd *recovd = (struct recovd_obd *)arg;
223
224         ENTRY;
225
226         lock_kernel();
227         daemonize();
228         spin_lock_irq(&current->sigmask_lock);
229         sigfillset(&current->blocked);
230         recalc_sigpending(current);
231         spin_unlock_irq(&current->sigmask_lock);
232
233         sprintf(current->comm, "lustre_recovd");
234         unlock_kernel();
235
236         /* Signal that the thread is running. */
237         recovd->recovd_thread = current;
238         recovd->recovd_state = RECOVD_READY;
239         wake_up(&recovd->recovd_ctl_waitq);
240
241         /* And now, loop forever on requests. */
242         while (1) {
243                 wait_event(recovd->recovd_waitq, recovd_check_event(recovd));
244                 if (recovd->recovd_state == RECOVD_STOPPING)
245                         break;
246                 recovd_handle_event(recovd);
247         }
248
249         recovd->recovd_thread = NULL;
250         recovd->recovd_state = RECOVD_STOPPED;
251         wake_up(&recovd->recovd_ctl_waitq);
252         CDEBUG(D_NET, "mgr exiting process %d\n", current->pid);
253         RETURN(0);
254 }
255
256 int recovd_setup(struct recovd_obd *recovd)
257 {
258         int rc;
259
260         ENTRY;
261
262         INIT_LIST_HEAD(&recovd->recovd_managed_items);
263         INIT_LIST_HEAD(&recovd->recovd_troubled_items);
264         spin_lock_init(&recovd->recovd_lock);
265
266         init_waitqueue_head(&recovd->recovd_waitq);
267         init_waitqueue_head(&recovd->recovd_recovery_waitq);
268         init_waitqueue_head(&recovd->recovd_ctl_waitq);
269
270         rc = kernel_thread(recovd_main, (void *)recovd,
271                            CLONE_VM | CLONE_FS | CLONE_FILES);
272         if (rc < 0) {
273                 CERROR("cannot start thread\n");
274                 RETURN(-EINVAL);
275         }
276         wait_event(recovd->recovd_ctl_waitq,
277                    recovd->recovd_state == RECOVD_READY);
278
279         ptlrpc_recovd = recovd;
280
281         RETURN(0);
282 }
283
284 int recovd_cleanup(struct recovd_obd *recovd)
285 {
286         spin_lock(&recovd->recovd_lock);
287         recovd->recovd_state = RECOVD_STOPPING;
288         wake_up(&recovd->recovd_waitq);
289         spin_unlock(&recovd->recovd_lock);
290
291         wait_event(recovd->recovd_ctl_waitq,
292                    (recovd->recovd_state == RECOVD_STOPPED));
293         RETURN(0);
294 }
295
296 struct recovd_obd *ptlrpc_recovd;