Whamcloud - gitweb
2075c47a1fa4c5840c8e0666ba9cd0621d0a4707
[fs/lustre-release.git] / lustre / ptlrpc / recover.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Portal-RPC reconnection and replay operations, for use in recovery.
5  *
6  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
7  *   Author: Mike Shaver <shaver@clusterfs.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_RPC
26 #ifdef __KERNEL__
27 # include <linux/config.h>
28 # include <linux/module.h>
29 # include <linux/kmod.h>
30 # include <linux/list.h>
31 #else
32 # include <liblustre.h>
33 #endif
34
35 #include <linux/obd_support.h>
36 #include <linux/lustre_ha.h>
37 #include <linux/lustre_net.h>
38 #include <linux/lustre_import.h>
39 #include <linux/lustre_export.h>
40 #include <linux/obd.h>
41 #include <linux/obd_ost.h>
42 #include <linux/obd_class.h>
43 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
44 #include <libcfs/list.h>
45
46 #include "ptlrpc_internal.h"
47
/* Forward declaration: defined near the end of this file and used by
 * ptlrpc_recover_import() before that point. */
static int ptlrpc_recover_import_no_retry(struct obd_import *imp,
                                          char *new_uuid);
49
50 void ptlrpc_run_recovery_over_upcall(struct obd_device *obd)
51 {
52         char *argv[4];
53         char *envp[3];
54         int rc;
55         ENTRY;
56
57         argv[0] = obd_lustre_upcall;
58         argv[1] = "RECOVERY_OVER";
59         argv[2] = obd->obd_uuid.uuid;
60         argv[3] = NULL;
61         
62         envp[0] = "HOME=/";
63         envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
64         envp[2] = NULL;
65
66         rc = USERMODEHELPER(argv[0], argv, envp);
67         if (rc < 0) {
68                 CERROR("Error invoking recovery upcall %s %s %s: %d; check "
69                        "/proc/sys/lustre/upcall\n",
70                        argv[0], argv[1], argv[2], rc);
71
72         } else {
73                 CWARN("Invoked upcall %s %s %s\n",
74                        argv[0], argv[1], argv[2]);
75         }
76 }
77
78 void ptlrpc_run_failed_import_upcall(struct obd_import* imp)
79 {
80 #ifdef __KERNEL__
81         unsigned long flags;
82         char *argv[7];
83         char *envp[3];
84         int rc;
85         ENTRY;
86
87         spin_lock_irqsave(&imp->imp_lock, flags);
88         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
89                 spin_unlock_irqrestore(&imp->imp_lock, flags);
90                 EXIT;
91                 return;
92         }
93         spin_unlock_irqrestore(&imp->imp_lock, flags);
94         
95         argv[0] = obd_lustre_upcall;
96         argv[1] = "FAILED_IMPORT";
97         argv[2] = imp->imp_target_uuid.uuid;
98         argv[3] = imp->imp_obd->obd_name;
99         argv[4] = imp->imp_connection->c_remote_uuid.uuid;
100         argv[5] = imp->imp_obd->obd_uuid.uuid;
101         argv[6] = NULL;
102
103         envp[0] = "HOME=/";
104         envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
105         envp[2] = NULL;
106
107         rc = USERMODEHELPER(argv[0], argv, envp);
108         if (rc < 0) {
109                 CERROR("Error invoking recovery upcall %s %s %s %s %s %s: %d; "
110                        "check /proc/sys/lustre/lustre_upcall\n",
111                        argv[0], argv[1], argv[2], argv[3], argv[4], argv[5], rc);
112
113         } else {
114                 CWARN("Invoked upcall %s %s %s %s %s %s\n",
115                       argv[0], argv[1], argv[2], argv[3], argv[4], argv[5]);
116         }
117 #else
118         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
119                 EXIT;
120                 return;
121         }
122         ptlrpc_recover_import(imp, NULL);
123 #endif
124 }
125
126 /* This might block waiting for the upcall to start, so it should
127  * not be called from a thread that shouldn't block. (Like ptlrpcd) */
128 void ptlrpc_initiate_recovery(struct obd_import *imp)
129 {
130         ENTRY;
131
132         LASSERT (obd_lustre_upcall != NULL);
133         
134         if (strcmp(obd_lustre_upcall, "DEFAULT") == 0) {
135                 CDEBUG(D_HA, "%s: starting recovery without upcall\n",
136                         imp->imp_target_uuid.uuid);
137                 ptlrpc_connect_import(imp, NULL);
138         } 
139         else if (strcmp(obd_lustre_upcall, "NONE") == 0) {
140                 CDEBUG(D_HA, "%s: recovery disabled\n",
141                         imp->imp_target_uuid.uuid);
142         } 
143         else {
144                 CDEBUG(D_HA, "%s: calling upcall to start recovery\n",
145                         imp->imp_target_uuid.uuid);
146                 ptlrpc_run_failed_import_upcall(imp);
147         }
148
149         EXIT;
150 }
151
152 int ptlrpc_replay_next(struct obd_import *imp, int *inflight)
153 {
154         int rc = 0;
155         struct list_head *tmp, *pos;
156         struct ptlrpc_request *req = NULL;
157         unsigned long flags;
158         __u64 last_transno;
159         ENTRY;
160
161         *inflight = 0;
162
163         /* It might have committed some after we last spoke, so make sure we
164          * get rid of them now.
165          */
166         spin_lock_irqsave(&imp->imp_lock, flags);
167         ptlrpc_free_committed(imp);
168         last_transno = imp->imp_last_replay_transno;
169         spin_unlock_irqrestore(&imp->imp_lock, flags);
170
171         CDEBUG(D_HA, "import %p from %s committed "LPU64" last "LPU64"\n",
172                imp, imp->imp_target_uuid.uuid, imp->imp_peer_committed_transno,
173                last_transno);
174
175         /* Do I need to hold a lock across this iteration?  We shouldn't be
176          * racing with any additions to the list, because we're in recovery
177          * and are therefore not processing additional requests to add.  Calls
178          * to ptlrpc_free_committed might commit requests, but nothing "newer"
179          * than the one we're replaying (it can't be committed until it's
180          * replayed, and we're doing that here).  l_f_e_safe protects against
181          * problems with the current request being committed, in the unlikely
182          * event of that race.  So, in conclusion, I think that it's safe to
183          * perform this list-walk without the imp_lock held.
184          *
185          * But, the {mdc,osc}_replay_open callbacks both iterate
186          * request lists, and have comments saying they assume the
187          * imp_lock is being held by ptlrpc_replay, but it's not. it's
188          * just a little race...
189          */
190         list_for_each_safe(tmp, pos, &imp->imp_replay_list) {
191                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
192
193                 /* If need to resend the last sent transno (because a
194                    reconnect has occurred), then stop on the matching
195                    req and send it again. If, however, the last sent
196                    transno has been committed then we continue replay
197                    from the next request. */
198                 if (imp->imp_resend_replay && 
199                     req->rq_transno == last_transno) {
200                         lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
201                         break;
202                 }
203
204                 if (req->rq_transno > last_transno) {
205                         imp->imp_last_replay_transno = req->rq_transno;
206                         break;
207                 }
208
209                 req = NULL;
210         }
211
212         imp->imp_resend_replay = 0;
213
214         if (req != NULL) {
215                 rc = ptlrpc_replay_req(req);
216                 if (rc) {
217                         CERROR("recovery replay error %d for req "
218                                LPD64"\n", rc, req->rq_xid);
219                         RETURN(rc);
220                 }
221                 *inflight = 1;
222         }
223         RETURN(rc);
224 }
225
226 int ptlrpc_resend(struct obd_import *imp)
227 {
228         struct ptlrpc_request *req, *next;
229         unsigned long flags;
230
231         ENTRY;
232
233         /* As long as we're in recovery, nothing should be added to the sending
234          * list, so we don't need to hold the lock during this iteration and
235          * resend process.
236          */
237         /* Well... what if lctl recover is called twice at the same time?
238          */
239         spin_lock_irqsave(&imp->imp_lock, flags);
240         if (imp->imp_state != LUSTRE_IMP_RECOVER) {
241                 spin_unlock_irqrestore(&imp->imp_lock, flags);
242                 RETURN(-1);
243         }
244         spin_unlock_irqrestore(&imp->imp_lock, flags);
245
246         list_for_each_entry_safe(req, next, &imp->imp_sending_list, rq_list) {
247                 LASSERTF((long)req > PAGE_SIZE && req != LP_POISON,
248                          "req %p bad\n", req);
249                 LASSERTF(req->rq_type != LI_POISON, "req %p freed\n", req);
250                 ptlrpc_resend_req(req);
251         }
252
253         RETURN(0);
254 }
255
256 void ptlrpc_wake_delayed(struct obd_import *imp)
257 {
258         unsigned long flags;
259         struct list_head *tmp, *pos;
260         struct ptlrpc_request *req;
261
262         spin_lock_irqsave(&imp->imp_lock, flags);
263         list_for_each_safe(tmp, pos, &imp->imp_delayed_list) {
264                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
265
266                 DEBUG_REQ(D_HA, req, "waking (set %p):", req->rq_set);
267                 ptlrpc_wake_client_req(req);
268         }
269         spin_unlock_irqrestore(&imp->imp_lock, flags);
270 }
271
272 void ptlrpc_request_handle_notconn(struct ptlrpc_request *failed_req)
273 {
274         int rc;
275         struct obd_import *imp= failed_req->rq_import;
276         unsigned long flags;
277         ENTRY;
278
279         CDEBUG(D_HA, "import %s of %s@%s abruptly disconnected: reconnecting\n",
280                imp->imp_obd->obd_name,
281                imp->imp_target_uuid.uuid,
282                imp->imp_connection->c_remote_uuid.uuid);
283
284         if (ptlrpc_set_import_discon(imp)) {
285                 if (!imp->imp_replayable) {
286                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
287                                "auto-deactivating\n",
288                                imp->imp_target_uuid.uuid,
289                                imp->imp_connection->c_remote_uuid.uuid,
290                                imp->imp_obd->obd_name);
291                         ptlrpc_deactivate_import(imp);
292                 }
293
294                 rc = ptlrpc_connect_import(imp, NULL);
295         }
296
297         /* Wait for recovery to complete and resend. If evicted, then
298            this request will be errored out later.*/
299         spin_lock_irqsave(&failed_req->rq_lock, flags);
300         if (!failed_req->rq_no_resend)
301                 failed_req->rq_resend = 1;
302         spin_unlock_irqrestore(&failed_req->rq_lock, flags);
303
304         EXIT;
305 }
306
307 /*
308  * This should only be called by the ioctl interface, currently
309  * with the lctl deactivate and activate commands.
310  */
311 int ptlrpc_set_import_active(struct obd_import *imp, int active)
312 {
313         struct obd_device *obd = imp->imp_obd;
314         int rc = 0;
315
316         LASSERT(obd);
317
318         /* When deactivating, mark import invalid, and abort in-flight
319          * requests. */
320         if (!active) {
321                 CWARN("setting import %s INACTIVE by administrator request\n",
322                       imp->imp_target_uuid.uuid);
323                 ptlrpc_invalidate_import(imp);
324                 imp->imp_deactive = 1;
325         }
326
327         /* When activating, mark import valid, and attempt recovery */
328         if (active) {
329                 imp->imp_deactive = 0;
330                 CDEBUG(D_HA, "setting import %s VALID\n",
331                        imp->imp_target_uuid.uuid);
332                 rc = ptlrpc_recover_import(imp, NULL);
333         }
334
335         RETURN(rc);
336 }
337
338 int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid)
339 {
340         int rc;
341         ENTRY;
342
343         /* force import to be disconnected. */
344         ptlrpc_set_import_discon(imp);
345
346         imp->imp_deactive = 0;
347         rc = ptlrpc_recover_import_no_retry(imp, new_uuid);
348
349         RETURN(rc);
350 }
351
352 int ptlrpc_import_in_recovery(struct obd_import *imp)
353 {
354         unsigned long flags;
355         int in_recovery = 1;
356         spin_lock_irqsave(&imp->imp_lock, flags);
357         if (imp->imp_state == LUSTRE_IMP_FULL ||
358             imp->imp_state == LUSTRE_IMP_CLOSED ||
359             imp->imp_state == LUSTRE_IMP_DISCON)
360                 in_recovery = 0;
361         spin_unlock_irqrestore(&imp->imp_lock, flags);
362         return in_recovery;
363 }
364
365 static int ptlrpc_recover_import_no_retry(struct obd_import *imp,
366                                           char *new_uuid)
367 {
368         int rc;
369         unsigned long flags;
370         int in_recovery = 0;
371         struct l_wait_info lwi;
372         ENTRY;
373
374         spin_lock_irqsave(&imp->imp_lock, flags);
375         if (imp->imp_state != LUSTRE_IMP_DISCON) {
376                 in_recovery = 1;
377         }
378         spin_unlock_irqrestore(&imp->imp_lock, flags);
379
380         if (in_recovery == 1)
381                 RETURN(-EALREADY);
382
383         rc = ptlrpc_connect_import(imp, new_uuid);
384         if (rc)
385                 RETURN(rc);
386
387         CDEBUG(D_HA, "%s: recovery started, waiting\n",
388                imp->imp_target_uuid.uuid);
389
390         lwi = LWI_TIMEOUT(MAX(obd_timeout * HZ, 1), NULL, NULL);
391         rc = l_wait_event(imp->imp_recovery_waitq,
392                           !ptlrpc_import_in_recovery(imp), &lwi);
393         CDEBUG(D_HA, "%s: recovery finished\n",
394                imp->imp_target_uuid.uuid);
395
396         RETURN(rc);
397 }
398
399 void ptlrpc_fail_export(struct obd_export *exp)
400 {
401         int rc, already_failed;
402         unsigned long flags;
403
404         spin_lock_irqsave(&exp->exp_lock, flags);
405         already_failed = exp->exp_failed;
406         exp->exp_failed = 1;
407         spin_unlock_irqrestore(&exp->exp_lock, flags);
408
409         if (already_failed) {
410                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
411                        exp, exp->exp_client_uuid.uuid);
412                 return;
413         }
414
415         CDEBUG(D_HA, "disconnecting export %p/%s\n",
416                exp, exp->exp_client_uuid.uuid);
417
418         if (obd_dump_on_timeout)
419                 portals_debug_dumplog();
420
421         /* Most callers into obd_disconnect are removing their own reference
422          * (request, for example) in addition to the one from the hash table.
423          * We don't have such a reference here, so make one. */
424         class_export_get(exp);
425         rc = obd_disconnect(exp);
426         if (rc)
427                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
428 }