Whamcloud - gitweb
- Cancel any and all outstanding locks when an export is disconnected.
[fs/lustre-release.git] / lustre / ptlrpc / recovd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  obd/rpc/recovd.c
5  *
6  *  Lustre High Availability Daemon
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  */
16
17 #define DEBUG_SUBSYSTEM S_RPC
18
19 #include <linux/lustre_lite.h>
20 #include <linux/lustre_ha.h>
21 #include <linux/obd_support.h>
22
23 void recovd_conn_manage(struct ptlrpc_connection *conn,
24                         struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover)
25 {
26         struct recovd_data *rd = &conn->c_recovd_data;
27         ENTRY;
28
29         rd->rd_recovd = recovd;
30         rd->rd_recover = recover;
31
32         spin_lock(&recovd->recovd_lock);
33         list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
34         spin_unlock(&recovd->recovd_lock);
35
36         EXIT;
37 }
38
39 void recovd_conn_fail(struct ptlrpc_connection *conn)
40 {
41         struct recovd_data *rd = &conn->c_recovd_data;
42         struct recovd_obd *recovd = rd->rd_recovd;
43         ENTRY;
44
45         if (!recovd) {
46                 CERROR("no recovd for connection %p\n", conn);
47                 EXIT;
48                 return;
49         }
50
51
52         spin_lock(&recovd->recovd_lock);
53         if (rd->rd_phase != RECOVD_IDLE || rd->rd_next_phase != RECOVD_IDLE) {
54                 CDEBUG(D_INFO, "connection %p to %s already in recovery\n",
55                        conn, conn->c_remote_uuid);
56                 spin_unlock(&recovd->recovd_lock);
57                 EXIT;
58                 return;
59         }
60                 
61         CERROR("connection %p to %s failed\n", conn, conn->c_remote_uuid);
62         list_del(&rd->rd_managed_chain);
63         list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
64         rd->rd_next_phase = RECOVD_PREPARING;
65         spin_unlock(&recovd->recovd_lock);
66
67         wake_up(&recovd->recovd_waitq);
68
69         EXIT;
70 }
71
72 /* this function must be called with conn->c_lock held */
73 void recovd_conn_fixed(struct ptlrpc_connection *conn)
74 {
75         struct recovd_data *rd = &conn->c_recovd_data;
76         ENTRY;
77
78         list_del(&rd->rd_managed_chain);
79         list_add(&rd->rd_managed_chain, &rd->rd_recovd->recovd_managed_items);
80
81         EXIT;
82 }
83
84
85 static int recovd_check_event(struct recovd_obd *recovd)
86 {
87         int rc = 0;
88         struct list_head *tmp;
89
90         ENTRY;
91
92         spin_lock(&recovd->recovd_lock);
93
94         if (recovd->recovd_state == RECOVD_STOPPING)
95                 GOTO(out, rc = 1);
96
97         list_for_each(tmp, &recovd->recovd_troubled_items) {
98
99                 struct recovd_data *rd = list_entry(tmp, struct recovd_data,
100                                                     rd_managed_chain);
101
102                 if (rd->rd_phase == rd->rd_next_phase ||
103                     (rd->rd_phase == RECOVD_IDLE && 
104                      rd->rd_next_phase == RECOVD_PREPARING) ||
105                     rd->rd_phase == RECOVD_FAILED)
106                         GOTO(out, rc = 1);
107         }
108
109  out:
110         spin_unlock(&recovd->recovd_lock);
111         RETURN(rc);
112 }
113
114 static void dump_connection_list(struct list_head *head)
115 {
116         struct list_head *tmp;
117
118         list_for_each(tmp, head) {
119                 struct ptlrpc_connection *conn =
120                         list_entry(tmp, struct ptlrpc_connection,
121                                    c_recovd_data.rd_managed_chain);
122                 CDEBUG(D_NET, "   %p = %s\n", conn, conn->c_remote_uuid);
123         }
124 }
125
126 static int recovd_handle_event(struct recovd_obd *recovd)
127 {
128         struct list_head *tmp, *n;
129         int rc = 0;
130         ENTRY;
131
132         spin_lock(&recovd->recovd_lock);
133
134         CDEBUG(D_NET, "managed: \n");
135         dump_connection_list(&recovd->recovd_managed_items);
136         CDEBUG(D_NET, "troubled: \n");
137         dump_connection_list(&recovd->recovd_troubled_items);
138
139         /*
140          * We use _safe here because one of the callbacks, expecially
141          * FAILURE or PREPARED, could move list items around.
142          */
143         list_for_each_safe(tmp, n, &recovd->recovd_troubled_items) {
144                 struct recovd_data *rd = list_entry(tmp, struct recovd_data,
145                                                     rd_managed_chain);
146
147                 /* XXXshaver This is very ugly -- add a RECOVD_TROUBLED state! */
148                 if (rd->rd_phase != RECOVD_FAILED &&
149                     !(rd->rd_phase == RECOVD_IDLE &&
150                       rd->rd_next_phase == RECOVD_PREPARING) &&
151                     rd->rd_phase != rd->rd_next_phase)
152                         continue;
153
154                 switch (rd->rd_phase) {
155                     case RECOVD_FAILED:
156                 cb_failed: /* must always reach here with recovd_lock held! */
157                         CERROR("recovery FAILED for rd %p (conn %p): %d\n",
158                                rd, class_rd2conn(rd), rc);
159                         
160                         spin_unlock(&recovd->recovd_lock);
161                         (void)rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_FAILURE);
162                         spin_lock(&recovd->recovd_lock);
163                         break;
164                         
165                     case RECOVD_IDLE:
166                         if (!rd->rd_recover) {
167                                 CERROR("no rd_recover for rd %p (conn %p)\n",
168                                        rd, class_rd2conn(rd));
169                                 rc = -EINVAL;
170                                 break;
171                         }
172                         CERROR("starting recovery for rd %p (conn %p)\n",
173                                rd, class_rd2conn(rd));
174                         rd->rd_phase = RECOVD_PREPARING;
175                         
176                         spin_unlock(&recovd->recovd_lock);
177                         rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_PREPARE);
178                         spin_lock(&recovd->recovd_lock);
179                         if (rc)
180                                 goto cb_failed;
181                         
182                         rd->rd_next_phase = RECOVD_PREPARED;
183                         break;
184                         
185                     case RECOVD_PREPARED:
186                         rd->rd_phase = RECOVD_RECOVERING;
187                         
188                         CERROR("recovery prepared for rd %p (conn %p)\n",
189                                rd, class_rd2conn(rd));
190                         
191                         spin_unlock(&recovd->recovd_lock);
192                         rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_RECOVER);
193                         spin_lock(&recovd->recovd_lock);
194                         if (rc)
195                                 goto cb_failed;
196                         
197                         rd->rd_next_phase = RECOVD_RECOVERED;
198                         break;
199                         
200                     case RECOVD_RECOVERED:
201                         rd->rd_phase = RECOVD_IDLE;
202                         rd->rd_next_phase = RECOVD_PREPARING;
203                         
204                         CERROR("recovery complete for rd %p (conn %p)\n",
205                                rd, class_rd2conn(rd));
206                         break;
207                         
208                     default:
209                         break;
210                 }
211         }
212         spin_unlock(&recovd->recovd_lock);
213         RETURN(0);
214 }
215
216 static int recovd_main(void *arg)
217 {
218         struct recovd_obd *recovd = (struct recovd_obd *)arg;
219
220         ENTRY;
221
222         lock_kernel();
223         daemonize();
224         spin_lock_irq(&current->sigmask_lock);
225         sigfillset(&current->blocked);
226         recalc_sigpending(current);
227         spin_unlock_irq(&current->sigmask_lock);
228
229         sprintf(current->comm, "lustre_recovd");
230         unlock_kernel();
231
232         /* Signal that the thread is running. */
233         recovd->recovd_thread = current;
234         recovd->recovd_state = RECOVD_READY;
235         wake_up(&recovd->recovd_ctl_waitq);
236
237         /* And now, loop forever on requests. */
238         while (1) {
239                 wait_event(recovd->recovd_waitq, recovd_check_event(recovd));
240                 if (recovd->recovd_state == RECOVD_STOPPING)
241                         break;
242                 recovd_handle_event(recovd);
243         }
244
245         recovd->recovd_thread = NULL;
246         recovd->recovd_state = RECOVD_STOPPED;
247         wake_up(&recovd->recovd_ctl_waitq);
248         CDEBUG(D_NET, "mgr exiting process %d\n", current->pid);
249         RETURN(0);
250 }
251
252 int recovd_setup(struct recovd_obd *recovd)
253 {
254         int rc;
255
256         ENTRY;
257
258         INIT_LIST_HEAD(&recovd->recovd_managed_items);
259         INIT_LIST_HEAD(&recovd->recovd_troubled_items);
260         spin_lock_init(&recovd->recovd_lock);
261
262         init_waitqueue_head(&recovd->recovd_waitq);
263         init_waitqueue_head(&recovd->recovd_recovery_waitq);
264         init_waitqueue_head(&recovd->recovd_ctl_waitq);
265
266         rc = kernel_thread(recovd_main, (void *)recovd,
267                            CLONE_VM | CLONE_FS | CLONE_FILES);
268         if (rc < 0) {
269                 CERROR("cannot start thread\n");
270                 RETURN(-EINVAL);
271         }
272         wait_event(recovd->recovd_ctl_waitq,
273                    recovd->recovd_state == RECOVD_READY);
274
275         ptlrpc_recovd = recovd;
276
277         RETURN(0);
278 }
279
280 int recovd_cleanup(struct recovd_obd *recovd)
281 {
282         spin_lock(&recovd->recovd_lock);
283         recovd->recovd_state = RECOVD_STOPPING;
284         wake_up(&recovd->recovd_waitq);
285         spin_unlock(&recovd->recovd_lock);
286
287         wait_event(recovd->recovd_ctl_waitq,
288                    (recovd->recovd_state == RECOVD_STOPPED));
289         RETURN(0);
290 }
291
/*
 * Global pointer to a recovery daemon descriptor.  recovd_setup() stores
 * its argument here after the daemon thread has reported ready;
 * recovd_cleanup() does not reset it.
 */
struct recovd_obd *ptlrpc_recovd;