Whamcloud - gitweb
1b264cea66bcb40e497068172f7388130125bb2e
[fs/lustre-release.git] / lustre / ptlrpc / recover.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Portal-RPC reconnection and replay operations, for use in recovery.
5  *
6  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
7  *   Author: Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_RPC
26 #ifdef __KERNEL__
27 # include <linux/config.h>
28 # include <linux/module.h>
29 # include <linux/kmod.h>
30 #else
31 # include <liblustre.h>
32 #endif
33
34 #include <linux/obd_support.h>
35 #include <linux/lustre_ha.h>
36 #include <linux/lustre_net.h>
37 #include <linux/lustre_import.h>
38 #include <linux/lustre_export.h>
39 #include <linux/obd.h>
40 #include <linux/obd_ost.h>
41 #include <linux/obd_class.h>
42 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
43 #include <libcfs/list.h>
44
45 #include "ptlrpc_internal.h"
46
47 static int ptlrpc_recover_import_no_retry(struct obd_import *, char *);
48
49 void ptlrpc_run_recovery_over_upcall(struct obd_device *obd)
50 {
51         char *argv[4];
52         char *envp[3];
53         int rc;
54         ENTRY;
55
56         argv[0] = obd_lustre_upcall;
57         argv[1] = "RECOVERY_OVER";
58         argv[2] = obd->obd_uuid.uuid;
59         argv[3] = NULL;
60         
61         envp[0] = "HOME=/";
62         envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
63         envp[2] = NULL;
64
65         rc = USERMODEHELPER(argv[0], argv, envp);
66         if (rc < 0) {
67                 CERROR("Error invoking recovery upcall %s %s %s: %d; check "
68                        "/proc/sys/lustre/upcall\n",
69                        argv[0], argv[1], argv[2], rc);
70
71         } else {
72                 CWARN("Invoked upcall %s %s %s\n",
73                        argv[0], argv[1], argv[2]);
74         }
75 }
76
77 void ptlrpc_run_failed_import_upcall(struct obd_import* imp)
78 {
79 #ifdef __KERNEL__
80         unsigned long flags;
81         char *argv[7];
82         char *envp[3];
83         int rc;
84         ENTRY;
85
86         spin_lock_irqsave(&imp->imp_lock, flags);
87         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
88                 spin_unlock_irqrestore(&imp->imp_lock, flags);
89                 EXIT;
90                 return;
91         }
92         spin_unlock_irqrestore(&imp->imp_lock, flags);
93         
94         argv[0] = obd_lustre_upcall;
95         argv[1] = "FAILED_IMPORT";
96         argv[2] = imp->imp_target_uuid.uuid;
97         argv[3] = imp->imp_obd->obd_name;
98         argv[4] = imp->imp_connection->c_remote_uuid.uuid;
99         argv[5] = imp->imp_obd->obd_uuid.uuid;
100         argv[6] = NULL;
101
102         envp[0] = "HOME=/";
103         envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
104         envp[2] = NULL;
105
106         rc = USERMODEHELPER(argv[0], argv, envp);
107         if (rc < 0) {
108                 CERROR("Error invoking recovery upcall %s %s %s %s %s %s: %d; "
109                        "check /proc/sys/lustre/lustre_upcall\n",
110                        argv[0], argv[1], argv[2], argv[3], argv[4], argv[5], rc);
111
112         } else {
113                 CWARN("Invoked upcall %s %s %s %s %s %s\n",
114                       argv[0], argv[1], argv[2], argv[3], argv[4], argv[5]);
115         }
116 #else
117         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
118                 EXIT;
119                 return;
120         }
121         ptlrpc_recover_import(imp, NULL);
122 #endif
123 }
124
125 /* This might block waiting for the upcall to start, so it should
126  * not be called from a thread that shouldn't block. (Like ptlrpcd) */
127 void ptlrpc_initiate_recovery(struct obd_import *imp)
128 {
129         ENTRY;
130
131         LASSERT (obd_lustre_upcall != NULL);
132         
133         if (strcmp(obd_lustre_upcall, "DEFAULT") == 0) {
134                 CDEBUG(D_HA, "%s: starting recovery without upcall\n",
135                         imp->imp_target_uuid.uuid);
136                 ptlrpc_connect_import(imp, NULL);
137         } 
138         else if (strcmp(obd_lustre_upcall, "NONE") == 0) {
139                 CDEBUG(D_HA, "%s: recovery disabled\n",
140                         imp->imp_target_uuid.uuid);
141         } 
142         else {
143                 CDEBUG(D_HA, "%s: calling upcall to start recovery\n",
144                         imp->imp_target_uuid.uuid);
145                 ptlrpc_run_failed_import_upcall(imp);
146         }
147
148         EXIT;
149 }
150
151 int ptlrpc_replay_next(struct obd_import *imp, int *inflight)
152 {
153         int rc = 0;
154         struct list_head *tmp, *pos;
155         struct ptlrpc_request *req = NULL;
156         unsigned long flags;
157         __u64 last_transno;
158         ENTRY;
159
160         *inflight = 0;
161
162         /* It might have committed some after we last spoke, so make sure we
163          * get rid of them now.
164          */
165         spin_lock_irqsave(&imp->imp_lock, flags);
166         ptlrpc_free_committed(imp);
167         last_transno = imp->imp_last_replay_transno;
168         spin_unlock_irqrestore(&imp->imp_lock, flags);
169
170         CDEBUG(D_HA, "import %p from %s committed "LPU64" last "LPU64"\n",
171                imp, imp->imp_target_uuid.uuid, imp->imp_peer_committed_transno,
172                last_transno);
173
174         /* Do I need to hold a lock across this iteration?  We shouldn't be
175          * racing with any additions to the list, because we're in recovery
176          * and are therefore not processing additional requests to add.  Calls
177          * to ptlrpc_free_committed might commit requests, but nothing "newer"
178          * than the one we're replaying (it can't be committed until it's
179          * replayed, and we're doing that here).  l_f_e_safe protects against
180          * problems with the current request being committed, in the unlikely
181          * event of that race.  So, in conclusion, I think that it's safe to
182          * perform this list-walk without the imp_lock held.
183          *
184          * But, the {mdc,osc}_replay_open callbacks both iterate
185          * request lists, and have comments saying they assume the
186          * imp_lock is being held by ptlrpc_replay, but it's not. it's
187          * just a little race...
188          */
189         list_for_each_safe(tmp, pos, &imp->imp_replay_list) {
190                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
191
192                 /* If need to resend the last sent transno (because a
193                    reconnect has occurred), then stop on the matching
194                    req and send it again. If, however, the last sent
195                    transno has been committed then we continue replay
196                    from the next request. */
197                 if (imp->imp_resend_replay && 
198                     req->rq_transno == last_transno) {
199                         lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
200                         break;
201                 }
202
203                 if (req->rq_transno > last_transno) {
204                         imp->imp_last_replay_transno = req->rq_transno;
205                         break;
206                 }
207
208                 req = NULL;
209         }
210
211         imp->imp_resend_replay = 0;
212
213         if (req != NULL) {
214                 rc = ptlrpc_replay_req(req);
215                 if (rc) {
216                         CERROR("recovery replay error %d for req "
217                                LPD64"\n", rc, req->rq_xid);
218                         RETURN(rc);
219                 }
220                 *inflight = 1;
221         }
222         RETURN(rc);
223 }
224
225 int ptlrpc_resend(struct obd_import *imp)
226 {
227         struct ptlrpc_request *req, *next;
228         unsigned long flags;
229
230         ENTRY;
231
232         /* As long as we're in recovery, nothing should be added to the sending
233          * list, so we don't need to hold the lock during this iteration and
234          * resend process.
235          */
236         /* Well... what if lctl recover is called twice at the same time?
237          */
238         spin_lock_irqsave(&imp->imp_lock, flags);
239         if (imp->imp_state != LUSTRE_IMP_RECOVER) {
240                 spin_unlock_irqrestore(&imp->imp_lock, flags);
241                 RETURN(-1);
242         }
243         spin_unlock_irqrestore(&imp->imp_lock, flags);
244
245         list_for_each_entry_safe(req, next, &imp->imp_sending_list, rq_list) {
246                 LASSERTF((long)req > PAGE_SIZE && req != LP_POISON,
247                          "req %p bad\n", req);
248                 LASSERTF(req->rq_type != LI_POISON, "req %p freed\n", req);
249                 ptlrpc_resend_req(req);
250         }
251
252         RETURN(0);
253 }
254
255 void ptlrpc_wake_delayed(struct obd_import *imp)
256 {
257         unsigned long flags;
258         struct list_head *tmp, *pos;
259         struct ptlrpc_request *req;
260
261         spin_lock_irqsave(&imp->imp_lock, flags);
262         list_for_each_safe(tmp, pos, &imp->imp_delayed_list) {
263                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
264
265                 DEBUG_REQ(D_HA, req, "waking (set %p):", req->rq_set);
266                 ptlrpc_wake_client_req(req);
267         }
268         spin_unlock_irqrestore(&imp->imp_lock, flags);
269 }
270
271 void ptlrpc_request_handle_notconn(struct ptlrpc_request *failed_req)
272 {
273         int rc;
274         struct obd_import *imp= failed_req->rq_import;
275         unsigned long flags;
276         ENTRY;
277
278         CDEBUG(D_HA, "import %s of %s@%s abruptly disconnected: reconnecting\n",
279                imp->imp_obd->obd_name,
280                imp->imp_target_uuid.uuid,
281                imp->imp_connection->c_remote_uuid.uuid);
282
283         if (ptlrpc_set_import_discon(imp)) {
284                 if (!imp->imp_replayable) {
285                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
286                                "auto-deactivating\n",
287                                imp->imp_target_uuid.uuid,
288                                imp->imp_connection->c_remote_uuid.uuid,
289                                imp->imp_obd->obd_name);
290                         ptlrpc_deactivate_import(imp);
291                 }
292
293                 rc = ptlrpc_connect_import(imp, NULL);
294         }
295
296         /* Wait for recovery to complete and resend. If evicted, then
297            this request will be errored out later.*/
298         spin_lock_irqsave(&failed_req->rq_lock, flags);
299         if (!failed_req->rq_no_resend)
300                 failed_req->rq_resend = 1;
301         spin_unlock_irqrestore(&failed_req->rq_lock, flags);
302
303         EXIT;
304 }
305
306 /*
307  * This should only be called by the ioctl interface, currently
308  * with the lctl deactivate and activate commands.
309  */
310 int ptlrpc_set_import_active(struct obd_import *imp, int active)
311 {
312         struct obd_device *obd = imp->imp_obd;
313         int rc = 0;
314
315         LASSERT(obd);
316
317         /* When deactivating, mark import invalid, and abort in-flight
318          * requests. */
319         if (!active) {
320                 ptlrpc_invalidate_import(imp);
321                 imp->imp_deactive = 1;
322         } 
323
324         /* When activating, mark import valid, and attempt recovery */
325         if (active) {
326                 imp->imp_deactive = 0;
327                 CDEBUG(D_HA, "setting import %s VALID\n",
328                        imp->imp_target_uuid.uuid);
329                 rc = ptlrpc_recover_import(imp, NULL);
330         }
331
332         RETURN(rc);
333 }
334
335 int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid)
336 {
337         int rc;
338         ENTRY;
339
340         /* force import to be disconnected. */
341         ptlrpc_set_import_discon(imp);
342
343         rc = ptlrpc_recover_import_no_retry(imp, new_uuid);
344
345         RETURN(rc);
346 }
347
348 int ptlrpc_import_in_recovery(struct obd_import *imp)
349 {
350         unsigned long flags;
351         int in_recovery = 1;
352         spin_lock_irqsave(&imp->imp_lock, flags);
353         if (imp->imp_state == LUSTRE_IMP_FULL ||
354             imp->imp_state == LUSTRE_IMP_CLOSED ||
355             imp->imp_state == LUSTRE_IMP_DISCON)
356                 in_recovery = 0;
357         spin_unlock_irqrestore(&imp->imp_lock, flags);
358         return in_recovery;
359 }
360
361 static int ptlrpc_recover_import_no_retry(struct obd_import *imp,
362                                           char *new_uuid)
363 {
364         int rc;
365         unsigned long flags;
366         int in_recovery = 0;
367         struct l_wait_info lwi;
368         ENTRY;
369
370         spin_lock_irqsave(&imp->imp_lock, flags);
371         if (imp->imp_state != LUSTRE_IMP_DISCON) {
372                 in_recovery = 1;
373         }
374         spin_unlock_irqrestore(&imp->imp_lock, flags);
375
376         if (in_recovery == 1)
377                 RETURN(-EALREADY);
378
379         rc = ptlrpc_connect_import(imp, new_uuid);
380         if (rc)
381                 RETURN(rc);
382
383         CDEBUG(D_HA, "%s: recovery started, waiting\n",
384                imp->imp_target_uuid.uuid);
385
386         lwi = LWI_TIMEOUT(MAX(obd_timeout * HZ, 1), NULL, NULL);
387         rc = l_wait_event(imp->imp_recovery_waitq,
388                           !ptlrpc_import_in_recovery(imp), &lwi);
389         CDEBUG(D_HA, "%s: recovery finished\n",
390                imp->imp_target_uuid.uuid);
391
392         RETURN(rc);
393 }
394
395 void ptlrpc_fail_export(struct obd_export *exp)
396 {
397         int rc, already_failed;
398         unsigned long flags;
399
400         spin_lock_irqsave(&exp->exp_lock, flags);
401         already_failed = exp->exp_failed;
402         exp->exp_failed = 1;
403         spin_unlock_irqrestore(&exp->exp_lock, flags);
404
405         if (already_failed) {
406                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
407                        exp, exp->exp_client_uuid.uuid);
408                 return;
409         }
410
411         CDEBUG(D_HA, "disconnecting export %p/%s\n",
412                exp, exp->exp_client_uuid.uuid);
413
414         if (obd_dump_on_timeout)
415                 portals_debug_dumplog();
416
417         /* Most callers into obd_disconnect are removing their own reference
418          * (request, for example) in addition to the one from the hash table.
419          * We don't have such a reference here, so make one. */
420         class_export_get(exp);
421         rc = obd_disconnect(exp);
422         if (rc)
423                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
424 }