Whamcloud - gitweb
branch: b_new_cmd
[fs/lustre-release.git] / lustre / ptlrpc / recover.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Portal-RPC reconnection and replay operations, for use in recovery.
5  *
6  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
7  *   Author: Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of the Lustre file system, http://www.lustre.org
10  *   Lustre is a trademark of Cluster File Systems, Inc.
11  *
12  *   You may have signed or agreed to another license before downloading
13  *   this software.  If so, you are bound by the terms and conditions
14  *   of that agreement, and the following does not apply to you.  See the
15  *   LICENSE file included with this distribution for more information.
16  *
17  *   If you did not agree to a different license, then this copy of Lustre
18  *   is open source software; you can redistribute it and/or modify it
19  *   under the terms of version 2 of the GNU General Public License as
20  *   published by the Free Software Foundation.
21  *
22  *   In either case, Lustre is distributed in the hope that it will be
23  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
24  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
25  *   license text for more details.
26  */
27
28 #define DEBUG_SUBSYSTEM S_RPC
29 #ifdef __KERNEL__
30 # include <libcfs/libcfs.h>
31 #else
32 # include <liblustre.h>
33 #endif
34
35 #include <obd_support.h>
36 #include <lustre_ha.h>
37 #include <lustre_net.h>
38 #include <lustre_import.h>
39 #include <lustre_export.h>
40 #include <obd.h>
41 #include <obd_ost.h>
42 #include <obd_class.h>
43 #include <obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
44 #include <libcfs/list.h>
45
46 #include "ptlrpc_internal.h"
47
/* Forward declaration: defined at the bottom of this file, used by
 * ptlrpc_recover_import(). */
static int ptlrpc_recover_import_no_retry(struct obd_import *imp,
                                          char *new_uuid);
49
50 void ptlrpc_initiate_recovery(struct obd_import *imp)
51 {
52         ENTRY;
53
54         CDEBUG(D_HA, "%s: starting recovery\n", obd2cli_tgt(imp->imp_obd));
55         ptlrpc_connect_import(imp, NULL);
56
57         EXIT;
58 }
59
60 int ptlrpc_replay_next(struct obd_import *imp, int *inflight)
61 {
62         int rc = 0;
63         struct list_head *tmp, *pos;
64         struct ptlrpc_request *req = NULL;
65         __u64 last_transno;
66         ENTRY;
67
68         *inflight = 0;
69
70         /* It might have committed some after we last spoke, so make sure we
71          * get rid of them now.
72          */
73         spin_lock(&imp->imp_lock);
74         imp->imp_last_transno_checked = 0;
75         ptlrpc_free_committed(imp);
76         last_transno = imp->imp_last_replay_transno;
77         spin_unlock(&imp->imp_lock);
78         CDEBUG(D_HA, "import %p from %s committed "LPU64" last "LPU64"\n",
79                imp, obd2cli_tgt(imp->imp_obd),
80                imp->imp_peer_committed_transno, last_transno);
81
82         /* Do I need to hold a lock across this iteration?  We shouldn't be
83          * racing with any additions to the list, because we're in recovery
84          * and are therefore not processing additional requests to add.  Calls
85          * to ptlrpc_free_committed might commit requests, but nothing "newer"
86          * than the one we're replaying (it can't be committed until it's
87          * replayed, and we're doing that here).  l_f_e_safe protects against
88          * problems with the current request being committed, in the unlikely
89          * event of that race.  So, in conclusion, I think that it's safe to
90          * perform this list-walk without the imp_lock held.
91          *
92          * But, the {mdc,osc}_replay_open callbacks both iterate
93          * request lists, and have comments saying they assume the
94          * imp_lock is being held by ptlrpc_replay, but it's not. it's
95          * just a little race...
96          */
97         list_for_each_safe(tmp, pos, &imp->imp_replay_list) {
98                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
99                 /* If need to resend the last sent transno (because a
100                    reconnect has occurred), then stop on the matching
101                    req and send it again. If, however, the last sent
102                    transno has been committed then we continue replay
103                    from the next request. */
104                 if (imp->imp_resend_replay && 
105                     req->rq_transno == last_transno) {
106                         lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
107                         break;
108                 }
109
110                 if (req->rq_transno > last_transno) {
111                         imp->imp_last_replay_transno = req->rq_transno;
112                         break;
113                 }
114
115                 req = NULL;
116         }
117
118         imp->imp_resend_replay = 0;
119
120         if (req != NULL) {
121                 rc = ptlrpc_replay_req(req);
122                 if (rc) {
123                         CERROR("recovery replay error %d for req "
124                                LPD64"\n", rc, req->rq_xid);
125                         RETURN(rc);
126                 }
127                 *inflight = 1;
128         }
129         RETURN(rc);
130 }
131
132 int ptlrpc_resend(struct obd_import *imp)
133 {
134         struct ptlrpc_request *req, *next;
135
136         ENTRY;
137
138         /* As long as we're in recovery, nothing should be added to the sending
139          * list, so we don't need to hold the lock during this iteration and
140          * resend process.
141          */
142         /* Well... what if lctl recover is called twice at the same time?
143          */
144         spin_lock(&imp->imp_lock);
145         if (imp->imp_state != LUSTRE_IMP_RECOVER) {
146                 spin_unlock(&imp->imp_lock);
147                 RETURN(-1);
148         }
149         spin_unlock(&imp->imp_lock);
150
151         list_for_each_entry_safe(req, next, &imp->imp_sending_list, rq_list) {
152                 LASSERTF((long)req > PAGE_SIZE && req != LP_POISON,
153                          "req %p bad\n", req);
154                 LASSERTF(req->rq_type != LI_POISON, "req %p freed\n", req);
155                 ptlrpc_resend_req(req);
156         }
157
158         RETURN(0);
159 }
160
161 void ptlrpc_wake_delayed(struct obd_import *imp)
162 {
163         struct list_head *tmp, *pos;
164         struct ptlrpc_request *req;
165
166         spin_lock(&imp->imp_lock);
167         list_for_each_safe(tmp, pos, &imp->imp_delayed_list) {
168                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
169
170                 DEBUG_REQ(D_HA, req, "waking (set %p):", req->rq_set);
171                 ptlrpc_wake_client_req(req);
172         }
173         spin_unlock(&imp->imp_lock);
174 }
175
176 void ptlrpc_request_handle_notconn(struct ptlrpc_request *failed_req)
177 {
178         struct obd_import *imp = failed_req->rq_import;
179         ENTRY;
180
181         CDEBUG(D_HA, "import %s of %s@%s abruptly disconnected: reconnecting\n",
182                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
183                imp->imp_connection->c_remote_uuid.uuid);
184
185         if (ptlrpc_set_import_discon(imp,
186                               lustre_msg_get_conn_cnt(failed_req->rq_reqmsg))) {
187                 if (!imp->imp_replayable) {
188                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
189                                "auto-deactivating\n",
190                                obd2cli_tgt(imp->imp_obd),
191                                imp->imp_connection->c_remote_uuid.uuid,
192                                imp->imp_obd->obd_name);
193                         ptlrpc_deactivate_import(imp);
194                 }
195                 /* to control recovery via lctl {disable|enable}_recovery */
196                 if (imp->imp_deactive == 0)
197                         ptlrpc_connect_import(imp, NULL);
198         }
199
200         /* Wait for recovery to complete and resend. If evicted, then
201            this request will be errored out later.*/
202         spin_lock(&failed_req->rq_lock);
203         if (!failed_req->rq_no_resend)
204                 failed_req->rq_resend = 1;
205         spin_unlock(&failed_req->rq_lock);
206
207         EXIT;
208 }
209
210 /*
211  * Administratively active/deactive a client. 
212  * This should only be called by the ioctl interface, currently
213  * with the lctl deactivate and activate commands, and
214  * client umount -f (ll_umount_begin)
215  */
216 int ptlrpc_set_import_active(struct obd_import *imp, int active)
217 {
218         struct obd_device *obd = imp->imp_obd;
219         int rc = 0;
220
221         ENTRY;
222         LASSERT(obd);
223
224         /* When deactivating, mark import invalid, and abort in-flight
225          * requests. */
226         if (!active) {
227                 CWARN("setting import %s INACTIVE by administrator request\n",
228                       obd2cli_tgt(imp->imp_obd));
229                 ptlrpc_invalidate_import(imp);
230                 imp->imp_deactive = 1;
231         }
232
233         /* When activating, mark import valid, and attempt recovery */
234         if (active) {
235                 imp->imp_deactive = 0;
236                 CDEBUG(D_HA, "setting import %s VALID\n",
237                        obd2cli_tgt(imp->imp_obd));
238                 rc = ptlrpc_recover_import(imp, NULL);
239         }
240
241         RETURN(rc);
242 }
243
244 /* Attempt to reconnect an import */
245 int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid)
246 {
247         int rc;
248         ENTRY;
249
250         /* force import to be disconnected. */
251         ptlrpc_set_import_discon(imp, 0);
252
253         imp->imp_deactive = 0;
254         rc = ptlrpc_recover_import_no_retry(imp, new_uuid);
255
256         RETURN(rc);
257 }
258
259 int ptlrpc_import_in_recovery(struct obd_import *imp)
260 {
261         int in_recovery = 1;
262         spin_lock(&imp->imp_lock);
263         if (imp->imp_state == LUSTRE_IMP_FULL ||
264             imp->imp_state == LUSTRE_IMP_CLOSED ||
265             imp->imp_state == LUSTRE_IMP_DISCON)
266                 in_recovery = 0;
267         spin_unlock(&imp->imp_lock);
268         return in_recovery;
269 }
270
271 static int ptlrpc_recover_import_no_retry(struct obd_import *imp,
272                                           char *new_uuid)
273 {
274         int rc;
275         int in_recovery = 0;
276         struct l_wait_info lwi;
277         ENTRY;
278
279         /* Check if reconnect is already in progress */
280         spin_lock(&imp->imp_lock);
281         if (imp->imp_state != LUSTRE_IMP_DISCON) {
282                 in_recovery = 1;
283         }
284         spin_unlock(&imp->imp_lock);
285
286         if (in_recovery == 1)
287                 RETURN(-EALREADY);
288
289         rc = ptlrpc_connect_import(imp, new_uuid);
290         if (rc)
291                 RETURN(rc);
292
293         CDEBUG(D_HA, "%s: recovery started, waiting\n",
294                obd2cli_tgt(imp->imp_obd));
295
296         lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(obd_timeout)), 
297                           NULL, NULL);
298         rc = l_wait_event(imp->imp_recovery_waitq,
299                           !ptlrpc_import_in_recovery(imp), &lwi);
300         CDEBUG(D_HA, "%s: recovery finished\n",
301                obd2cli_tgt(imp->imp_obd));
302
303         RETURN(rc);
304 }