Whamcloud - gitweb
- landing of b_fid after merge with b_hd_cleanup_merge.
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifdef __KERNEL__
25 # include <linux/config.h>
26 # include <linux/module.h>
27 # include <linux/kmod.h>
28 #else
29 # include <liblustre.h>
30 #endif
31
32 #include <linux/obd_support.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_net.h>
35 #include <linux/lustre_import.h>
36 #include <linux/lustre_export.h>
37 #include <linux/obd.h>
38 #include <linux/obd_class.h>
39
40 #include "ptlrpc_internal.h"
41
/* Per-request arguments stored in rq_async_args of a CONNECT request by
 * ptlrpc_connect_import() and read back by ptlrpc_connect_interpret(). */
struct ptlrpc_connect_async_args {
         /* imp_peer_committed_transno sampled before a reconnect was sent
          * (left 0 for an initial connect) */
         __u64 pcaa_peer_committed;
        /* non-zero when the import had no remote handle cookie yet, i.e.
         * this request is the first connect rather than a reconnect */
        int pcaa_initial_connect;
};
46
/*
 * Change the state of an import without taking imp_lock (IMPORT_SET_STATE
 * is the locking wrapper around this macro).
 *
 * A CLOSED import should remain so: when imp_state is already
 * LUSTRE_IMP_CLOSED nothing is logged and nothing is changed.
 *
 * Both arguments are parenthesized at every use so that expression
 * arguments (e.g. "p + 1") expand correctly.  They are still evaluated
 * more than once, so they must be free of side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), (imp)->imp_target_uuid.uuid,                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while(0)
58
/*
 * Change the state of an import with imp_lock held (interrupts saved and
 * restored around the update).  The actual transition, including the
 * "CLOSED stays CLOSED" rule, is done by IMPORT_SET_STATE_NOLOCK.
 *
 * The arguments are parenthesized so expression arguments expand
 * correctly, and the saved-flags local has a macro-specific name so it
 * does not shadow a caller's own "flags" variable.
 */
#define IMPORT_SET_STATE(imp, state)                            \
do {                                                            \
        unsigned long iss_flags;                                \
                                                                \
        spin_lock_irqsave(&(imp)->imp_lock, iss_flags);         \
        IMPORT_SET_STATE_NOLOCK((imp), (state));                \
        spin_unlock_irqrestore(&(imp)->imp_lock, iss_flags);    \
} while(0)
67
68
/* Forward declarations: both functions are defined further down in this
 * file but are referenced before their definitions. */

/* Reply handler installed on CONNECT requests. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void * data, int rc);
/* Advances an import through its recovery states. */
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
72
73 /* Only this function is allowed to change the import state when it is
74  * CLOSED. I would rather refcount the import and free it after
75  * disconnection like we do with exports. To do that, the client_obd
76  * will need to save the peer info somewhere other than in the import,
77  * though. */
78 int ptlrpc_init_import(struct obd_import *imp)
79 {
80         unsigned long flags;
81
82         spin_lock_irqsave(&imp->imp_lock, flags);
83
84         imp->imp_generation++;
85         imp->imp_state =  LUSTRE_IMP_NEW;
86
87         spin_unlock_irqrestore(&imp->imp_lock, flags);
88
89         return 0;
90 }
91
92 /* Returns true if import was FULL, false if import was already not
93  * connected.
94  */
95 int ptlrpc_set_import_discon(struct obd_import *imp)
96 {
97         unsigned long flags;
98         int rc = 0;
99
100         spin_lock_irqsave(&imp->imp_lock, flags);
101
102         if (imp->imp_state == LUSTRE_IMP_FULL) {
103                 CERROR("%s: connection lost to %s@%s\n",
104                        imp->imp_obd->obd_name,
105                        imp->imp_target_uuid.uuid,
106                        imp->imp_connection->c_remote_uuid.uuid);
107                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
108                 spin_unlock_irqrestore(&imp->imp_lock, flags);
109                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
110                 rc = 1;
111         } else {
112                 spin_unlock_irqrestore(&imp->imp_lock, flags);
113                 CDEBUG(D_HA, "%p %s: import already not connected: %s\n",
114                        imp,imp->imp_client->cli_name,
115                        ptlrpc_import_state_name(imp->imp_state));
116         }
117
118         return rc;
119 }
120
121 /*
122  * This acts as a barrier; all existing requests are rejected, and
123  * no new requests will be accepted until the import is valid again.
124  */
125 void ptlrpc_deactivate_import(struct obd_import *imp)
126 {
127         unsigned long flags;
128         ENTRY;
129
130         spin_lock_irqsave(&imp->imp_lock, flags);
131         CDEBUG(D_HA, "setting import %s INVALID\n",
132                imp->imp_target_uuid.uuid);
133         imp->imp_invalid = 1;
134         imp->imp_generation++;
135         spin_unlock_irqrestore(&imp->imp_lock, flags);
136
137         ptlrpc_abort_inflight(imp);
138         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
139 }
140
141 /*
142  * This function will invalidate the import, if necessary, then block
143  * for all the RPC completions, and finally notify the obd to
144  * invalidate its state (ie cancel locks, clear pending requests,
145  * etc).
146  *
147  * in_rpc: true if this is called while processing an rpc, like
148  *    CONNECT. It will allow for one RPC to be inflight while
149  *    waiting for requests to complete. Ugly, yes, but I don't see an
150  *    cleaner way right now.
151  */
152 void ptlrpc_invalidate_import(struct obd_import *imp, int in_rpc)
153 {
154         struct l_wait_info lwi;
155         unsigned long timeout;
156         int inflight = 0;
157         int rc;
158
159         if (!imp->imp_invalid)
160                 ptlrpc_deactivate_import(imp);
161
162         LASSERT(imp->imp_invalid);
163
164         if (in_rpc)
165                 inflight = 1;
166
167         /* wait for all requests to error out and call completion 
168            callbacks */
169         if (imp->imp_server_timeout)
170                 timeout = obd_timeout / 2;
171         else
172                 timeout = obd_timeout;
173         timeout = MAX(timeout * HZ, 1);
174         lwi = LWI_TIMEOUT_INTR(timeout, NULL, NULL, NULL);
175         rc = l_wait_event(imp->imp_recovery_waitq, 
176                           (atomic_read(&imp->imp_inflight) == inflight), 
177                           &lwi);
178
179         if (rc)
180                 CERROR("%s: rc = %d waiting for callback (%d != %d)\n",
181                        imp->imp_target_uuid.uuid, rc,
182                        atomic_read(&imp->imp_inflight), inflight);
183
184         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
185 }
186
187 void ptlrpc_activate_import(struct obd_import *imp)
188 {
189         struct obd_device *obd = imp->imp_obd;
190         unsigned long flags;
191
192         spin_lock_irqsave(&imp->imp_lock, flags);
193         imp->imp_invalid = 0;
194         spin_unlock_irqrestore(&imp->imp_lock, flags);
195
196         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
197 }
198
199 void ptlrpc_fail_import(struct obd_import *imp, int generation)
200 {
201         ENTRY;
202
203         LASSERT (!imp->imp_dlm_fake);
204
205         if (ptlrpc_set_import_discon(imp)) {
206                 unsigned long flags;
207
208                 if (!imp->imp_replayable) {
209                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
210                                "auto-deactivating\n",
211                                imp->imp_target_uuid.uuid,
212                                imp->imp_connection->c_remote_uuid.uuid,
213                                imp->imp_obd->obd_name);
214                         ptlrpc_deactivate_import(imp);
215                 }
216
217                 CDEBUG(D_HA, "%s: waking up pinger\n",
218                        imp->imp_target_uuid.uuid);
219
220                 spin_lock_irqsave(&imp->imp_lock, flags);
221                 imp->imp_force_verify = 1;
222                 spin_unlock_irqrestore(&imp->imp_lock, flags);
223
224                 ptlrpc_pinger_wake_up();
225         }
226         EXIT;
227 }
228
/*
 * True when "last" (a jiffies timestamp) is non-zero and no more than
 * 2 * obd_timeout seconds old.  A zero timestamp is never "too soon".
 * The signed difference keeps the comparison valid across jiffies
 * wrap-around.  "last" is evaluated twice.
 */
#define ATTEMPT_TOO_SOON(last)                                          \
        ((last) != 0 &&                                                 \
         (long)(jiffies - (last)) <= (long)(obd_timeout * 2 * HZ))
231
232 static int import_select_connection(struct obd_import *imp)
233 {
234         struct obd_import_conn *imp_conn, *tmp;
235         struct obd_export *dlmexp;
236         int found = 0;
237         ENTRY;
238
239         spin_lock(&imp->imp_lock);
240
241         if (list_empty(&imp->imp_conn_list)) {
242                 CERROR("no available connections on imp %p@%s\n",
243                         imp, imp->imp_obd->obd_name);
244                 spin_unlock(&imp->imp_lock);
245                 RETURN(-EINVAL);
246         }
247
248         list_for_each_entry(imp_conn, &imp->imp_conn_list, oic_item) {
249                 if (!ATTEMPT_TOO_SOON(imp_conn->oic_last_attempt)) {
250                         found = 1;
251                         break;
252                 }
253         }
254
255         /* if not found, simply choose the current one */
256         if (!found) {
257                 CWARN("obd %s imp 0x%p: all connections have been "
258                       "tried recently\n", imp->imp_obd->obd_name, imp);
259                 LASSERT(imp->imp_conn_current);
260                 imp_conn = imp->imp_conn_current;
261         }
262         LASSERT(imp_conn->oic_conn);
263
264         imp_conn->oic_last_attempt = jiffies;
265
266         /* move the items ahead of the selected one to list tail */
267         while (1) {
268                 tmp= list_entry(imp->imp_conn_list.next,
269                                 struct obd_import_conn, oic_item);
270                 if (tmp == imp_conn)
271                         break;
272                 list_del(&tmp->oic_item);
273                 list_add_tail(&tmp->oic_item, &imp->imp_conn_list);
274         }
275
276         /* switch connection, don't mind if it's same as the current one */
277         if (imp->imp_connection)
278                 ptlrpc_put_connection(imp->imp_connection);
279         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
280
281         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
282         LASSERT(dlmexp != NULL);
283         if (dlmexp->exp_connection)
284                 ptlrpc_put_connection(imp->imp_connection);
285         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
286         class_export_put(dlmexp);
287
288         imp->imp_conn_current = imp_conn;
289         CWARN("obd %s imp 0x%p: select conn %s\n",
290                imp->imp_obd->obd_name, imp,
291                imp_conn->oic_uuid.uuid);
292         spin_unlock(&imp->imp_lock);
293
294         RETURN(0);
295 }
296
297
298
299 int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
300 {
301         struct obd_device *obd = imp->imp_obd;
302         int initial_connect = 0;
303         int rc;
304         __u64 committed_before_reconnect = 0;
305         struct ptlrpc_request *request;
306         int size[] = {sizeof(imp->imp_target_uuid),
307                                  sizeof(obd->obd_uuid),
308                                  sizeof(imp->imp_dlm_handle),
309                                  sizeof(unsigned long)};
310         char *tmp[] = {imp->imp_target_uuid.uuid,
311                        obd->obd_uuid.uuid,
312                        (char *)&imp->imp_dlm_handle,
313                        (char *)&imp->imp_connect_flags}; /* XXX: make this portable! */
314         struct ptlrpc_connect_async_args *aa;
315         unsigned long flags;
316
317         spin_lock_irqsave(&imp->imp_lock, flags);
318         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
319                 spin_unlock_irqrestore(&imp->imp_lock, flags);
320                 CERROR("can't connect to a closed import\n");
321                 RETURN(-EINVAL);
322         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
323                 spin_unlock_irqrestore(&imp->imp_lock, flags);
324                 CERROR("already connected\n");
325                 RETURN(0);
326         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
327                 spin_unlock_irqrestore(&imp->imp_lock, flags);
328                 CERROR("already connecting\n");
329                 RETURN(-EALREADY);
330         }
331
332         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
333
334         imp->imp_resend_replay = 0;
335
336         if (imp->imp_remote_handle.cookie == 0) {
337                 initial_connect = 1;
338         } else {
339                 committed_before_reconnect = imp->imp_peer_committed_transno;;
340                 imp->imp_conn_cnt++;
341         }
342
343
344         spin_unlock_irqrestore(&imp->imp_lock, flags);
345
346         if (new_uuid) {
347                 struct obd_uuid uuid;
348
349                 obd_str2uuid(&uuid, new_uuid);
350
351                 rc = import_set_conn_priority(imp, &uuid);
352                 if (rc)
353                         GOTO(out, rc);
354         }
355         rc = import_select_connection(imp);
356         if (rc)
357                 GOTO(out, rc);
358
359         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION,
360                                   imp->imp_connect_op, 4, size, tmp);
361         if (!request)
362                 GOTO(out, rc = -ENOMEM);
363
364 #ifndef __KERNEL__
365         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
366 #endif
367
368         request->rq_send_state = LUSTRE_IMP_CONNECTING;
369         request->rq_replen = lustre_msg_size(0, NULL);
370         request->rq_interpret_reply = ptlrpc_connect_interpret;
371
372         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
373         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
374         memset(aa, 0, sizeof *aa);
375
376         aa->pcaa_peer_committed = committed_before_reconnect;
377         aa->pcaa_initial_connect = initial_connect;
378
379         if (aa->pcaa_initial_connect) {
380                 lustre_msg_add_op_flags(request->rq_reqmsg, 
381                                         MSG_CONNECT_INITIAL);
382                 imp->imp_replayable = 1; 
383         }
384
385         ptlrpcd_add_req(request);
386         rc = 0;
387         imp->imp_connect_start = jiffies;
388 out:
389         if (rc != 0) {
390                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
391         }
392
393         RETURN(rc);
394 }
395
/*
 * Reply interpreter for the CONNECT request queued by
 * ptlrpc_connect_import().
 *
 * request: the completed CONNECT request.
 * data:    the struct ptlrpc_connect_async_args stored in the request.
 * rc:      status handed in by the caller; non-zero skips all reply
 *          processing and goes straight to the failure handling.
 *
 * For an initial connect the import goes to FULL.  For a reconnect the
 * reply flags and imp_invalid select EVICTED, REPLAY or RECOVER.  In both
 * cases ptlrpc_import_recovery_state_machine() is then run.
 *
 * Returns 0 for a CLOSED import and after restarting the connect on
 * -ENOTCONN; otherwise returns rc, with the import put back to DISCON
 * when rc is non-zero.
 */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void *data, int rc)
{
        struct ptlrpc_connect_async_args *aa = data;
        struct obd_import *imp = request->rq_import;
        struct lustre_handle old_hdl;
        unsigned long flags;
        int msg_flags;
        ENTRY;

        /* A CLOSED import is left alone, whatever the reply says. */
        spin_lock_irqsave(&imp->imp_lock, flags);
        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                RETURN(0);
        }
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        if (rc)
                GOTO(out, rc);
        LASSERT(imp->imp_conn_current);
        /* The connect worked: clear the stamp so ATTEMPT_TOO_SOON() does
         * not hold this connection back on the next selection. */
        imp->imp_conn_current->oic_last_attempt = 0;

        msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);

        if (aa->pcaa_initial_connect) {
                /* First connect: take replayability, connection count and
                 * remote handle from the reply, then go straight to FULL. */
                if (msg_flags & MSG_CONNECT_REPLAYABLE) {
                        CDEBUG(D_HA, "connected to replayable target: %s\n",
                               imp->imp_target_uuid.uuid);
                        imp->imp_pingable = imp->imp_replayable = 1;
                } else {
                        imp->imp_replayable = 0;
                }
                LASSERTF(imp->imp_conn_cnt < request->rq_repmsg->conn_cnt,
                         "imp conn_cnt %d req conn_cnt %d", 
                         imp->imp_conn_cnt, request->rq_repmsg->conn_cnt);
                imp->imp_conn_cnt = request->rq_repmsg->conn_cnt;
                imp->imp_remote_handle = request->rq_repmsg->handle;
                IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
                GOTO(finish, rc = 0);
        }

        /* Determine what recovery state to move the import to. */
        if (MSG_CONNECT_RECONNECT & msg_flags) {
                /* old_hdl is only used as an all-zero handle to compare
                 * the reply's handle against. */
                memset(&old_hdl, 0, sizeof(old_hdl));
                if (!memcmp(&old_hdl, &request->rq_repmsg->handle,
                            sizeof (old_hdl))) {
                        CERROR("%s@%s didn't like our handle "LPX64
                               ", failed\n", imp->imp_target_uuid.uuid,
                               imp->imp_connection->c_remote_uuid.uuid,
                               imp->imp_dlm_handle.cookie);
                        GOTO(out, rc = -ENOTCONN);
                }

                if (memcmp(&imp->imp_remote_handle, &request->rq_repmsg->handle,
                           sizeof(imp->imp_remote_handle))) {
                        CERROR("%s@%s changed handle from "LPX64" to "LPX64
                               "; copying, but this may foreshadow disaster\n",
                               imp->imp_target_uuid.uuid,
                               imp->imp_connection->c_remote_uuid.uuid,
                               imp->imp_remote_handle.cookie,
                               request->rq_repmsg->handle.cookie);
                        imp->imp_remote_handle = request->rq_repmsg->handle;
                } else {
                        CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
                               imp->imp_target_uuid.uuid,
                               imp->imp_connection->c_remote_uuid.uuid);
                }

                /* An import invalidated in the meantime is treated as
                 * evicted even though the reply says RECONNECT. */
                if (imp->imp_invalid) {
                        IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
                } else if (MSG_CONNECT_RECOVERING & msg_flags) {
                        CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
                               imp->imp_obd->obd_name, 
                               imp->imp_target_uuid.uuid);
                        imp->imp_resend_replay = 1;
                        IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
                } else {
                        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
                }
        } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
                /* No RECONNECT flag but RECOVERING is set: adopt the new
                 * handle and replay from the beginning. */
                LASSERT(imp->imp_replayable);
                imp->imp_remote_handle = request->rq_repmsg->handle;
                imp->imp_last_replay_transno = 0;
                IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
        } else {
                CDEBUG(D_HA, "oops! we get evicted from %s\n", imp->imp_target_uuid.uuid);
                imp->imp_remote_handle = request->rq_repmsg->handle;
                IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
        }

        /* Sanity checks for a reconnected import. */
        if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
                CERROR("imp_replayable flag does not match server "
                       "after reconnect. We should LBUG right here.\n");
        }

        if (request->rq_repmsg->last_committed < aa->pcaa_peer_committed) {
                CERROR("%s went back in time (transno "LPD64
                       " was previously committed, server now claims "LPD64
                       ")! is shared storage not coherent?\n",
                       imp->imp_target_uuid.uuid,
                       aa->pcaa_peer_committed,
                       request->rq_repmsg->last_committed);
        }

finish:
        rc = ptlrpc_import_recovery_state_machine(imp);
        if (rc != 0) {
                if (rc == -ENOTCONN) {
                        /* NOTE(review): this path returns without the
                         * wake_up() on imp_recovery_waitq done below;
                         * presumably the restarted connect takes care of
                         * waiters - confirm. */
                        CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
                               "invalidating and reconnecting\n",
                               imp->imp_target_uuid.uuid,
                               imp->imp_connection->c_remote_uuid.uuid);
                        ptlrpc_connect_import(imp, NULL);
                        RETURN(0);
                }
        }
 out:
        if (rc != 0) {
                IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
                /* A failed initial connect deactivates the import unless
                 * imp_initial_recov is set. */
                if (aa->pcaa_initial_connect && !imp->imp_initial_recov)
                        ptlrpc_deactivate_import(imp);
                CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
                       imp->imp_target_uuid.uuid,
                       (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
        }

        wake_up(&imp->imp_recovery_waitq);
        RETURN(rc);
}
526
527 static int completed_replay_interpret(struct ptlrpc_request *req,
528                                       void *data, int rc)
529 {
530         atomic_dec(&req->rq_import->imp_replay_inflight);
531         if (req->rq_status == 0) {
532                 ptlrpc_import_recovery_state_machine(req->rq_import);
533         } else {
534                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
535                        "reconnecting\n", 
536                        req->rq_import->imp_obd->obd_name, req->rq_status);
537                 ptlrpc_connect_import(req->rq_import, NULL);
538         }
539
540         RETURN(0);
541 }
542
543 static int signal_completed_replay(struct obd_import *imp)
544  {
545         struct ptlrpc_request *req;
546         ENTRY;
547
548         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
549         atomic_inc(&imp->imp_replay_inflight);
550
551         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 0, NULL, NULL);
552         if (!req)
553                 RETURN(-ENOMEM);
554
555         req->rq_replen = lustre_msg_size(0, NULL);
556         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
557         req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
558         req->rq_timeout *= 3;
559         req->rq_interpret_reply = completed_replay_interpret;
560
561         ptlrpcd_add_req(req);
562         RETURN(0);
563 }
564
565 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
566 {
567         int rc = 0;
568         int inflight;
569
570         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
571                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
572                        imp->imp_target_uuid.uuid,
573                        imp->imp_connection->c_remote_uuid.uuid);
574
575                 ptlrpc_invalidate_import(imp, 1);
576
577                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
578         }
579
580         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
581                 CDEBUG(D_HA, "replay requested by %s\n",
582                        imp->imp_target_uuid.uuid);
583                 rc = ptlrpc_replay_next(imp, &inflight);
584                 if (inflight == 0 &&
585                     atomic_read(&imp->imp_replay_inflight) == 0) {
586                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
587                         rc = ldlm_replay_locks(imp);
588                         if (rc)
589                                 GOTO(out, rc);
590                 }
591                 rc = 0;
592         }
593
594         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
595                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
596                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
597                         rc = signal_completed_replay(imp);
598                         if (rc)
599                                 GOTO(out, rc);
600                 }
601
602         }
603
604         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
605                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
606                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
607                 }
608         }
609
610         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
611                 CDEBUG(D_HA, "reconnected to %s@%s\n",
612                        imp->imp_target_uuid.uuid,
613                        imp->imp_connection->c_remote_uuid.uuid);
614
615                 rc = ptlrpc_resend(imp);
616                 if (rc)
617                         GOTO(out, rc);
618                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
619                 ptlrpc_activate_import(imp);
620                 CERROR("%s: connection restored to %s@%s\n",
621                        imp->imp_obd->obd_name, 
622                        imp->imp_target_uuid.uuid,
623                        imp->imp_connection->c_remote_uuid.uuid);
624         }
625
626         if (imp->imp_state == LUSTRE_IMP_FULL) {
627                 wake_up(&imp->imp_recovery_waitq);
628                 ptlrpc_wake_delayed(imp);
629         }
630
631  out:
632         RETURN(rc);
633 }
634
/*
 * Timeout callback handed to LWI_TIMEOUT_INTR() by
 * ptlrpc_disconnect_import().  Ignores its argument and always returns 0
 * (presumably telling the waiter to keep sleeping - confirm against
 * l_wait_event()).
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
639
640 int ptlrpc_disconnect_import(struct obd_import *imp)
641 {
642         struct ptlrpc_request *request;
643         int rq_opc;
644         int rc = 0;
645         unsigned long flags;
646         ENTRY;
647
648         switch (imp->imp_connect_op) {
649         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
650         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
651         case MGMT_CONNECT: rq_opc = MGMT_DISCONNECT; break;
652         default:
653                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
654                        imp->imp_target_uuid.uuid, imp->imp_connect_op);
655                 RETURN(-EINVAL);
656         }
657
658
659         if (ptlrpc_import_in_recovery(imp)) {
660                 struct l_wait_info lwi;
661                 unsigned long timeout;
662                 if (imp->imp_server_timeout)
663                         timeout = obd_timeout / 2;
664                 else
665                         timeout = obd_timeout;
666                 timeout = MAX(timeout * HZ, 1);
667                 lwi = LWI_TIMEOUT_INTR(obd_timeout, back_to_sleep, NULL, NULL);
668                 rc = l_wait_event(imp->imp_recovery_waitq, 
669                                   !ptlrpc_import_in_recovery(imp), &lwi);
670
671         }
672
673         spin_lock_irqsave(&imp->imp_lock, flags);
674         if (imp->imp_state != LUSTRE_IMP_FULL) {
675                 GOTO(out, 0);
676         }
677         spin_unlock_irqrestore(&imp->imp_lock, flags);
678
679         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc,
680                                   0, NULL, NULL);
681         if (request) {
682                 /* For non-replayable connections, don't attempt
683                    reconnect if this fails */
684                 if (!imp->imp_replayable) {
685                         request->rq_no_resend = 1;
686                         IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
687                         request->rq_send_state =  LUSTRE_IMP_CONNECTING;
688                 }
689                 request->rq_replen = lustre_msg_size(0, NULL);
690                 rc = ptlrpc_queue_wait(request);
691                 ptlrpc_req_finished(request);
692         }
693
694         spin_lock_irqsave(&imp->imp_lock, flags);
695 out:
696         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
697         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
698         imp->imp_conn_cnt = 0;
699         spin_unlock_irqrestore(&imp->imp_lock, flags);
700
701         RETURN(rc);
702 }
703