Whamcloud - gitweb
Branch b1_4_mountconf
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 #define DEBUG_SUBSYSTEM S_RPC
27 #ifdef __KERNEL__
28 # include <linux/config.h>
29 # include <linux/module.h>
30 # include <linux/kmod.h>
31 #else
32 # include <liblustre.h>
33 #endif
34
35 #include <linux/obd_support.h>
36 #include <linux/lustre_ha.h>
37 #include <linux/lustre_net.h>
38 #include <linux/lustre_import.h>
39 #include <linux/lustre_export.h>
40 #include <linux/obd.h>
41 #include <linux/obd_class.h>
42 #include <linux/lustre_ver.h>
43
44 #include "ptlrpc_internal.h"
45
46 struct ptlrpc_connect_async_args {
47          __u64 pcaa_peer_committed;
48         int pcaa_initial_connect;
49 };
50
/*
 * Move @imp to @state without touching imp_lock; the users in this file
 * invoke it with imp_lock already held.
 *
 * A CLOSED import should remain so: when the import is already CLOSED the
 * transition (and its D_HA trace line) is skipped entirely.
 *
 * Both arguments are parenthesized at every use so that arbitrary
 * expressions (e.g. "p + 1") expand correctly.  They are still evaluated
 * more than once, so do not pass expressions with side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), (imp)->imp_target_uuid.uuid,                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while(0)
62
/*
 * Locked form of IMPORT_SET_STATE_NOLOCK(): takes (imp)->imp_lock with
 * spin_lock_irqsave() around the state change.
 *
 * The arguments are parenthesized so that any pointer expression may be
 * passed, and the saved-flags local has a macro-specific name so it does
 * not shadow a "flags" variable in the calling function.
 */
#define IMPORT_SET_STATE(imp, state)                                    \
do {                                                                    \
        unsigned long set_state_flags;                                  \
                                                                        \
        spin_lock_irqsave(&(imp)->imp_lock, set_state_flags);           \
        IMPORT_SET_STATE_NOLOCK((imp), (state));                        \
        spin_unlock_irqrestore(&(imp)->imp_lock, set_state_flags);      \
} while(0)
71
72
73 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
74                                     void * data, int rc);
75 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
76
77 /* Only this function is allowed to change the import state when it is
78  * CLOSED. I would rather refcount the import and free it after
79  * disconnection like we do with exports. To do that, the client_obd
80  * will need to save the peer info somewhere other than in the import,
81  * though. */
82 int ptlrpc_init_import(struct obd_import *imp)
83 {
84         unsigned long flags;
85
86         spin_lock_irqsave(&imp->imp_lock, flags);
87
88         imp->imp_generation++;
89         imp->imp_state =  LUSTRE_IMP_NEW;
90
91         spin_unlock_irqrestore(&imp->imp_lock, flags);
92
93         return 0;
94 }
95 EXPORT_SYMBOL(ptlrpc_init_import);
96
#define UUID_STR "_UUID"
/*
 * Locate the printable part of a uuid string without modifying it.
 *
 * If @prefix is non-NULL and @uuid begins with it, the prefix is skipped.
 * A trailing UUID_STR suffix, when present, is excluded from the length.
 *
 * @uuid_start receives a pointer into @uuid; @uuid_len receives the number
 * of characters to print from there (suitable for a "%.*s" format).
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len)
{
        const size_t suffix_len = strlen(UUID_STR);
        char *start = uuid;
        size_t len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start = uuid + prefix_len;
        }

        len = strlen(start);
        if (len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = len;
}
112
113 /* Returns true if import was FULL, false if import was already not
114  * connected.
115  */
116 int ptlrpc_set_import_discon(struct obd_import *imp)
117 {
118         unsigned long flags;
119         int rc = 0;
120
121         spin_lock_irqsave(&imp->imp_lock, flags);
122
123         if (imp->imp_state == LUSTRE_IMP_FULL) {
124                 char *target_start;
125                 int   target_len;
126
127                 deuuidify(imp->imp_target_uuid.uuid, NULL,
128                           &target_start, &target_len);
129
130                 LCONSOLE_ERROR("Connection to service %.*s via nid %s was "
131                                "lost; in progress operations using this "
132                                "service will %s.\n",
133                                target_len, target_start,
134                                libcfs_nid2str(imp->imp_connection->c_peer.nid),
135                                imp->imp_replayable ?
136                                "wait for recovery to complete" : "fail");
137
138                 if (obd_dump_on_timeout)
139                         libcfs_debug_dumplog();
140
141                 CDEBUG(D_HA, "%s: connection lost to %s@%s\n",
142                       imp->imp_obd->obd_name,
143                       imp->imp_target_uuid.uuid,
144                       imp->imp_connection->c_remote_uuid.uuid);
145                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
146                 spin_unlock_irqrestore(&imp->imp_lock, flags);
147                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
148                 rc = 1;
149         } else {
150                 spin_unlock_irqrestore(&imp->imp_lock, flags);
151                 CDEBUG(D_HA, "%p %s: import already not connected: %s\n",
152                        imp,imp->imp_client->cli_name,
153                        ptlrpc_import_state_name(imp->imp_state));
154         }
155
156         return rc;
157 }
158
159 /*
160  * This acts as a barrier; all existing requests are rejected, and
161  * no new requests will be accepted until the import is valid again.
162  */
163 void ptlrpc_deactivate_import(struct obd_import *imp)
164 {
165         unsigned long flags;
166         ENTRY;
167
168         spin_lock_irqsave(&imp->imp_lock, flags);
169         CDEBUG(D_HA, "setting import %s INVALID\n", imp->imp_target_uuid.uuid);
170         imp->imp_invalid = 1;
171         imp->imp_generation++;
172         spin_unlock_irqrestore(&imp->imp_lock, flags);
173
174         ptlrpc_abort_inflight(imp);
175         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
176 }
177
178 /*
179  * This function will invalidate the import, if necessary, then block
180  * for all the RPC completions, and finally notify the obd to
181  * invalidate its state (ie cancel locks, clear pending requests,
182  * etc).
183  */
184 void ptlrpc_invalidate_import(struct obd_import *imp)
185 {
186         struct l_wait_info lwi;
187         int rc;
188
189         if (!imp->imp_invalid)
190                 ptlrpc_deactivate_import(imp);
191
192         LASSERT(imp->imp_invalid);
193
194         /* wait for all requests to error out and call completion callbacks */
195         lwi = LWI_TIMEOUT_INTR(MAX(obd_timeout * HZ, 1), NULL,
196                                NULL, NULL);
197         rc = l_wait_event(imp->imp_recovery_waitq,
198                           (atomic_read(&imp->imp_inflight) == 0),
199                           &lwi);
200
201         if (rc)
202                 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
203                        imp->imp_target_uuid.uuid, rc,
204                        atomic_read(&imp->imp_inflight));
205
206         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
207 }
208
209 void ptlrpc_activate_import(struct obd_import *imp)
210 {
211         struct obd_device *obd = imp->imp_obd;
212         unsigned long flags;
213
214         spin_lock_irqsave(&imp->imp_lock, flags);
215         imp->imp_invalid = 0;
216         spin_unlock_irqrestore(&imp->imp_lock, flags);
217
218         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
219 }
220
221 void ptlrpc_fail_import(struct obd_import *imp, int generation)
222 {
223         ENTRY;
224
225         LASSERT (!imp->imp_dlm_fake);
226
227         if (ptlrpc_set_import_discon(imp)) {
228                 unsigned long flags;
229
230                 if (!imp->imp_replayable) {
231                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
232                                "auto-deactivating\n",
233                                imp->imp_target_uuid.uuid,
234                                imp->imp_connection->c_remote_uuid.uuid,
235                                imp->imp_obd->obd_name);
236                         ptlrpc_deactivate_import(imp);
237                 }
238
239                 CDEBUG(D_HA, "%s: waking up pinger\n",
240                        imp->imp_target_uuid.uuid);
241
242                 spin_lock_irqsave(&imp->imp_lock, flags);
243                 imp->imp_force_verify = 1;
244                 spin_unlock_irqrestore(&imp->imp_lock, flags);
245
246                 ptlrpc_pinger_wake_up();
247         }
248         EXIT;
249 }
250
251 static int import_select_connection(struct obd_import *imp)
252 {
253         struct obd_import_conn *imp_conn;
254         struct obd_export *dlmexp;
255         ENTRY;
256
257         spin_lock(&imp->imp_lock);
258
259         if (list_empty(&imp->imp_conn_list)) {
260                 CERROR("%s: no connections available\n",
261                         imp->imp_obd->obd_name);
262                 spin_unlock(&imp->imp_lock);
263                 RETURN(-EINVAL);
264         }
265
266         if (imp->imp_conn_current &&
267             imp->imp_conn_current->oic_item.next != &imp->imp_conn_list) {
268                 imp_conn = list_entry(imp->imp_conn_current->oic_item.next,
269                                       struct obd_import_conn, oic_item);
270         } else {
271                 imp_conn = list_entry(imp->imp_conn_list.next,
272                                       struct obd_import_conn, oic_item);
273         }
274
275         /* switch connection, don't mind if it's same as the current one */
276         if (imp->imp_connection)
277                 ptlrpc_put_connection(imp->imp_connection);
278         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
279
280         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
281         LASSERT(dlmexp != NULL);
282         if (dlmexp->exp_connection)
283                 ptlrpc_put_connection(dlmexp->exp_connection);
284         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
285         class_export_put(dlmexp);
286
287         if (imp->imp_conn_current && (imp->imp_conn_current != imp_conn)) {
288                 LCONSOLE_WARN("Changing connection for %s to %s\n",
289                               imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid);
290         }
291         imp->imp_conn_current = imp_conn;
292         CDEBUG(D_HA, "%s: import %p using connection %s\n",
293                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid);
294         spin_unlock(&imp->imp_lock);
295
296         RETURN(0);
297 }
298
299 int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
300 {
301         struct obd_device *obd = imp->imp_obd;
302         int initial_connect = 0;
303         int rc;
304         __u64 committed_before_reconnect = 0;
305         struct ptlrpc_request *request;
306         int size[] = {sizeof(imp->imp_target_uuid),
307                       sizeof(obd->obd_uuid),
308                       sizeof(imp->imp_dlm_handle),
309                       sizeof(imp->imp_connect_data)};
310         char *tmp[] = {imp->imp_target_uuid.uuid,
311                        obd->obd_uuid.uuid,
312                        (char *)&imp->imp_dlm_handle,
313                        (char *)&imp->imp_connect_data};
314         struct ptlrpc_connect_async_args *aa;
315         unsigned long flags;
316
317         spin_lock_irqsave(&imp->imp_lock, flags);
318         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
319                 spin_unlock_irqrestore(&imp->imp_lock, flags);
320                 CERROR("can't connect to a closed import\n");
321                 RETURN(-EINVAL);
322         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
323                 spin_unlock_irqrestore(&imp->imp_lock, flags);
324                 CERROR("already connected\n");
325                 RETURN(0);
326         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
327                 spin_unlock_irqrestore(&imp->imp_lock, flags);
328                 CERROR("already connecting\n");
329                 RETURN(-EALREADY);
330         }
331
332         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
333
334         imp->imp_conn_cnt++;
335         imp->imp_resend_replay = 0;
336
337         if (imp->imp_remote_handle.cookie == 0) {
338                 initial_connect = 1;
339         } else {
340                 committed_before_reconnect = imp->imp_peer_committed_transno;
341         }
342
343         spin_unlock_irqrestore(&imp->imp_lock, flags);
344
345         if (new_uuid) {
346                 struct obd_uuid uuid;
347
348                 obd_str2uuid(&uuid, new_uuid);
349                 rc = import_set_conn_priority(imp, &uuid);
350                 if (rc)
351                         GOTO(out, rc);
352         }
353
354         rc = import_select_connection(imp);
355         if (rc)
356                 GOTO(out, rc);
357
358         if (imp->imp_initial_recov_bk && initial_connect &&
359             /* last in list */
360             (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list)) {
361                 CERROR("Last connection (%d) for %s, turning off init_recov\n",
362                        imp->imp_conn_cnt, imp->imp_target_uuid.uuid);
363                 /* Don't retry if connect fails */
364                 rc = 0;
365                 obd_set_info(obd->obd_self_export,
366                              strlen("initial_recov"), "initial_recov",
367                              sizeof(rc), &rc);
368         }
369
370         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
371                            &obd->obd_uuid, &imp->imp_connect_data);
372         if (rc)
373                 GOTO(out, rc);
374
375         request = ptlrpc_prep_req(imp, imp->imp_connect_op, 4, size, tmp);
376         if (!request)
377                 GOTO(out, rc = -ENOMEM);
378
379 #ifndef __KERNEL__
380         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
381 #endif
382
383         request->rq_send_state = LUSTRE_IMP_CONNECTING;
384         /* Allow a slightly larger reply for future growth compatibility */
385         size[0] = sizeof(struct obd_connect_data) + 16 * sizeof(__u64);
386         request->rq_replen = lustre_msg_size(1, size);
387         request->rq_interpret_reply = ptlrpc_connect_interpret;
388
389         CLASSERT(sizeof (*aa) <= sizeof (request->rq_async_args));
390         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
391         memset(aa, 0, sizeof *aa);
392
393         aa->pcaa_peer_committed = committed_before_reconnect;
394         aa->pcaa_initial_connect = initial_connect;
395
396         if (aa->pcaa_initial_connect) {
397                 imp->imp_replayable = 1;
398                 /* On an initial connect, we don't know which one of a
399                    failover server pair is up.  Don't wait long. */
400                 request->rq_timeout = max((int)(obd_timeout / 20), 5);
401         }
402
403         DEBUG_REQ(D_RPCTRACE, request, "(re)connect request");
404         ptlrpcd_add_req(request);
405         rc = 0;
406 out:
407         if (rc != 0) {
408                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
409         }
410
411         RETURN(rc);
412 }
413 EXPORT_SYMBOL(ptlrpc_connect_import);
414
415 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
416 {
417         struct obd_import_conn *imp_conn;
418         unsigned long flags;
419         int wake_pinger = 0;
420
421         ENTRY;
422
423         spin_lock_irqsave(&imp->imp_lock, flags);
424         if (list_empty(&imp->imp_conn_list))
425                 GOTO(unlock, 0);
426
427         imp_conn = list_entry(imp->imp_conn_list.prev,
428                               struct obd_import_conn,
429                               oic_item);
430
431         if (imp->imp_conn_current != imp_conn) {
432                 ptlrpc_ping_import_soon(imp);
433                 wake_pinger = 1;
434         }
435
436  unlock:
437         spin_unlock_irqrestore(&imp->imp_lock, flags);
438
439         if (wake_pinger)
440                 ptlrpc_pinger_wake_up();
441
442         EXIT;
443 }
444
445 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
446                                     void * data, int rc)
447 {
448         struct ptlrpc_connect_async_args *aa = data;
449         struct obd_import *imp = request->rq_import;
450         struct lustre_handle old_hdl;
451         unsigned long flags;
452         int msg_flags;
453         ENTRY;
454
455         spin_lock_irqsave(&imp->imp_lock, flags);
456         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
457                 spin_unlock_irqrestore(&imp->imp_lock, flags);
458                 RETURN(0);
459         }
460         spin_unlock_irqrestore(&imp->imp_lock, flags);
461
462         if (rc)
463                 GOTO(out, rc);
464
465         LASSERT(imp->imp_conn_current);
466
467         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
468
469         /* All imports are pingable */
470         imp->imp_pingable = 1;
471
472         if (aa->pcaa_initial_connect) {
473                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
474                         CDEBUG(D_HA, "connected to replayable target: %s\n",
475                                imp->imp_target_uuid.uuid);
476                         imp->imp_replayable = 1;
477                 } else {
478                         imp->imp_replayable = 0;
479                 }
480                 imp->imp_remote_handle = request->rq_repmsg->handle;
481
482                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
483                 GOTO(finish, rc = 0);
484         }
485
486         /* Determine what recovery state to move the import to. */
487         if (MSG_CONNECT_RECONNECT & msg_flags) {
488                 memset(&old_hdl, 0, sizeof(old_hdl));
489                 if (!memcmp(&old_hdl, &request->rq_repmsg->handle,
490                             sizeof (old_hdl))) {
491                         CERROR("%s@%s didn't like our handle "LPX64
492                                ", failed\n", imp->imp_target_uuid.uuid,
493                                imp->imp_connection->c_remote_uuid.uuid,
494                                imp->imp_dlm_handle.cookie);
495                         GOTO(out, rc = -ENOTCONN);
496                 }
497
498                 if (memcmp(&imp->imp_remote_handle, &request->rq_repmsg->handle,
499                            sizeof(imp->imp_remote_handle))) {
500                         CERROR("%s@%s changed handle from "LPX64" to "LPX64
501                                "; copying, but this may foreshadow disaster\n",
502                                imp->imp_target_uuid.uuid,
503                                imp->imp_connection->c_remote_uuid.uuid,
504                                imp->imp_remote_handle.cookie,
505                                request->rq_repmsg->handle.cookie);
506                         imp->imp_remote_handle = request->rq_repmsg->handle;
507                 } else {
508                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
509                                imp->imp_target_uuid.uuid,
510                                imp->imp_connection->c_remote_uuid.uuid);
511                 }
512
513                 if (imp->imp_invalid) {
514                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
515                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
516                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
517                                imp->imp_obd->obd_name,
518                                imp->imp_target_uuid.uuid);
519                         imp->imp_resend_replay = 1;
520                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
521                 } else {
522                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
523                 }
524         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
525                 LASSERT(imp->imp_replayable);
526                 imp->imp_remote_handle = request->rq_repmsg->handle;
527                 imp->imp_last_replay_transno = 0;
528                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
529         } else {
530                 imp->imp_remote_handle = request->rq_repmsg->handle;
531                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
532         }
533
534         /* Sanity checks for a reconnected import. */
535         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
536                 CERROR("imp_replayable flag does not match server "
537                        "after reconnect. We should LBUG right here.\n");
538         }
539
540         if (request->rq_repmsg->last_committed < aa->pcaa_peer_committed) {
541                 CERROR("%s went back in time (transno "LPD64
542                        " was previously committed, server now claims "LPD64
543                        ")! is shared storage not coherent?\n",
544                        imp->imp_target_uuid.uuid,
545                        aa->pcaa_peer_committed,
546                        request->rq_repmsg->last_committed);
547         }
548
549 finish:
550         rc = ptlrpc_import_recovery_state_machine(imp);
551         if (rc != 0) {
552                 if (rc == -ENOTCONN) {
553                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
554                                "invalidating and reconnecting\n",
555                                imp->imp_target_uuid.uuid,
556                                imp->imp_connection->c_remote_uuid.uuid);
557                         ptlrpc_connect_import(imp, NULL);
558                         RETURN(0);
559                 }
560         } else {
561                 struct obd_connect_data *ocd;
562
563                 ocd = lustre_swab_repbuf(request, 0,
564                                          sizeof *ocd, lustre_swab_connect);
565                 if (ocd == NULL) {
566                         CERROR("Wrong connect data from server\n");
567                         rc = -EPROTO;
568                         GOTO(out, rc);
569                 }
570                 spin_lock_irqsave(&imp->imp_lock, flags);
571                 /*
572                  * check that server granted subset of flags we asked for.
573                  */
574                 LASSERT((ocd->ocd_connect_flags &
575                          imp->imp_connect_data.ocd_connect_flags) ==
576                         ocd->ocd_connect_flags);
577                 imp->imp_connect_data = *ocd;
578                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
579
580                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
581                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
582                     LUSTRE_VERSION_OFFSET_WARN)) {
583                         /* Sigh, some compilers do not like #ifdef in the middle
584                            of macro arguments */
585 #ifdef __KERNEL__
586                         char *action = "upgrading this client";
587 #else
588                         char *action = "recompiling this application";
589 #endif
590                         
591                         CWARN("Server %s version (%d.%d.%d.%d) is much newer. "
592                               "Consider %s (%s).\n",
593                               imp->imp_target_uuid.uuid,
594                               OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
595                               OBD_OCD_VERSION_MINOR(ocd->ocd_version),
596                               OBD_OCD_VERSION_PATCH(ocd->ocd_version),
597                               OBD_OCD_VERSION_FIX(ocd->ocd_version),
598                               action, LUSTRE_VERSION_STRING);
599                 }
600
601                 if (imp->imp_conn_current != NULL) {
602                         list_del(&imp->imp_conn_current->oic_item);
603                         list_add(&imp->imp_conn_current->oic_item,
604                                  &imp->imp_conn_list);
605                         imp->imp_conn_current = NULL;
606                         spin_unlock_irqrestore(&imp->imp_lock, flags);
607                 } else {
608                         static int bug7269_dump = 0;
609                         spin_unlock_irqrestore(&imp->imp_lock, flags);
610                         CERROR("this is bug 7269 - please attach log there\n");
611                         if (bug7269_dump == 0)
612                                 libcfs_debug_dumplog();
613                         bug7269_dump = 1;
614                 }
615         }
616
617  out:
618         if (rc != 0) {
619
620                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
621                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov) {
622                         ptlrpc_deactivate_import(imp);
623                 }
624
625                 if (rc == -EPROTO) {
626                         struct obd_connect_data *ocd;
627                         ocd = lustre_swab_repbuf(request, 0,
628                                                  sizeof *ocd,
629                                                  lustre_swab_connect);
630                         if (ocd && 
631                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) && 
632                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
633                            /* Actually servers are only supposed to refuse
634                               connection from liblustre clients, so we should
635                               never see this from VFS context */
636                                 CERROR("Server %s version (%d.%d.%d.%d) refused"
637                                       " connection from this client as too old "
638                                       "version (%s). Client must be "
639                                       "recompiled\n",
640                                       imp->imp_target_uuid.uuid,
641                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
642                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
643                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
644                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
645                                       LUSTRE_VERSION_STRING);
646                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
647                                 RETURN(-EPROTO);
648                         }
649                 }
650                         
651                 ptlrpc_maybe_ping_import_soon(imp);
652
653                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
654                        imp->imp_target_uuid.uuid,
655                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
656         }
657
658         wake_up(&imp->imp_recovery_waitq);
659         RETURN(rc);
660 }
661
662 static int completed_replay_interpret(struct ptlrpc_request *req,
663                                     void * data, int rc)
664 {
665         atomic_dec(&req->rq_import->imp_replay_inflight);
666         if (req->rq_status == 0) {
667                 ptlrpc_import_recovery_state_machine(req->rq_import);
668         } else {
669                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
670                        "reconnecting\n",
671                        req->rq_import->imp_obd->obd_name, req->rq_status);
672                 ptlrpc_connect_import(req->rq_import, NULL);
673         }
674
675         RETURN(0);
676 }
677
678 static int signal_completed_replay(struct obd_import *imp)
679 {
680         struct ptlrpc_request *req;
681         ENTRY;
682
683         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
684         atomic_inc(&imp->imp_replay_inflight);
685
686         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
687         if (!req) {
688                 atomic_dec(&imp->imp_replay_inflight);
689                 RETURN(-ENOMEM);
690         }
691
692         req->rq_replen = lustre_msg_size(0, NULL);
693         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
694         req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
695         req->rq_timeout *= 3;
696         req->rq_interpret_reply = completed_replay_interpret;
697
698         ptlrpcd_add_req(req);
699         RETURN(0);
700 }
701
#ifdef __KERNEL__
/*
 * Kernel thread body: invalidate the import passed in @data, then move it
 * to RECOVER and run the recovery state machine.
 *
 * The thread daemonizes itself, blocks all signals and names itself
 * "ll_imp_inval" before doing any work.  Always returns 0.
 */
static int ptlrpc_invalidate_import_thread(void *data)
{
        struct obd_import *imp = data;
        unsigned long sig_flags;

        ENTRY;

        /* thread setup is done under the big kernel lock */
        lock_kernel();
        ptlrpc_daemonize();

        SIGNAL_MASK_LOCK(current, sig_flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, sig_flags);
        THREAD_NAME(current->comm, sizeof(current->comm), "ll_imp_inval");
        unlock_kernel();

        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               imp->imp_obd->obd_name, imp->imp_target_uuid.uuid,
               imp->imp_connection->c_remote_uuid.uuid);

        ptlrpc_invalidate_import(imp);

        /* invalidation done: continue recovery from the RECOVER state */
        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
        ptlrpc_import_recovery_state_machine(imp);

        RETURN(0);
}
#endif
732
733 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
734 {
735         int rc = 0;
736         int inflight;
737         char *target_start;
738         int target_len;
739
740         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
741                 deuuidify(imp->imp_target_uuid.uuid, NULL,
742                           &target_start, &target_len);
743                 LCONSOLE_ERROR("This client was evicted by %.*s; in progress "
744                                "operations using this service will fail.\n",
745                                target_len, target_start);
746                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
747                        imp->imp_target_uuid.uuid,
748                        imp->imp_connection->c_remote_uuid.uuid);
749
750 #ifdef __KERNEL__
751                 rc = kernel_thread(ptlrpc_invalidate_import_thread, imp,
752                                    CLONE_VM | CLONE_FILES);
753                 if (rc < 0)
754                         CERROR("error starting invalidate thread: %d\n", rc);
755                 else
756                         rc = 0;
757                 RETURN(rc);
758 #else
759                 ptlrpc_invalidate_import(imp);
760
761                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
762 #endif
763         }
764
765         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
766                 CDEBUG(D_HA, "replay requested by %s\n",
767                        imp->imp_target_uuid.uuid);
768                 rc = ptlrpc_replay_next(imp, &inflight);
769                 if (inflight == 0 &&
770                     atomic_read(&imp->imp_replay_inflight) == 0) {
771                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
772                         rc = ldlm_replay_locks(imp);
773                         if (rc)
774                                 GOTO(out, rc);
775                 }
776                 rc = 0;
777         }
778
779         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
780                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
781                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
782                         rc = signal_completed_replay(imp);
783                         if (rc)
784                                 GOTO(out, rc);
785                 }
786
787         }
788
789         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
790                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
791                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
792                 }
793         }
794
795         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
796                 char   *nidstr;
797
798                 CDEBUG(D_HA, "reconnected to %s@%s\n",
799                        imp->imp_target_uuid.uuid,
800                        imp->imp_connection->c_remote_uuid.uuid);
801
802                 rc = ptlrpc_resend(imp);
803                 if (rc)
804                         GOTO(out, rc);
805                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
806                 ptlrpc_activate_import(imp);
807
808                 deuuidify(imp->imp_target_uuid.uuid, NULL,
809                           &target_start, &target_len);
810                 nidstr = libcfs_nid2str(imp->imp_connection->c_peer.nid);
811
812                 LCONSOLE_INFO("Connection restored to service %.*s using nid "
813                               "%s.\n", target_len, target_start, nidstr);
814
815                 CWARN("%s: connection restored to %s@%s\n",
816                       imp->imp_obd->obd_name,
817                       imp->imp_target_uuid.uuid,
818                       imp->imp_connection->c_remote_uuid.uuid);
819         }
820
821         if (imp->imp_state == LUSTRE_IMP_FULL) {
822                 wake_up(&imp->imp_recovery_waitq);
823                 ptlrpc_wake_delayed(imp);
824         }
825
826  out:
827         RETURN(rc);
828 }
829
/*
 * Timeout callback handed to LWI_TIMEOUT_INTR() by
 * ptlrpc_disconnect_import().  The argument is ignored and the result is
 * always 0.
 * NOTE(review): presumably a zero return tells l_wait_event() to keep
 * waiting after the timeout fires -- confirm against l_wait_event().
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
834
835 int ptlrpc_disconnect_import(struct obd_import *imp)
836 {
837         struct ptlrpc_request *request;
838         int rq_opc;
839         int rc = 0;
840         unsigned long flags;
841         ENTRY;
842
843         switch (imp->imp_connect_op) {
844         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
845         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
846         case MGMT_CONNECT: rq_opc = MGMT_DISCONNECT; break;
847         default:
848                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
849                        imp->imp_target_uuid.uuid, imp->imp_connect_op);
850                 RETURN(-EINVAL);
851         }
852
853         if (ptlrpc_import_in_recovery(imp)) {
854                 struct l_wait_info lwi;
855                 lwi = LWI_TIMEOUT_INTR(MAX(obd_timeout * HZ, 1), back_to_sleep,
856                                        NULL, NULL);
857                 rc = l_wait_event(imp->imp_recovery_waitq,
858                                   !ptlrpc_import_in_recovery(imp), &lwi);
859
860         }
861
862         spin_lock_irqsave(&imp->imp_lock, flags);
863         if (imp->imp_state != LUSTRE_IMP_FULL)
864                 GOTO(out, 0);
865
866         spin_unlock_irqrestore(&imp->imp_lock, flags);
867
868         request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
869         if (request) {
870                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
871                  * it fails.  We can get through the above with a down server
872                  * if the client doesn't know the server is gone yet. */
873                 request->rq_no_resend = 1;
874                 request->rq_timeout = 5;
875                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
876                 request->rq_send_state =  LUSTRE_IMP_CONNECTING;
877                 request->rq_replen = lustre_msg_size(0, NULL);
878                 rc = ptlrpc_queue_wait(request);
879                 ptlrpc_req_finished(request);
880         }
881
882         spin_lock_irqsave(&imp->imp_lock, flags);
883 out:
884         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
885         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
886         spin_unlock_irqrestore(&imp->imp_lock, flags);
887
888         RETURN(rc);
889 }
890