Whamcloud - gitweb
- Parallel recovery implementation, to match documented design.
[fs/lustre-release.git] / lustre / ptlrpc / recovd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  obd/rpc/recovd.c
5  *
6  *  Lustre High Availability Daemon
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  */
16
17 #define DEBUG_SUBSYSTEM S_RPC
18
19 #include <linux/lustre_lite.h>
20 #include <linux/lustre_ha.h>
21 #include <linux/obd_support.h>
22
23 void recovd_conn_manage(struct ptlrpc_connection *conn,
24                         struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover)
25 {
26         struct recovd_data *rd = &conn->c_recovd_data;
27         ENTRY;
28
29         rd->rd_recovd = recovd;
30         rd->rd_recover = recover;
31
32         spin_lock(&recovd->recovd_lock);
33         list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
34         spin_unlock(&recovd->recovd_lock);
35
36         EXIT;
37 }
38
39 void recovd_conn_fail(struct ptlrpc_connection *conn)
40 {
41         struct recovd_data *rd = &conn->c_recovd_data;
42         struct recovd_obd *recovd = rd->rd_recovd;
43         ENTRY;
44
45         if (!recovd) {
46                 CERROR("no recovd for connection %p\n", conn);
47                 return;
48         }
49
50         CERROR("connection %p to %s failed\n", conn, conn->c_remote_uuid);
51         spin_lock(&recovd->recovd_lock);
52         list_del(&rd->rd_managed_chain);
53         list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
54         spin_unlock(&recovd->recovd_lock);
55
56         wake_up(&recovd->recovd_waitq);
57
58         EXIT;
59 }
60
61 /* this function must be called with conn->c_lock held */
62 void recovd_conn_fixed(struct ptlrpc_connection *conn)
63 {
64         struct recovd_data *rd = &conn->c_recovd_data;
65         ENTRY;
66
67         list_del(&rd->rd_managed_chain);
68         list_add(&rd->rd_managed_chain, &rd->rd_recovd->recovd_managed_items);
69
70         EXIT;
71 }
72
73
74 static int recovd_check_event(struct recovd_obd *recovd)
75 {
76         int rc = 0;
77         struct list_head *tmp;
78
79         ENTRY;
80
81         spin_lock(&recovd->recovd_lock);
82
83         if (recovd->recovd_state == RECOVD_STOPPING)
84                 GOTO(out, rc = 1);
85
86         list_for_each(tmp, &recovd->recovd_troubled_items) {
87
88                 struct recovd_data *rd = list_entry(tmp, struct recovd_data,
89                                                     rd_managed_chain);
90
91                 if (rd->rd_phase == rd->rd_next_phase ||
92                     rd->rd_phase == RECOVD_FAILED)
93                         GOTO(out, rc = 1);
94         }
95
96  out:
97         spin_unlock(&recovd->recovd_lock);
98         RETURN(rc);
99 }
100
101 static void dump_connection_list(struct list_head *head)
102 {
103         struct list_head *tmp;
104
105         list_for_each(tmp, head) {
106                 struct ptlrpc_connection *conn =
107                         list_entry(tmp, struct ptlrpc_connection,
108                                    c_recovd_data.rd_managed_chain);
109                 CDEBUG(D_NET, "   %p = %s\n", conn, conn->c_remote_uuid);
110         }
111 }
112
113 static int recovd_handle_event(struct recovd_obd *recovd)
114 {
115         struct list_head *tmp, *n;
116         int rc = 0;
117         ENTRY;
118
119         spin_lock(&recovd->recovd_lock);
120
121         CDEBUG(D_NET, "managed: \n");
122         dump_connection_list(&recovd->recovd_managed_items);
123         CDEBUG(D_NET, "troubled: \n");
124         dump_connection_list(&recovd->recovd_troubled_items);
125
126         /*
127          * We use _safe here because one of the callbacks, expecially
128          * FAILURE or PREPARED, could move list items around.
129          */
130         list_for_each_safe(tmp, n, &recovd->recovd_troubled_items) {
131                 struct recovd_data *rd = list_entry(tmp, struct recovd_data,
132                                                     rd_managed_chain);
133
134                 if (rd->rd_phase != RECOVD_FAILED &&
135                     rd->rd_phase != rd->rd_next_phase)
136                         continue;
137
138                 switch (rd->rd_phase) {
139                     case RECOVD_FAILED:
140                 cb_failed: /* must always reach here with recovd_lock held! */
141                         CERROR("recovery FAILED for rd %p (conn %p): %d\n",
142                                rd, class_rd2conn(rd), rc);
143                         
144                         spin_unlock(&recovd->recovd_lock);
145                         (void)rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_FAILURE);
146                         spin_lock(&recovd->recovd_lock);
147                         break;
148                         
149                     case RECOVD_IDLE:
150                         if (!rd->rd_recover) {
151                                 CERROR("no rd_recover for rd %p (conn %p)\n",
152                                        rd, class_rd2conn(rd));
153                                 rc = -EINVAL;
154                                 break;
155                         }
156                         CERROR("starting recovery for rd %p (conn %p)\n",
157                                rd, class_rd2conn(rd));
158                         rd->rd_phase = RECOVD_PREPARING;
159                         
160                         spin_unlock(&recovd->recovd_lock);
161                         rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_PREPARE);
162                         spin_lock(&recovd->recovd_lock);
163                         if (rc)
164                                 goto cb_failed;
165                         
166                         rd->rd_next_phase = RECOVD_PREPARED;
167                         break;
168                         
169                     case RECOVD_PREPARED:
170                         rd->rd_phase = RECOVD_RECOVERING;
171                         
172                         CERROR("recovery prepared for rd %p (conn %p)\n",
173                                rd, class_rd2conn(rd));
174                         
175                         spin_unlock(&recovd->recovd_lock);
176                         rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_RECOVER);
177                         spin_lock(&recovd->recovd_lock);
178                         if (rc)
179                                 goto cb_failed;
180                         
181                         rd->rd_next_phase = RECOVD_RECOVERED;
182                         break;
183                         
184                     case RECOVD_RECOVERED:
185                         rd->rd_phase = RECOVD_IDLE;
186                         rd->rd_next_phase = RECOVD_PREPARING;
187                         
188                         CERROR("recovery complete for rd %p (conn %p)\n",
189                                rd, class_rd2conn(rd));
190                         break;
191                         
192                     default:
193                         break;
194                 }
195         }
196         spin_unlock(&recovd->recovd_lock);
197         RETURN(0);
198 }
199
200 static int recovd_main(void *arg)
201 {
202         struct recovd_obd *recovd = (struct recovd_obd *)arg;
203
204         ENTRY;
205
206         lock_kernel();
207         daemonize();
208         spin_lock_irq(&current->sigmask_lock);
209         sigfillset(&current->blocked);
210         recalc_sigpending(current);
211         spin_unlock_irq(&current->sigmask_lock);
212
213         sprintf(current->comm, "lustre_recovd");
214         unlock_kernel();
215
216         /* Signal that the thread is running. */
217         recovd->recovd_thread = current;
218         recovd->recovd_state = RECOVD_READY;
219         wake_up(&recovd->recovd_ctl_waitq);
220
221         /* And now, loop forever on requests. */
222         while (1) {
223                 wait_event(recovd->recovd_waitq, recovd_check_event(recovd));
224                 if (recovd->recovd_state == RECOVD_STOPPING)
225                         break;
226                 recovd_handle_event(recovd);
227         }
228
229         recovd->recovd_thread = NULL;
230         recovd->recovd_state = RECOVD_STOPPED;
231         wake_up(&recovd->recovd_ctl_waitq);
232         CDEBUG(D_NET, "mgr exiting process %d\n", current->pid);
233         RETURN(0);
234 }
235
236 int recovd_setup(struct recovd_obd *recovd)
237 {
238         int rc;
239         extern void (*class_signal_connection_failure)
240                 (struct ptlrpc_connection *);
241
242         ENTRY;
243
244         INIT_LIST_HEAD(&recovd->recovd_managed_items);
245         INIT_LIST_HEAD(&recovd->recovd_troubled_items);
246         spin_lock_init(&recovd->recovd_lock);
247
248         init_waitqueue_head(&recovd->recovd_waitq);
249         init_waitqueue_head(&recovd->recovd_recovery_waitq);
250         init_waitqueue_head(&recovd->recovd_ctl_waitq);
251
252         rc = kernel_thread(recovd_main, (void *)recovd,
253                            CLONE_VM | CLONE_FS | CLONE_FILES);
254         if (rc < 0) {
255                 CERROR("cannot start thread\n");
256                 RETURN(-EINVAL);
257         }
258         wait_event(recovd->recovd_ctl_waitq,
259                    recovd->recovd_state == RECOVD_READY);
260
261         /* exported and called by obdclass timeout handlers */
262         class_signal_connection_failure = recovd_conn_fail;
263         ptlrpc_recovd = recovd;
264
265         RETURN(0);
266 }
267
268 int recovd_cleanup(struct recovd_obd *recovd)
269 {
270         spin_lock(&recovd->recovd_lock);
271         recovd->recovd_state = RECOVD_STOPPING;
272         wake_up(&recovd->recovd_waitq);
273         spin_unlock(&recovd->recovd_lock);
274
275         wait_event(recovd->recovd_ctl_waitq,
276                    (recovd->recovd_state == RECOVD_STOPPED));
277         RETURN(0);
278 }
279
/* Recovery daemon most recently registered by recovd_setup(). */
struct recovd_obd *ptlrpc_recovd;