Whamcloud - gitweb
- Add RD_TROUBLED state for items that need to start recovery, and rename
[fs/lustre-release.git] / lustre / ptlrpc / recovd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  obd/rpc/recovd.c
5  *
6  *  Lustre High Availability Daemon
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  */
16
17 #define DEBUG_SUBSYSTEM S_RPC
18
19 #include <linux/lustre_lite.h>
20 #include <linux/lustre_ha.h>
21 #include <linux/obd_support.h>
22
23 void recovd_conn_manage(struct ptlrpc_connection *conn,
24                         struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover)
25 {
26         struct recovd_data *rd = &conn->c_recovd_data;
27         ENTRY;
28
29         rd->rd_recovd = recovd;
30         rd->rd_recover = recover;
31         rd->rd_phase = RD_IDLE;
32         rd->rd_next_phase = RD_TROUBLED;
33
34         spin_lock(&recovd->recovd_lock);
35         list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
36         spin_unlock(&recovd->recovd_lock);
37
38         EXIT;
39 }
40
41 void recovd_conn_fail(struct ptlrpc_connection *conn)
42 {
43         struct recovd_data *rd = &conn->c_recovd_data;
44         struct recovd_obd *recovd = rd->rd_recovd;
45         ENTRY;
46
47         if (!recovd) {
48                 CERROR("no recovd for connection %p\n", conn);
49                 EXIT;
50                 return;
51         }
52
53
54         spin_lock(&recovd->recovd_lock);
55         if (rd->rd_phase != RD_IDLE) {
56                 CDEBUG(D_INFO, "connection %p to %s already in recovery\n",
57                        conn, conn->c_remote_uuid);
58                 /* XXX need to distinguish from failure-in-recovery */
59                 spin_unlock(&recovd->recovd_lock);
60                 EXIT;
61                 return;
62         }
63                 
64         CERROR("connection %p to %s failed\n", conn, conn->c_remote_uuid);
65         list_del(&rd->rd_managed_chain);
66         list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
67         rd->rd_phase = RD_TROUBLED;
68         spin_unlock(&recovd->recovd_lock);
69
70         wake_up(&recovd->recovd_waitq);
71
72         EXIT;
73 }
74
75 /* this function must be called with recovd->recovd_lock held */
76 void recovd_conn_fixed(struct ptlrpc_connection *conn)
77 {
78         struct recovd_data *rd = &conn->c_recovd_data;
79         ENTRY;
80
81         spin_lock(&rd->rd_recovd->recovd_lock);
82         list_del(&rd->rd_managed_chain);
83         rd->rd_phase = RD_IDLE;
84         rd->rd_next_phase = RD_TROUBLED;
85         list_add(&rd->rd_managed_chain, &rd->rd_recovd->recovd_managed_items);
86         spin_unlock(&rd->rd_recovd->recovd_lock);
87
88         EXIT;
89 }
90
91
92 static int recovd_check_event(struct recovd_obd *recovd)
93 {
94         int rc = 0;
95         struct list_head *tmp;
96
97         ENTRY;
98
99         spin_lock(&recovd->recovd_lock);
100
101         if (recovd->recovd_state == RECOVD_STOPPING)
102                 GOTO(out, rc = 1);
103
104         list_for_each(tmp, &recovd->recovd_troubled_items) {
105
106                 struct recovd_data *rd = list_entry(tmp, struct recovd_data,
107                                                     rd_managed_chain);
108
109                 if (rd->rd_phase == rd->rd_next_phase ||
110                     rd->rd_phase == RD_FAILED)
111                         GOTO(out, rc = 1);
112         }
113
114  out:
115         spin_unlock(&recovd->recovd_lock);
116         RETURN(rc);
117 }
118
119 static void dump_connection_list(struct list_head *head)
120 {
121         struct list_head *tmp;
122
123         list_for_each(tmp, head) {
124                 struct ptlrpc_connection *conn =
125                         list_entry(tmp, struct ptlrpc_connection,
126                                    c_recovd_data.rd_managed_chain);
127                 CDEBUG(D_NET, "   %p = %s\n", conn, conn->c_remote_uuid);
128         }
129 }
130
131 static int recovd_handle_event(struct recovd_obd *recovd)
132 {
133         struct list_head *tmp, *n;
134         int rc = 0;
135         ENTRY;
136
137         spin_lock(&recovd->recovd_lock);
138
139         CDEBUG(D_NET, "managed: \n");
140         dump_connection_list(&recovd->recovd_managed_items);
141         CDEBUG(D_NET, "troubled: \n");
142         dump_connection_list(&recovd->recovd_troubled_items);
143
144         /*
145          * We use _safe here because one of the callbacks, expecially
146          * FAILURE or PREPARED, could move list items around.
147          */
148         list_for_each_safe(tmp, n, &recovd->recovd_troubled_items) {
149                 struct recovd_data *rd = list_entry(tmp, struct recovd_data,
150                                                     rd_managed_chain);
151
152                 if (rd->rd_phase != RD_FAILED &&
153                     rd->rd_phase != rd->rd_next_phase)
154                         continue;
155
156                 switch (rd->rd_phase) {
157                     case RD_FAILED:
158                 cb_failed: /* must always reach here with recovd_lock held! */
159                         CERROR("recovery FAILED for rd %p (conn %p): %d\n",
160                                rd, class_rd2conn(rd), rc);
161                         
162                         spin_unlock(&recovd->recovd_lock);
163                         (void)rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_FAILURE);
164                         spin_lock(&recovd->recovd_lock);
165                         break;
166                         
167                     case RD_TROUBLED:
168                         if (!rd->rd_recover) {
169                                 CERROR("no rd_recover for rd %p (conn %p)\n",
170                                        rd, class_rd2conn(rd));
171                                 rc = -EINVAL;
172                                 break;
173                         }
174                         CERROR("starting recovery for rd %p (conn %p)\n",
175                                rd, class_rd2conn(rd));
176                         rd->rd_phase = RD_PREPARING;
177                         
178                         spin_unlock(&recovd->recovd_lock);
179                         rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_PREPARE);
180                         spin_lock(&recovd->recovd_lock);
181                         if (rc)
182                                 goto cb_failed;
183                         
184                         rd->rd_next_phase = RD_PREPARED;
185                         break;
186                         
187                     case RD_PREPARED:
188                         rd->rd_phase = RD_RECOVERING;
189                         
190                         CERROR("recovery prepared for rd %p (conn %p)\n",
191                                rd, class_rd2conn(rd));
192                         
193                         spin_unlock(&recovd->recovd_lock);
194                         rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_RECOVER);
195                         spin_lock(&recovd->recovd_lock);
196                         if (rc)
197                                 goto cb_failed;
198                         
199                         rd->rd_next_phase = RD_RECOVERED;
200                         break;
201                         
202                     case RD_RECOVERED:
203                         rd->rd_phase = RD_IDLE;
204                         rd->rd_next_phase = RD_TROUBLED;
205                         
206                         CERROR("recovery complete for rd %p (conn %p)\n",
207                                rd, class_rd2conn(rd));
208                         break;
209                         
210                     default:
211                         break;
212                 }
213         }
214         spin_unlock(&recovd->recovd_lock);
215         RETURN(0);
216 }
217
218 static int recovd_main(void *arg)
219 {
220         struct recovd_obd *recovd = (struct recovd_obd *)arg;
221
222         ENTRY;
223
224         lock_kernel();
225         daemonize();
226         spin_lock_irq(&current->sigmask_lock);
227         sigfillset(&current->blocked);
228         recalc_sigpending(current);
229         spin_unlock_irq(&current->sigmask_lock);
230
231         sprintf(current->comm, "lustre_recovd");
232         unlock_kernel();
233
234         /* Signal that the thread is running. */
235         recovd->recovd_thread = current;
236         recovd->recovd_state = RECOVD_READY;
237         wake_up(&recovd->recovd_ctl_waitq);
238
239         /* And now, loop forever on requests. */
240         while (1) {
241                 wait_event(recovd->recovd_waitq, recovd_check_event(recovd));
242                 if (recovd->recovd_state == RECOVD_STOPPING)
243                         break;
244                 recovd_handle_event(recovd);
245         }
246
247         recovd->recovd_thread = NULL;
248         recovd->recovd_state = RECOVD_STOPPED;
249         wake_up(&recovd->recovd_ctl_waitq);
250         CDEBUG(D_NET, "mgr exiting process %d\n", current->pid);
251         RETURN(0);
252 }
253
254 int recovd_setup(struct recovd_obd *recovd)
255 {
256         int rc;
257
258         ENTRY;
259
260         INIT_LIST_HEAD(&recovd->recovd_managed_items);
261         INIT_LIST_HEAD(&recovd->recovd_troubled_items);
262         spin_lock_init(&recovd->recovd_lock);
263
264         init_waitqueue_head(&recovd->recovd_waitq);
265         init_waitqueue_head(&recovd->recovd_recovery_waitq);
266         init_waitqueue_head(&recovd->recovd_ctl_waitq);
267
268         rc = kernel_thread(recovd_main, (void *)recovd,
269                            CLONE_VM | CLONE_FS | CLONE_FILES);
270         if (rc < 0) {
271                 CERROR("cannot start thread\n");
272                 RETURN(-EINVAL);
273         }
274         wait_event(recovd->recovd_ctl_waitq,
275                    recovd->recovd_state == RECOVD_READY);
276
277         ptlrpc_recovd = recovd;
278
279         RETURN(0);
280 }
281
282 int recovd_cleanup(struct recovd_obd *recovd)
283 {
284         spin_lock(&recovd->recovd_lock);
285         recovd->recovd_state = RECOVD_STOPPING;
286         wake_up(&recovd->recovd_waitq);
287         spin_unlock(&recovd->recovd_lock);
288
289         wait_event(recovd->recovd_ctl_waitq,
290                    (recovd->recovd_state == RECOVD_STOPPED));
291         RETURN(0);
292 }
293
294 struct recovd_obd *ptlrpc_recovd;