Whamcloud - gitweb
Branch b1_4
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifdef __KERNEL__
25 # include <linux/config.h>
26 # include <linux/module.h>
27 # include <linux/kmod.h>
28 #else
29 # include <liblustre.h>
30 #endif
31
32 #include <linux/obd_support.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_net.h>
35 #include <linux/lustre_import.h>
36 #include <linux/lustre_export.h>
37 #include <linux/obd.h>
38 #include <linux/obd_class.h>
39
40 #include "ptlrpc_internal.h"
41
42 struct ptlrpc_connect_async_args {
43          __u64 pcaa_peer_committed;
44         int pcaa_initial_connect;
45 };
46
/*
 * Move an import to a new state and log the transition at D_HA level.
 * No locking is done here; IMPORT_SET_STATE wraps this macro in imp_lock.
 *
 * A CLOSED import should remain so: once imp_state is LUSTRE_IMP_CLOSED
 * the request is ignored.
 *
 * The arguments are parenthesized so that arbitrary expressions (for
 * example "&array[i]" or "base + n") may be passed.  Each argument is
 * still evaluated more than once, so it must be free of side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), (imp)->imp_target_uuid.uuid,                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while(0)
58
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp_lock with
 * interrupts saved/disabled around the state change.
 *
 * The saved-flags local has a macro-specific name so it can neither
 * shadow nor capture a caller's own "flags" variable used in the
 * arguments, and the arguments are parenthesized before being passed on.
 */
#define IMPORT_SET_STATE(imp, state)                            \
do {                                                            \
        unsigned long iss_flags;                                \
                                                                \
        spin_lock_irqsave(&(imp)->imp_lock, iss_flags);         \
        IMPORT_SET_STATE_NOLOCK((imp), (state));                \
        spin_unlock_irqrestore(&(imp)->imp_lock, iss_flags);    \
} while(0)
67
68
/* Reply handler installed on CONNECT requests; defined later in this file. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void *data, int rc);

/* Steps an import through its recovery states; defined later in this file. */
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
72
73 /* Only this function is allowed to change the import state when it is
74  * CLOSED. I would rather refcount the import and free it after
75  * disconnection like we do with exports. To do that, the client_obd
76  * will need to save the peer info somewhere other than in the import,
77  * though. */
78 int ptlrpc_init_import(struct obd_import *imp)
79 {
80         unsigned long flags;
81
82         spin_lock_irqsave(&imp->imp_lock, flags);
83
84         imp->imp_generation++;
85         imp->imp_state =  LUSTRE_IMP_NEW;
86
87         spin_unlock_irqrestore(&imp->imp_lock, flags);
88
89         return 0;
90 }
91
#define UUID_STR "_UUID"

/*
 * Compute a printable view of a UUID string without copying it.
 *
 * If @prefix is non-NULL and @uuid begins with it, the view starts just
 * past the prefix; otherwise it starts at @uuid.  If the view ends in
 * UUID_STR, that suffix is excluded from the reported length.
 *
 * @uuid       NUL-terminated string to examine (not modified)
 * @prefix     optional leading text to skip, may be NULL
 * @uuid_start out: first character of the view (points into @uuid)
 * @uuid_len   out: number of characters in the view
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len)
{
        const size_t suffix_len = strlen(UUID_STR);
        char *start = uuid;
        size_t len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start = uuid + prefix_len;
        }

        len = strlen(start);
        if (len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = len;
}
107
108 /* Returns true if import was FULL, false if import was already not
109  * connected.
110  */
111 int ptlrpc_set_import_discon(struct obd_import *imp)
112 {
113         unsigned long flags;
114         int rc = 0;
115
116         spin_lock_irqsave(&imp->imp_lock, flags);
117
118         if (imp->imp_state == LUSTRE_IMP_FULL) {
119                 char nidbuf[PTL_NALFMT_SIZE];
120                 char *target_start;
121                 int   target_len;
122
123                 deuuidify(imp->imp_target_uuid.uuid, NULL,
124                           &target_start, &target_len);
125
126                 LCONSOLE_ERROR("Connection to service %.*s via nid %s was "
127                                "lost; in progress operations using this "
128                                "service will %s.\n",
129                                target_len, target_start,
130                                ptlrpc_peernid2str(&imp->imp_connection->c_peer,
131                                                   nidbuf),
132                                imp->imp_replayable 
133                                ? "wait for recovery to complete"
134                                : "fail");
135
136                 CWARN("%s: connection lost to %s@%s\n",
137                       imp->imp_obd->obd_name,
138                       imp->imp_target_uuid.uuid,
139                       imp->imp_connection->c_remote_uuid.uuid);
140                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
141                 spin_unlock_irqrestore(&imp->imp_lock, flags);
142                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
143                 rc = 1;
144         } else {
145                 spin_unlock_irqrestore(&imp->imp_lock, flags);
146                 CDEBUG(D_HA, "%p %s: import already not connected: %s\n",
147                        imp,imp->imp_client->cli_name,
148                        ptlrpc_import_state_name(imp->imp_state));
149         }
150
151         return rc;
152 }
153
154 /*
155  * This acts as a barrier; all existing requests are rejected, and
156  * no new requests will be accepted until the import is valid again.
157  */
158 void ptlrpc_deactivate_import(struct obd_import *imp)
159 {
160         unsigned long flags;
161         ENTRY;
162
163         spin_lock_irqsave(&imp->imp_lock, flags);
164         CDEBUG(D_HA, "setting import %s INVALID\n", imp->imp_target_uuid.uuid);
165         imp->imp_invalid = 1;
166         imp->imp_generation++;
167         spin_unlock_irqrestore(&imp->imp_lock, flags);
168
169         ptlrpc_abort_inflight(imp);
170         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
171 }
172
173 /*
174  * This function will invalidate the import, if necessary, then block
175  * for all the RPC completions, and finally notify the obd to
176  * invalidate its state (ie cancel locks, clear pending requests,
177  * etc).
178  */
179 void ptlrpc_invalidate_import(struct obd_import *imp)
180 {
181         struct l_wait_info lwi;
182         int rc;
183
184         if (!imp->imp_invalid)
185                 ptlrpc_deactivate_import(imp);
186
187         LASSERT(imp->imp_invalid);
188
189         /* wait for all requests to error out and call completion callbacks */
190         lwi = LWI_TIMEOUT_INTR(MAX(obd_timeout * HZ, 1), NULL,
191                                NULL, NULL);
192         rc = l_wait_event(imp->imp_recovery_waitq,
193                           (atomic_read(&imp->imp_inflight) == 0),
194                           &lwi);
195
196         if (rc)
197                 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
198                        imp->imp_target_uuid.uuid, rc,
199                        atomic_read(&imp->imp_inflight));
200
201         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
202 }
203
204 void ptlrpc_activate_import(struct obd_import *imp)
205 {
206         struct obd_device *obd = imp->imp_obd;
207         unsigned long flags;
208
209         spin_lock_irqsave(&imp->imp_lock, flags);
210         imp->imp_invalid = 0;
211         spin_unlock_irqrestore(&imp->imp_lock, flags);
212
213         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
214 }
215
216 void ptlrpc_fail_import(struct obd_import *imp, int generation)
217 {
218         ENTRY;
219
220         LASSERT (!imp->imp_dlm_fake);
221
222         if (ptlrpc_set_import_discon(imp)) {
223                 unsigned long flags;
224
225                 if (!imp->imp_replayable) {
226                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
227                                "auto-deactivating\n",
228                                imp->imp_target_uuid.uuid,
229                                imp->imp_connection->c_remote_uuid.uuid,
230                                imp->imp_obd->obd_name);
231                         ptlrpc_deactivate_import(imp);
232                 }
233
234                 CDEBUG(D_HA, "%s: waking up pinger\n",
235                        imp->imp_target_uuid.uuid);
236
237                 spin_lock_irqsave(&imp->imp_lock, flags);
238                 imp->imp_force_verify = 1;
239                 spin_unlock_irqrestore(&imp->imp_lock, flags);
240
241                 ptlrpc_pinger_wake_up();
242         }
243         EXIT;
244 }
245
246 static int import_select_connection(struct obd_import *imp)
247 {
248         struct obd_import_conn *imp_conn;
249         struct obd_export *dlmexp;
250         ENTRY;
251
252         spin_lock(&imp->imp_lock);
253
254         if (list_empty(&imp->imp_conn_list)) {
255                 CERROR("%s: no connections available\n",
256                         imp->imp_obd->obd_name);
257                 spin_unlock(&imp->imp_lock);
258                 RETURN(-EINVAL);
259         }
260
261         if (imp->imp_conn_current && 
262             !(imp->imp_conn_current->oic_item.next == &imp->imp_conn_list)) {
263                 imp_conn = list_entry(imp->imp_conn_current->oic_item.next,
264                                   struct obd_import_conn, oic_item);
265         } else {
266                 imp_conn = list_entry(imp->imp_conn_list.next,
267                                       struct obd_import_conn, oic_item);
268         }
269
270         /* switch connection, don't mind if it's same as the current one */
271         if (imp->imp_connection)
272                 ptlrpc_put_connection(imp->imp_connection);
273         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
274
275         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
276         LASSERT(dlmexp != NULL);
277         if (dlmexp->exp_connection)
278                 ptlrpc_put_connection(dlmexp->exp_connection);
279         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
280         class_export_put(dlmexp);
281
282         imp->imp_conn_current = imp_conn;
283         CDEBUG(D_HA, "%s: import %p using connection %s\n",
284                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid);
285         spin_unlock(&imp->imp_lock);
286
287         RETURN(0);
288 }
289
290 int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
291 {
292         struct obd_device *obd = imp->imp_obd;
293         int initial_connect = 0;
294         int rc;
295         __u64 committed_before_reconnect = 0;
296         struct ptlrpc_request *request;
297         int size[] = {sizeof(imp->imp_target_uuid),
298                       sizeof(obd->obd_uuid),
299                       sizeof(imp->imp_dlm_handle),
300                       sizeof(imp->imp_connect_data)};
301         char *tmp[] = {imp->imp_target_uuid.uuid,
302                        obd->obd_uuid.uuid,
303                        (char *)&imp->imp_dlm_handle,
304                        (char *)&imp->imp_connect_data};
305         struct ptlrpc_connect_async_args *aa;
306         unsigned long flags;
307
308         spin_lock_irqsave(&imp->imp_lock, flags);
309         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
310                 spin_unlock_irqrestore(&imp->imp_lock, flags);
311                 CERROR("can't connect to a closed import\n");
312                 RETURN(-EINVAL);
313         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
314                 spin_unlock_irqrestore(&imp->imp_lock, flags);
315                 CERROR("already connected\n");
316                 RETURN(0);
317         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
318                 spin_unlock_irqrestore(&imp->imp_lock, flags);
319                 CERROR("already connecting\n");
320                 RETURN(-EALREADY);
321         }
322
323         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
324
325         imp->imp_conn_cnt++;
326         imp->imp_resend_replay = 0;
327
328         if (imp->imp_remote_handle.cookie == 0) {
329                 initial_connect = 1;
330         } else {
331                 committed_before_reconnect = imp->imp_peer_committed_transno;
332         }
333
334         spin_unlock_irqrestore(&imp->imp_lock, flags);
335
336         if (new_uuid) {
337                 struct obd_uuid uuid;
338
339                 obd_str2uuid(&uuid, new_uuid);
340                 rc = import_set_conn_priority(imp, &uuid);
341                 if (rc)
342                         GOTO(out, rc);
343         }
344
345         rc = import_select_connection(imp);
346         if (rc)
347                 GOTO(out, rc);
348
349         request = ptlrpc_prep_req(imp, imp->imp_connect_op, 4, size, tmp);
350         if (!request)
351                 GOTO(out, rc = -ENOMEM);
352
353 #ifndef __KERNEL__
354         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
355 #endif
356
357         request->rq_send_state = LUSTRE_IMP_CONNECTING;
358         size[0] = sizeof(struct obd_connect_data);
359         request->rq_replen = lustre_msg_size(1, size);
360         request->rq_interpret_reply = ptlrpc_connect_interpret;
361
362         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
363         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
364         memset(aa, 0, sizeof *aa);
365
366         aa->pcaa_peer_committed = committed_before_reconnect;
367         aa->pcaa_initial_connect = initial_connect;
368
369         if (aa->pcaa_initial_connect)
370                 imp->imp_replayable = 1;
371
372         DEBUG_REQ(D_RPCTRACE, request, "(re)connect request");
373         ptlrpcd_add_req(request);
374         rc = 0;
375 out:
376         if (rc != 0) {
377                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
378         }
379
380         RETURN(rc);
381 }
382
383 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
384 {
385         struct obd_import_conn *imp_conn;
386         unsigned long flags;
387         int wake_pinger = 0;
388
389         ENTRY;
390
391         spin_lock_irqsave(&imp->imp_lock, flags);
392         if (list_empty(&imp->imp_conn_list))
393                 GOTO(unlock, 0);
394
395         imp_conn = list_entry(imp->imp_conn_list.prev,
396                               struct obd_import_conn,
397                               oic_item);
398
399         if (imp->imp_conn_current != imp_conn) {
400                 ptlrpc_ping_import_soon(imp);
401                 wake_pinger = 1;
402         }
403
404  unlock:
405         spin_unlock_irqrestore(&imp->imp_lock, flags);
406
407         if (wake_pinger)
408                 ptlrpc_pinger_wake_up();
409
410         EXIT;
411 }
412
413 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
414                                     void * data, int rc)
415 {
416         struct ptlrpc_connect_async_args *aa = data;
417         struct obd_import *imp = request->rq_import;
418         struct lustre_handle old_hdl;
419         unsigned long flags;
420         int msg_flags;
421         ENTRY;
422
423         spin_lock_irqsave(&imp->imp_lock, flags);
424         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
425                 spin_unlock_irqrestore(&imp->imp_lock, flags);
426                 RETURN(0);
427         }
428         spin_unlock_irqrestore(&imp->imp_lock, flags);
429
430         if (rc)
431                 GOTO(out, rc);
432
433         LASSERT(imp->imp_conn_current);
434
435         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
436
437         /* All imports are pingable */
438         imp->imp_pingable = 1;
439         
440         if (aa->pcaa_initial_connect) {
441                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
442                         CDEBUG(D_HA, "connected to replayable target: %s\n",
443                                imp->imp_target_uuid.uuid);
444                         imp->imp_replayable = 1;
445                 } else {
446                         imp->imp_replayable = 0;
447                 }
448                 imp->imp_remote_handle = request->rq_repmsg->handle;
449
450                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
451                 GOTO(finish, rc = 0);
452         }
453
454         /* Determine what recovery state to move the import to. */
455         if (MSG_CONNECT_RECONNECT & msg_flags) {
456                 memset(&old_hdl, 0, sizeof(old_hdl));
457                 if (!memcmp(&old_hdl, &request->rq_repmsg->handle,
458                             sizeof (old_hdl))) {
459                         CERROR("%s@%s didn't like our handle "LPX64
460                                ", failed\n", imp->imp_target_uuid.uuid,
461                                imp->imp_connection->c_remote_uuid.uuid,
462                                imp->imp_dlm_handle.cookie);
463                         GOTO(out, rc = -ENOTCONN);
464                 }
465
466                 if (memcmp(&imp->imp_remote_handle, &request->rq_repmsg->handle,
467                            sizeof(imp->imp_remote_handle))) {
468                         CERROR("%s@%s changed handle from "LPX64" to "LPX64
469                                "; copying, but this may foreshadow disaster\n",
470                                imp->imp_target_uuid.uuid,
471                                imp->imp_connection->c_remote_uuid.uuid,
472                                imp->imp_remote_handle.cookie,
473                                request->rq_repmsg->handle.cookie);
474                         imp->imp_remote_handle = request->rq_repmsg->handle;
475                 } else {
476                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
477                                imp->imp_target_uuid.uuid,
478                                imp->imp_connection->c_remote_uuid.uuid);
479                 }
480
481                 if (imp->imp_invalid) {
482                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
483                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
484                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
485                                imp->imp_obd->obd_name,
486                                imp->imp_target_uuid.uuid);
487                         imp->imp_resend_replay = 1;
488                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
489                 } else {
490                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
491                 }
492         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
493                 LASSERT(imp->imp_replayable);
494                 imp->imp_remote_handle = request->rq_repmsg->handle;
495                 imp->imp_last_replay_transno = 0;
496                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
497         } else {
498                 imp->imp_remote_handle = request->rq_repmsg->handle;
499                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
500         }
501
502         /* Sanity checks for a reconnected import. */
503         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
504                 CERROR("imp_replayable flag does not match server "
505                        "after reconnect. We should LBUG right here.\n");
506         }
507
508         if (request->rq_repmsg->last_committed < aa->pcaa_peer_committed) {
509                 CERROR("%s went back in time (transno "LPD64
510                        " was previously committed, server now claims "LPD64
511                        ")! is shared storage not coherent?\n",
512                        imp->imp_target_uuid.uuid,
513                        aa->pcaa_peer_committed,
514                        request->rq_repmsg->last_committed);
515         }
516
517 finish:
518         rc = ptlrpc_import_recovery_state_machine(imp);
519         if (rc != 0) {
520                 if (rc == -ENOTCONN) {
521                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
522                                "invalidating and reconnecting\n",
523                                imp->imp_target_uuid.uuid,
524                                imp->imp_connection->c_remote_uuid.uuid);
525                         ptlrpc_connect_import(imp, NULL);
526                         RETURN(0);
527                 }
528         } else {
529                 list_del(&imp->imp_conn_current->oic_item);
530                 list_add(&imp->imp_conn_current->oic_item,
531                          &imp->imp_conn_list);
532                 imp->imp_conn_current = NULL;
533         }
534
535  out:
536         if (rc != 0) {
537                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
538                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov) {
539                         ptlrpc_deactivate_import(imp);
540                 }
541
542                 ptlrpc_maybe_ping_import_soon(imp);
543                 
544                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
545                        imp->imp_target_uuid.uuid,
546                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
547         }
548
549         wake_up(&imp->imp_recovery_waitq);
550         RETURN(rc);
551 }
552
553 static int completed_replay_interpret(struct ptlrpc_request *req,
554                                     void * data, int rc)
555 {
556         atomic_dec(&req->rq_import->imp_replay_inflight);
557         if (req->rq_status == 0) {
558                 ptlrpc_import_recovery_state_machine(req->rq_import);
559         } else {
560                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
561                        "reconnecting\n", 
562                        req->rq_import->imp_obd->obd_name, req->rq_status);
563                 ptlrpc_connect_import(req->rq_import, NULL);
564         }
565
566         RETURN(0);
567 }
568
569 static int signal_completed_replay(struct obd_import *imp)
570 {
571         struct ptlrpc_request *req;
572         ENTRY;
573
574         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
575         atomic_inc(&imp->imp_replay_inflight);
576
577         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
578         if (!req) {
579                 atomic_dec(&imp->imp_replay_inflight);
580                 RETURN(-ENOMEM);
581         }
582
583         req->rq_replen = lustre_msg_size(0, NULL);
584         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
585         req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
586         req->rq_timeout *= 3;
587         req->rq_interpret_reply = completed_replay_interpret;
588
589         ptlrpcd_add_req(req);
590         RETURN(0);
591 }
592
#ifdef __KERNEL__
/*
 * Body of the kernel thread started by the recovery state machine for an
 * EVICTED import.  @data is the struct obd_import to process.
 *
 * The thread daemonizes, blocks all signals and names itself
 * "ll_imp_inval"; it then invalidates the import, moves it to RECOVER
 * and re-enters the recovery state machine.  Always returns 0.
 */
static int ptlrpc_invalidate_import_thread(void *data)
{
        struct obd_import *imp = data;
        unsigned long sig_flags;

        ENTRY;

        /* thread setup is done under the big kernel lock */
        lock_kernel();
        ptlrpc_daemonize();

        SIGNAL_MASK_LOCK(current, sig_flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, sig_flags);
        THREAD_NAME(current->comm, sizeof(current->comm), "ll_imp_inval");
        unlock_kernel();

        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               imp->imp_obd->obd_name, imp->imp_target_uuid.uuid,
               imp->imp_connection->c_remote_uuid.uuid);

        ptlrpc_invalidate_import(imp);

        /* invalidation done: continue recovery from the RECOVER state */
        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
        ptlrpc_import_recovery_state_machine(imp);

        RETURN(0);
}
#endif
623
624 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
625 {
626         int rc = 0;
627         int inflight;
628         char *target_start;
629         int target_len;
630
631         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
632                 deuuidify(imp->imp_target_uuid.uuid, NULL,
633                           &target_start, &target_len);
634                 LCONSOLE_ERROR("This client was evicted by %.*s; in progress "
635                                "operations using this service will fail.\n",
636                                target_len, target_start);
637                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
638                        imp->imp_target_uuid.uuid,
639                        imp->imp_connection->c_remote_uuid.uuid);
640
641 #ifdef __KERNEL__
642                 rc = kernel_thread(ptlrpc_invalidate_import_thread, imp,
643                                    CLONE_VM | CLONE_FILES);
644                 if (rc < 0)
645                         CERROR("error starting invalidate thread: %d\n", rc);
646                 else
647                         rc = 0;
648                 RETURN(rc);
649 #else
650                 ptlrpc_invalidate_import(imp);
651
652                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
653 #endif
654         }
655
656         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
657                 CDEBUG(D_HA, "replay requested by %s\n",
658                        imp->imp_target_uuid.uuid);
659                 rc = ptlrpc_replay_next(imp, &inflight);
660                 if (inflight == 0 &&
661                     atomic_read(&imp->imp_replay_inflight) == 0) {
662                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
663                         rc = ldlm_replay_locks(imp);
664                         if (rc)
665                                 GOTO(out, rc);
666                 }
667                 rc = 0;
668         }
669
670         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
671                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
672                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
673                         rc = signal_completed_replay(imp);
674                         if (rc)
675                                 GOTO(out, rc);
676                 }
677
678         }
679
680         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
681                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
682                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
683                 }
684         }
685
686         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
687                 char nidbuf[PTL_NALFMT_SIZE];
688
689                 CDEBUG(D_HA, "reconnected to %s@%s\n",
690                        imp->imp_target_uuid.uuid,
691                        imp->imp_connection->c_remote_uuid.uuid);
692
693                 rc = ptlrpc_resend(imp);
694                 if (rc)
695                         GOTO(out, rc);
696                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
697                 ptlrpc_activate_import(imp);
698
699                 deuuidify(imp->imp_target_uuid.uuid, NULL,
700                           &target_start, &target_len);
701                 ptlrpc_peernid2str(&imp->imp_connection->c_peer,
702                                    nidbuf);
703
704                 LCONSOLE_INFO("Connection restored to service %.*s using nid "
705                               "%s.\n",
706                               target_len, target_start, nidbuf);
707
708                 CWARN("%s: connection restored to %s@%s\n",
709                       imp->imp_obd->obd_name,
710                       imp->imp_target_uuid.uuid,
711                       imp->imp_connection->c_remote_uuid.uuid);
712         }
713
714         if (imp->imp_state == LUSTRE_IMP_FULL) {
715                 wake_up(&imp->imp_recovery_waitq);
716                 ptlrpc_wake_delayed(imp);
717         }
718
719  out:
720         RETURN(rc);
721 }
722
/*
 * Timeout callback handed to LWI_TIMEOUT_INTR by
 * ptlrpc_disconnect_import().  Ignores its argument and always
 * returns 0.
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
727
728 int ptlrpc_disconnect_import(struct obd_import *imp)
729 {
730         struct ptlrpc_request *request;
731         int rq_opc;
732         int rc = 0;
733         unsigned long flags;
734         ENTRY;
735
736         switch (imp->imp_connect_op) {
737         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
738         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
739         case MGMT_CONNECT:rq_opc = MGMT_DISCONNECT;break;
740         default:
741                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
742                        imp->imp_target_uuid.uuid, imp->imp_connect_op);
743                 RETURN(-EINVAL);
744         }
745
746
747         if (ptlrpc_import_in_recovery(imp)) {
748                 struct l_wait_info lwi;
749                 lwi = LWI_TIMEOUT_INTR(MAX(obd_timeout * HZ, 1), back_to_sleep,
750                                        NULL, NULL);
751                 rc = l_wait_event(imp->imp_recovery_waitq,
752                                   !ptlrpc_import_in_recovery(imp), &lwi);
753
754         }
755
756         spin_lock_irqsave(&imp->imp_lock, flags);
757         if (imp->imp_state != LUSTRE_IMP_FULL) {
758                 GOTO(out, 0);
759         }
760         spin_unlock_irqrestore(&imp->imp_lock, flags);
761
762         request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
763         if (request) {
764                 /* For non-replayable connections, don't attempt
765                    reconnect if this fails */
766                 if (!imp->imp_replayable) {
767                         request->rq_no_resend = 1;
768                         IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
769                         request->rq_send_state =  LUSTRE_IMP_CONNECTING;
770                 }
771                 request->rq_replen = lustre_msg_size(0, NULL);
772                 rc = ptlrpc_queue_wait(request);
773                 ptlrpc_req_finished(request);
774         }
775
776         spin_lock_irqsave(&imp->imp_lock, flags);
777 out:
778         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
779         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
780         spin_unlock_irqrestore(&imp->imp_lock, flags);
781
782         RETURN(rc);
783 }
784