Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / ptlrpc / recover.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/recover.c
37  *
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #ifdef __KERNEL__
43 # include <libcfs/libcfs.h>
44 #else
45 # include <liblustre.h>
46 #endif
47
48 #include <obd_support.h>
49 #include <lustre_ha.h>
50 #include <lustre_net.h>
51 #include <lustre_import.h>
52 #include <lustre_export.h>
53 #include <obd.h>
54 #include <obd_ost.h>
55 #include <obd_class.h>
56 #include <obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
57 #include <libcfs/list.h>
58
59 #include "ptlrpc_internal.h"
60
/* Single reconnect attempt for @imp; defined at the end of this file. */
static int ptlrpc_recover_import_no_retry(struct obd_import *imp,
                                          char *new_uuid);
62
63 void ptlrpc_initiate_recovery(struct obd_import *imp)
64 {
65         ENTRY;
66
67         CDEBUG(D_HA, "%s: starting recovery\n", obd2cli_tgt(imp->imp_obd));
68         ptlrpc_connect_import(imp, NULL);
69
70         EXIT;
71 }
72
73 int ptlrpc_replay_next(struct obd_import *imp, int *inflight)
74 {
75         int rc = 0;
76         struct list_head *tmp, *pos;
77         struct ptlrpc_request *req = NULL;
78         __u64 last_transno;
79         ENTRY;
80
81         *inflight = 0;
82
83         /* It might have committed some after we last spoke, so make sure we
84          * get rid of them now.
85          */
86         spin_lock(&imp->imp_lock);
87         imp->imp_last_transno_checked = 0;
88         ptlrpc_free_committed(imp);
89         last_transno = imp->imp_last_replay_transno;
90         spin_unlock(&imp->imp_lock);
91
92         CDEBUG(D_HA, "import %p from %s committed "LPU64" last "LPU64"\n",
93                imp, obd2cli_tgt(imp->imp_obd),
94                imp->imp_peer_committed_transno, last_transno);
95
96         /* Do I need to hold a lock across this iteration?  We shouldn't be
97          * racing with any additions to the list, because we're in recovery
98          * and are therefore not processing additional requests to add.  Calls
99          * to ptlrpc_free_committed might commit requests, but nothing "newer"
100          * than the one we're replaying (it can't be committed until it's
101          * replayed, and we're doing that here).  l_f_e_safe protects against
102          * problems with the current request being committed, in the unlikely
103          * event of that race.  So, in conclusion, I think that it's safe to
104          * perform this list-walk without the imp_lock held.
105          *
106          * But, the {mdc,osc}_replay_open callbacks both iterate
107          * request lists, and have comments saying they assume the
108          * imp_lock is being held by ptlrpc_replay, but it's not. it's
109          * just a little race...
110          */
111         list_for_each_safe(tmp, pos, &imp->imp_replay_list) {
112                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
113
114                 /* If need to resend the last sent transno (because a
115                    reconnect has occurred), then stop on the matching
116                    req and send it again. If, however, the last sent
117                    transno has been committed then we continue replay
118                    from the next request. */
119                 if (imp->imp_resend_replay && 
120                     req->rq_transno == last_transno) {
121                         lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
122                         break;
123                 }
124
125                 if (req->rq_transno > last_transno) {
126                         imp->imp_last_replay_transno = req->rq_transno;
127                         break;
128                 }
129
130                 req = NULL;
131         }
132
133         spin_lock(&imp->imp_lock);
134         imp->imp_resend_replay = 0;
135         spin_unlock(&imp->imp_lock);
136
137         if (req != NULL) {
138                 rc = ptlrpc_replay_req(req);
139                 if (rc) {
140                         CERROR("recovery replay error %d for req "
141                                LPD64"\n", rc, req->rq_xid);
142                         RETURN(rc);
143                 }
144                 *inflight = 1;
145         }
146         RETURN(rc);
147 }
148
149 int ptlrpc_resend(struct obd_import *imp)
150 {
151         struct ptlrpc_request *req, *next;
152
153         ENTRY;
154
155         /* As long as we're in recovery, nothing should be added to the sending
156          * list, so we don't need to hold the lock during this iteration and
157          * resend process.
158          */
159         /* Well... what if lctl recover is called twice at the same time?
160          */
161         spin_lock(&imp->imp_lock);
162         if (imp->imp_state != LUSTRE_IMP_RECOVER) {
163                 spin_unlock(&imp->imp_lock);
164                 RETURN(-1);
165         }
166
167         list_for_each_entry_safe(req, next, &imp->imp_sending_list, rq_list) {
168                 LASSERTF((long)req > CFS_PAGE_SIZE && req != LP_POISON,
169                          "req %p bad\n", req);
170                 LASSERTF(req->rq_type != LI_POISON, "req %p freed\n", req);
171                 if (!req->rq_no_resend)
172                         ptlrpc_resend_req(req);
173         }
174         spin_unlock(&imp->imp_lock);
175
176         RETURN(0);
177 }
178
179 void ptlrpc_wake_delayed(struct obd_import *imp)
180 {
181         struct list_head *tmp, *pos;
182         struct ptlrpc_request *req;
183
184         spin_lock(&imp->imp_lock);
185         list_for_each_safe(tmp, pos, &imp->imp_delayed_list) {
186                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
187
188                 DEBUG_REQ(D_HA, req, "waking (set %p):", req->rq_set);
189                 ptlrpc_client_wake_req(req);
190         }
191         spin_unlock(&imp->imp_lock);
192 }
193
194 void ptlrpc_request_handle_notconn(struct ptlrpc_request *failed_req)
195 {
196         struct obd_import *imp = failed_req->rq_import;
197         ENTRY;
198
199         CDEBUG(D_HA, "import %s of %s@%s abruptly disconnected: reconnecting\n",
200                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
201                imp->imp_connection->c_remote_uuid.uuid);
202
203         if (ptlrpc_set_import_discon(imp,
204                               lustre_msg_get_conn_cnt(failed_req->rq_reqmsg))) {
205                 if (!imp->imp_replayable) {
206                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
207                                "auto-deactivating\n",
208                                obd2cli_tgt(imp->imp_obd),
209                                imp->imp_connection->c_remote_uuid.uuid,
210                                imp->imp_obd->obd_name);
211                         ptlrpc_deactivate_import(imp);
212                 }
213                 /* to control recovery via lctl {disable|enable}_recovery */
214                 if (imp->imp_deactive == 0)
215                         ptlrpc_connect_import(imp, NULL);
216         }
217
218         /* Wait for recovery to complete and resend. If evicted, then
219            this request will be errored out later.*/
220         spin_lock(&failed_req->rq_lock);
221         if (!failed_req->rq_no_resend)
222                 failed_req->rq_resend = 1;
223         spin_unlock(&failed_req->rq_lock);
224
225         EXIT;
226 }
227
228 /*
229  * Administratively active/deactive a client. 
230  * This should only be called by the ioctl interface, currently
231  *  - the lctl deactivate and activate commands
232  *  - echo 0/1 >> /proc/osc/XXX/active
233  *  - client umount -f (ll_umount_begin)
234  */
235 int ptlrpc_set_import_active(struct obd_import *imp, int active)
236 {
237         struct obd_device *obd = imp->imp_obd;
238         int rc = 0;
239
240         ENTRY;
241         LASSERT(obd);
242
243         /* When deactivating, mark import invalid, and abort in-flight
244          * requests. */
245         if (!active) {
246                 LCONSOLE_WARN("setting import %s INACTIVE by administrator "
247                               "request\n", obd2cli_tgt(imp->imp_obd));
248
249                 /* set before invalidate to avoid messages about imp_inval
250                  * set without imp_deactive in ptlrpc_import_delay_req */
251                 spin_lock(&imp->imp_lock);
252                 imp->imp_deactive = 1;
253                 spin_unlock(&imp->imp_lock);
254
255                 ptlrpc_invalidate_import(imp);
256         }
257
258         /* When activating, mark import valid, and attempt recovery */
259         if (active) {
260                 CDEBUG(D_HA, "setting import %s VALID\n",
261                        obd2cli_tgt(imp->imp_obd));
262                 rc = ptlrpc_recover_import(imp, NULL);
263         }
264
265         RETURN(rc);
266 }
267
268 /* Attempt to reconnect an import */
269 int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid)
270 {
271         int rc;
272         ENTRY;
273
274         spin_lock(&imp->imp_lock);
275         if (atomic_read(&imp->imp_inval_count)) {
276                 spin_unlock(&imp->imp_lock);
277                 RETURN(-EINVAL);
278         }
279         spin_unlock(&imp->imp_lock);
280
281         /* force import to be disconnected. */
282         ptlrpc_set_import_discon(imp, 0);
283
284         spin_lock(&imp->imp_lock);
285         imp->imp_deactive = 0;
286         spin_unlock(&imp->imp_lock);
287
288         rc = ptlrpc_recover_import_no_retry(imp, new_uuid);
289
290         RETURN(rc);
291 }
292
293 int ptlrpc_import_in_recovery(struct obd_import *imp)
294 {
295         int in_recovery = 1;
296         spin_lock(&imp->imp_lock);
297         if (imp->imp_state == LUSTRE_IMP_FULL ||
298             imp->imp_state == LUSTRE_IMP_CLOSED ||
299             imp->imp_state == LUSTRE_IMP_DISCON)
300                 in_recovery = 0;
301         spin_unlock(&imp->imp_lock);
302         return in_recovery;
303 }
304
305 static int ptlrpc_recover_import_no_retry(struct obd_import *imp,
306                                           char *new_uuid)
307 {
308         int rc;
309         int in_recovery = 0;
310         struct l_wait_info lwi;
311         ENTRY;
312
313         /* Check if reconnect is already in progress */
314         spin_lock(&imp->imp_lock);
315         if (imp->imp_state != LUSTRE_IMP_DISCON) {
316                 in_recovery = 1;
317         }
318         spin_unlock(&imp->imp_lock);
319
320         if (in_recovery == 1)
321                 RETURN(-EALREADY);
322
323         rc = ptlrpc_connect_import(imp, new_uuid);
324         if (rc)
325                 RETURN(rc);
326
327         CDEBUG(D_HA, "%s: recovery started, waiting\n",
328                obd2cli_tgt(imp->imp_obd));
329
330         lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(obd_timeout)), 
331                           NULL, NULL);
332         rc = l_wait_event(imp->imp_recovery_waitq,
333                           !ptlrpc_import_in_recovery(imp), &lwi);
334         CDEBUG(D_HA, "%s: recovery finished\n",
335                obd2cli_tgt(imp->imp_obd));
336
337         RETURN(rc);
338 }