Whamcloud - gitweb
b8bcf5a18579fee8dbb16183ffe1ffc7dc1594fe
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 # include <liblustre.h>
26 #endif
27
28 #include <linux/obd_support.h>
29 #include <linux/lustre_ha.h>
30 #include <linux/lustre_net.h>
31 #include <linux/lustre_import.h>
32 #include <linux/lustre_export.h>
33 #include <linux/obd.h>
34 #include <linux/obd_class.h>
35 #include <linux/lustre_sec.h>
36
37 #include "ptlrpc_internal.h"
38
39 struct ptlrpc_connect_async_args {
40          __u64 pcaa_peer_committed;
41         int pcaa_initial_connect;
42 };
43
/* Change the state of an import without taking imp_lock; every user in this
 * file invokes it with imp_lock already held.
 *
 * A CLOSED import should remain so: the transition is skipped when the
 * current state is LUSTRE_IMP_CLOSED.
 *
 * Both arguments are parenthesized in the expansion so that expressions
 * such as "&import" or a conditional can be passed safely. */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), (imp)->imp_target_uuid.uuid,                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while(0)
55
/* Locked variant of IMPORT_SET_STATE_NOLOCK(): takes imp_lock (saving the
 * interrupt state) around the state change.
 *
 * The saved-flags local has a macro-private name so it cannot shadow or
 * capture a caller's own "flags" variable, and "imp" is parenthesized so
 * that any pointer expression can be passed. */
#define IMPORT_SET_STATE(imp, state)                                    \
do {                                                                    \
        unsigned long set_state_flags_;                                 \
                                                                        \
        spin_lock_irqsave(&(imp)->imp_lock, set_state_flags_);          \
        IMPORT_SET_STATE_NOLOCK((imp), (state));                        \
        spin_unlock_irqrestore(&(imp)->imp_lock, set_state_flags_);     \
} while(0)
64
65
/* Forward declarations: both functions are referenced in this file before
 * the point where they are defined. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void *data, int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
69
70 /* Only this function is allowed to change the import state when it is
71  * CLOSED. I would rather refcount the import and free it after
72  * disconnection like we do with exports. To do that, the client_obd
73  * will need to save the peer info somewhere other than in the import,
74  * though. */
75 int ptlrpc_init_import(struct obd_import *imp)
76 {
77         unsigned long flags;
78
79         spin_lock_irqsave(&imp->imp_lock, flags);
80
81         imp->imp_generation++;
82         imp->imp_state =  LUSTRE_IMP_NEW;
83
84         spin_unlock_irqrestore(&imp->imp_lock, flags);
85
86         return 0;
87 }
88
89 /* Returns true if import was FULL, false if import was already not
90  * connected.
91  */
92 int ptlrpc_set_import_discon(struct obd_import *imp)
93 {
94         unsigned long flags;
95         int rc = 0;
96
97         spin_lock_irqsave(&imp->imp_lock, flags);
98
99         if (imp->imp_state == LUSTRE_IMP_FULL) {
100                 CWARN("%s: connection lost to %s@%s\n",
101                       imp->imp_obd->obd_name, 
102                       imp->imp_target_uuid.uuid,
103                       imp->imp_connection->c_remote_uuid.uuid);
104                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
105                 spin_unlock_irqrestore(&imp->imp_lock, flags);
106                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
107                 rc = 1;
108         } else {
109                 spin_unlock_irqrestore(&imp->imp_lock, flags);
110                 CDEBUG(D_HA, "%p %s: import already not connected: %s\n",
111                        imp,imp->imp_client->cli_name,
112                        ptlrpc_import_state_name(imp->imp_state));
113         }
114
115         return rc;
116 }
117
118 /*
119  * This acts as a barrier; all existing requests are rejected, and
120  * no new requests will be accepted until the import is valid again.
121  */
122 void ptlrpc_deactivate_import(struct obd_import *imp)
123 {
124         unsigned long flags;
125         ENTRY;
126
127         spin_lock_irqsave(&imp->imp_lock, flags);
128         CDEBUG(D_HA, "setting import %s INVALID\n",
129                imp->imp_target_uuid.uuid);
130         imp->imp_invalid = 1;
131         imp->imp_generation++;
132         spin_unlock_irqrestore(&imp->imp_lock, flags);
133
134         ptlrpc_abort_inflight(imp);
135         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
136 }
137
138 /*
139  * This function will invalidate the import, if necessary, then block
140  * for all the RPC completions, and finally notify the obd to
141  * invalidate its state (ie cancel locks, clear pending requests,
142  * etc).
143  *
144  * in_rpc: true if this is called while processing an rpc, like
145  *    CONNECT. It will allow for one RPC to be inflight while
146  *    waiting for requests to complete. Ugly, yes, but I don't see an
147  *    cleaner way right now.
148  */
149 void ptlrpc_invalidate_import(struct obd_import *imp, int in_rpc)
150 {
151         struct l_wait_info lwi;
152         unsigned long timeout;
153         int inflight = 0;
154         int rc;
155
156         if (!imp->imp_invalid)
157                 ptlrpc_deactivate_import(imp);
158
159         LASSERT(imp->imp_invalid);
160
161         if (in_rpc)
162                 inflight = 1;
163
164         /* wait for all requests to error out and call completion 
165            callbacks */
166         if (imp->imp_server_timeout)
167                 timeout = obd_timeout / 2;
168         else
169                 timeout = obd_timeout;
170         timeout = MAX(timeout * HZ, 1);
171         lwi = LWI_TIMEOUT_INTR(timeout, NULL, NULL, NULL);
172         rc = l_wait_event(imp->imp_recovery_waitq, 
173                           (atomic_read(&imp->imp_inflight) == inflight), 
174                           &lwi);
175
176         if (rc)
177                 CERROR("%s: rc = %d waiting for callback (%d != %d)\n",
178                        imp->imp_target_uuid.uuid, rc,
179                        atomic_read(&imp->imp_inflight), !!in_rpc);
180
181         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
182 }
183
184 void ptlrpc_activate_import(struct obd_import *imp)
185 {
186         struct obd_device *obd = imp->imp_obd;
187         unsigned long flags;
188
189         spin_lock_irqsave(&imp->imp_lock, flags);
190         imp->imp_invalid = 0;
191         spin_unlock_irqrestore(&imp->imp_lock, flags);
192
193         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
194 }
195
196 void ptlrpc_fail_import(struct obd_import *imp, int generation)
197 {
198         ENTRY;
199
200         LASSERT (!imp->imp_dlm_fake);
201
202         if (ptlrpc_set_import_discon(imp)) {
203                 unsigned long flags;
204
205                 if (!imp->imp_replayable) {
206                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
207                                "auto-deactivating\n",
208                                imp->imp_target_uuid.uuid,
209                                imp->imp_connection->c_remote_uuid.uuid,
210                                imp->imp_obd->obd_name);
211                         ptlrpc_deactivate_import(imp);
212                 }
213
214                 CDEBUG(D_HA, "%s: waking up pinger\n",
215                        imp->imp_target_uuid.uuid);
216
217                 spin_lock_irqsave(&imp->imp_lock, flags);
218                 imp->imp_force_verify = 1;
219                 spin_unlock_irqrestore(&imp->imp_lock, flags);
220
221                 ptlrpc_pinger_wake_up();
222         }
223         EXIT;
224 }
225
/* True when "last" (a jiffies timestamp; 0 means "never attempted") lies no
 * more than obd_timeout * 2 * HZ jiffies in the past.  The difference is
 * compared as a signed long. */
#define ATTEMPT_TOO_SOON(last)                                          \
        ((last) != 0 &&                                                 \
         (long)(jiffies - (last)) <= (long)(obd_timeout * 2 * HZ))
228
229 static int import_select_connection(struct obd_import *imp)
230 {
231         struct obd_import_conn *imp_conn, *tmp;
232         struct obd_export *dlmexp;
233         int found = 0;
234         ENTRY;
235
236         spin_lock(&imp->imp_lock);
237
238         if (list_empty(&imp->imp_conn_list)) {
239                 CERROR("no available connections on imp %p@%s\n",
240                         imp, imp->imp_obd->obd_name);
241                 spin_unlock(&imp->imp_lock);
242                 RETURN(-EINVAL);
243         }
244
245         list_for_each_entry(imp_conn, &imp->imp_conn_list, oic_item) {
246                 if (!ATTEMPT_TOO_SOON(imp_conn->oic_last_attempt)) {
247                         found = 1;
248                         break;
249                 }
250         }
251
252         /* if not found, simply choose the current one */
253         if (!found) {
254                 CWARN("obd %s imp 0x%p: all connections have been "
255                       "tried recently\n", imp->imp_obd->obd_name, imp);
256                 LASSERT(imp->imp_conn_current);
257                 imp_conn = imp->imp_conn_current;
258         }
259         LASSERT(imp_conn->oic_conn);
260
261         imp_conn->oic_last_attempt = jiffies;
262
263         /* move the items ahead of the selected one to list tail */
264         while (1) {
265                 tmp= list_entry(imp->imp_conn_list.next,
266                                 struct obd_import_conn, oic_item);
267                 if (tmp == imp_conn)
268                         break;
269                 list_del(&tmp->oic_item);
270                 list_add_tail(&tmp->oic_item, &imp->imp_conn_list);
271         }
272
273         /* switch connection if we chose a new one */
274         if (imp->imp_connection != imp_conn->oic_conn) {
275                 if (imp->imp_connection) {
276                         ptlrpcs_sec_invalidate_cache(imp->imp_sec);
277                         ptlrpc_put_connection(imp->imp_connection);
278                 }
279                 imp->imp_connection =
280                         ptlrpc_connection_addref(imp_conn->oic_conn);
281         }
282
283         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
284         LASSERT(dlmexp != NULL);
285         if (dlmexp->exp_connection)
286                 ptlrpc_put_connection(imp->imp_connection);
287         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
288         class_export_put(dlmexp);
289
290         imp->imp_conn_current = imp_conn;
291         CWARN("obd %s imp 0x%p: select conn %s\n",
292                imp->imp_obd->obd_name, imp,
293                imp_conn->oic_uuid.uuid);
294         spin_unlock(&imp->imp_lock);
295
296         RETURN(0);
297 }
298
299
300
301 int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
302 {
303         struct obd_device *obd = imp->imp_obd;
304         int initial_connect = 0;
305         int rc;
306         __u64 committed_before_reconnect = 0;
307         struct ptlrpc_request *request;
308         int size[] = {sizeof(imp->imp_target_uuid),
309                       sizeof(obd->obd_uuid),
310                       sizeof(imp->imp_dlm_handle),
311                       sizeof(unsigned long),
312                       sizeof(__u32) * 2};
313         char *tmp[] = {imp->imp_target_uuid.uuid,
314                        obd->obd_uuid.uuid,
315                        (char *)&imp->imp_dlm_handle,
316                        (char *)&imp->imp_connect_flags, /* XXX: make this portable! */
317                        (char*) &obd->u.cli.cl_nllu};
318         struct ptlrpc_connect_async_args *aa;
319         unsigned long flags;
320
321         spin_lock_irqsave(&imp->imp_lock, flags);
322         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
323                 spin_unlock_irqrestore(&imp->imp_lock, flags);
324                 CERROR("can't connect to a closed import\n");
325                 RETURN(-EINVAL);
326         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
327                 spin_unlock_irqrestore(&imp->imp_lock, flags);
328                 CERROR("already connected\n");
329                 RETURN(0);
330         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
331                 spin_unlock_irqrestore(&imp->imp_lock, flags);
332                 CERROR("already connecting\n");
333                 RETURN(-EALREADY);
334         }
335
336         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
337
338         imp->imp_resend_replay = 0;
339
340         if (imp->imp_remote_handle.cookie == 0) {
341                 initial_connect = 1;
342         } else {
343                 committed_before_reconnect = imp->imp_peer_committed_transno;;
344                 imp->imp_conn_cnt++;
345         }
346
347
348         spin_unlock_irqrestore(&imp->imp_lock, flags);
349
350         if (new_uuid) {
351                 struct obd_uuid uuid;
352
353                 obd_str2uuid(&uuid, new_uuid);
354
355                 rc = import_set_conn_priority(imp, &uuid);
356                 if (rc)
357                         GOTO(out, rc);
358         }
359         rc = import_select_connection(imp);
360         if (rc)
361                 GOTO(out, rc);
362
363         LASSERT(imp->imp_sec);
364
365         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION,
366                                   imp->imp_connect_op, 5, size, tmp);
367         if (!request)
368                 GOTO(out, rc = -ENOMEM);
369
370 #ifndef __KERNEL__
371         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
372 #endif
373         if (obd->u.cli.cl_async) {
374                 lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_ASYNC);
375         }
376
377         request->rq_send_state = LUSTRE_IMP_CONNECTING;
378         request->rq_replen = lustre_msg_size(0, NULL);
379         request->rq_interpret_reply = ptlrpc_connect_interpret;
380
381         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
382         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
383         memset(aa, 0, sizeof *aa);
384
385         aa->pcaa_peer_committed = committed_before_reconnect;
386         aa->pcaa_initial_connect = initial_connect;
387
388         if (aa->pcaa_initial_connect) {
389                 lustre_msg_add_op_flags(request->rq_reqmsg, 
390                                         MSG_CONNECT_INITIAL);
391                 imp->imp_replayable = 1; 
392         }
393
394         ptlrpcd_add_req(request);
395         rc = 0;
396         imp->imp_connect_start = jiffies;
397 out:
398         if (rc != 0) {
399                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
400         }
401
402         RETURN(rc);
403 }
404
405 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
406                                     void *data, int rc)
407 {
408         struct ptlrpc_connect_async_args *aa = data;
409         struct obd_import *imp = request->rq_import;
410         struct lustre_handle old_hdl;
411         unsigned long flags;
412         int msg_flags;
413         ENTRY;
414
415         spin_lock_irqsave(&imp->imp_lock, flags);
416         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
417                 spin_unlock_irqrestore(&imp->imp_lock, flags);
418                 RETURN(0);
419         }
420         spin_unlock_irqrestore(&imp->imp_lock, flags);
421
422         if (rc)
423                 GOTO(out, rc);
424         LASSERT(imp->imp_conn_current);
425         imp->imp_conn_current->oic_last_attempt = 0;
426
427         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
428
429         if (aa->pcaa_initial_connect) {
430                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
431                         CDEBUG(D_HA, "connected to replayable target: %s\n",
432                                imp->imp_target_uuid.uuid);
433                         imp->imp_pingable = imp->imp_replayable = 1;
434                 } else {
435                         imp->imp_replayable = 0;
436                 }
437                 LASSERTF(imp->imp_conn_cnt < request->rq_repmsg->conn_cnt,
438                          "imp conn_cnt %d req conn_cnt %d", 
439                          imp->imp_conn_cnt, request->rq_repmsg->conn_cnt);
440                 imp->imp_conn_cnt = request->rq_repmsg->conn_cnt;
441                 imp->imp_remote_handle = request->rq_repmsg->handle;
442                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
443                 GOTO(finish, rc = 0);
444         }
445
446         /* Determine what recovery state to move the import to. */
447         if (MSG_CONNECT_RECONNECT & msg_flags) {
448                 memset(&old_hdl, 0, sizeof(old_hdl));
449                 if (!memcmp(&old_hdl, &request->rq_repmsg->handle,
450                             sizeof (old_hdl))) {
451                         CERROR("%s@%s didn't like our handle "LPX64
452                                ", failed\n", imp->imp_target_uuid.uuid,
453                                imp->imp_connection->c_remote_uuid.uuid,
454                                imp->imp_dlm_handle.cookie);
455                         GOTO(out, rc = -ENOTCONN);
456                 }
457
458                 if (memcmp(&imp->imp_remote_handle, &request->rq_repmsg->handle,
459                            sizeof(imp->imp_remote_handle))) {
460                         CERROR("%s@%s changed handle from "LPX64" to "LPX64
461                                "; copying, but this may foreshadow disaster\n",
462                                imp->imp_target_uuid.uuid,
463                                imp->imp_connection->c_remote_uuid.uuid,
464                                imp->imp_remote_handle.cookie,
465                                request->rq_repmsg->handle.cookie);
466                         imp->imp_remote_handle = request->rq_repmsg->handle;
467                 } else {
468                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
469                                imp->imp_target_uuid.uuid,
470                                imp->imp_connection->c_remote_uuid.uuid);
471                 }
472
473                 if (imp->imp_invalid) {
474                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
475                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
476                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
477                                imp->imp_obd->obd_name, 
478                                imp->imp_target_uuid.uuid);
479                         imp->imp_resend_replay = 1;
480                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
481                 } else {
482                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
483                 }
484         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
485                 LASSERT(imp->imp_replayable);
486                 imp->imp_remote_handle = request->rq_repmsg->handle;
487                 imp->imp_last_replay_transno = 0;
488                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
489         } else {
490                 CDEBUG(D_HA, "oops! we get evicted from %s\n", imp->imp_target_uuid.uuid);
491                 imp->imp_remote_handle = request->rq_repmsg->handle;
492                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
493         }
494
495         /* Sanity checks for a reconnected import. */
496         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
497                 CERROR("imp_replayable flag does not match server "
498                        "after reconnect. We should LBUG right here.\n");
499         }
500
501         if (request->rq_repmsg->last_committed < aa->pcaa_peer_committed) {
502                 CERROR("%s went back in time (transno "LPD64
503                        " was previously committed, server now claims "LPD64
504                        ")! is shared storage not coherent?\n",
505                        imp->imp_target_uuid.uuid,
506                        aa->pcaa_peer_committed,
507                        request->rq_repmsg->last_committed);
508         }
509
510 finish:
511         rc = ptlrpc_import_recovery_state_machine(imp);
512         if (rc != 0) {
513                 if (rc == -ENOTCONN) {
514                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
515                                "invalidating and reconnecting\n",
516                                imp->imp_target_uuid.uuid,
517                                imp->imp_connection->c_remote_uuid.uuid);
518                         ptlrpc_connect_import(imp, NULL);
519                         RETURN(0);
520                 }
521         }
522  out:
523         if (rc != 0) {
524                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
525                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov)
526                         ptlrpc_deactivate_import(imp);
527                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
528                        imp->imp_target_uuid.uuid,
529                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
530         }
531
532         wake_up(&imp->imp_recovery_waitq);
533         RETURN(rc);
534 }
535
536 static int completed_replay_interpret(struct ptlrpc_request *req,
537                                       void *data, int rc)
538 {
539         atomic_dec(&req->rq_import->imp_replay_inflight);
540         if (req->rq_status == 0) {
541                 ptlrpc_import_recovery_state_machine(req->rq_import);
542         } else {
543                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
544                        "reconnecting\n", 
545                        req->rq_import->imp_obd->obd_name, req->rq_status);
546                 ptlrpc_connect_import(req->rq_import, NULL);
547         }
548
549         RETURN(0);
550 }
551
552 static int signal_completed_replay(struct obd_import *imp)
553  {
554         struct ptlrpc_request *req;
555         ENTRY;
556
557         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
558         atomic_inc(&imp->imp_replay_inflight);
559
560         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 0, NULL, NULL);
561         if (!req) {
562                 atomic_dec(&imp->imp_replay_inflight);
563                 RETURN(-ENOMEM);
564         }
565
566         req->rq_replen = lustre_msg_size(0, NULL);
567         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
568         req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
569         req->rq_timeout *= 3;
570         req->rq_interpret_reply = completed_replay_interpret;
571
572         ptlrpcd_add_req(req);
573         RETURN(0);
574 }
575
576 #ifdef __KERNEL__
577 static int ptlrpc_invalidate_import_thread(void *data)
578 {
579         struct obd_import *imp = data;
580         unsigned long flags;
581
582         ENTRY;
583
584         lock_kernel();
585         ptlrpc_daemonize();
586
587         SIGNAL_MASK_LOCK(current, flags);
588         sigfillset(&current->blocked);
589         RECALC_SIGPENDING;
590         SIGNAL_MASK_UNLOCK(current, flags);
591         THREAD_NAME(current->comm, sizeof(current->comm), "ll_imp_inval");
592         unlock_kernel();
593
594         CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
595                imp->imp_obd->obd_name, imp->imp_target_uuid.uuid,
596                imp->imp_connection->c_remote_uuid.uuid);
597
598         ptlrpc_invalidate_import(imp, 0);
599         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
600
601         ptlrpc_import_recovery_state_machine(imp);
602
603         RETURN(0);
604 }
605 #endif
606
607 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
608 {
609         int rc = 0;
610         int inflight;
611
612         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
613                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
614                        imp->imp_target_uuid.uuid,
615                        imp->imp_connection->c_remote_uuid.uuid);
616
617 #ifdef __KERNEL__
618                 rc = kernel_thread(ptlrpc_invalidate_import_thread, imp,
619                                    CLONE_VM | CLONE_FILES);
620                 if (rc < 0)
621                         CERROR("error starting invalidate thread: %d\n", rc);
622                 RETURN(rc);
623 #else
624                 ptlrpc_invalidate_import(imp, 1);
625
626                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
627 #endif
628         }
629
630         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
631                 CDEBUG(D_HA, "replay requested by %s\n",
632                        imp->imp_target_uuid.uuid);
633                 rc = ptlrpc_replay_next(imp, &inflight);
634                 if (inflight == 0 &&
635                     atomic_read(&imp->imp_replay_inflight) == 0) {
636                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
637                         rc = ldlm_replay_locks(imp);
638                         if (rc)
639                                 GOTO(out, rc);
640                 }
641                 rc = 0;
642         }
643
644         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
645                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
646                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
647                         rc = signal_completed_replay(imp);
648                         if (rc)
649                                 GOTO(out, rc);
650                 }
651
652         }
653
654         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
655                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
656                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
657                 }
658         }
659
660         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
661                 CDEBUG(D_HA, "reconnected to %s@%s\n",
662                        imp->imp_target_uuid.uuid,
663                        imp->imp_connection->c_remote_uuid.uuid);
664
665                 rc = ptlrpc_resend(imp);
666                 if (rc)
667                         GOTO(out, rc);
668                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
669                 ptlrpc_activate_import(imp);
670                 CWARN("%s: connection restored to %s@%s\n",
671                       imp->imp_obd->obd_name, 
672                       imp->imp_target_uuid.uuid,
673                       imp->imp_connection->c_remote_uuid.uuid);
674         }
675
676         if (imp->imp_state == LUSTRE_IMP_FULL) {
677                 wake_up(&imp->imp_recovery_waitq);
678                 ptlrpc_wake_delayed(imp);
679         }
680
681  out:
682         RETURN(rc);
683 }
684
/* Timeout callback handed to LWI_TIMEOUT_INTR() by
 * ptlrpc_disconnect_import().  It ignores its argument and always returns
 * 0 -- presumably telling the waiter to keep sleeping; confirm against
 * l_wait_event(). */
static int back_to_sleep(void *unused)
{
        (void)unused;
        return 0;
}
689
690 int ptlrpc_disconnect_import(struct obd_import *imp)
691 {
692         struct ptlrpc_request *request;
693         int rq_opc;
694         int rc = 0;
695         unsigned long flags;
696         ENTRY;
697
698         switch (imp->imp_connect_op) {
699         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
700         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
701         case MGMT_CONNECT: rq_opc = MGMT_DISCONNECT; break;
702         default:
703                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
704                        imp->imp_target_uuid.uuid, imp->imp_connect_op);
705                 RETURN(-EINVAL);
706         }
707
708
709         if (ptlrpc_import_in_recovery(imp)) {
710                 struct l_wait_info lwi;
711                 unsigned long timeout;
712                 if (imp->imp_server_timeout)
713                         timeout = obd_timeout / 2;
714                 else
715                         timeout = obd_timeout;
716                 timeout = MAX(timeout * HZ, 1);
717                 lwi = LWI_TIMEOUT_INTR(obd_timeout, back_to_sleep, NULL, NULL);
718                 rc = l_wait_event(imp->imp_recovery_waitq, 
719                                   !ptlrpc_import_in_recovery(imp), &lwi);
720
721         }
722
723         spin_lock_irqsave(&imp->imp_lock, flags);
724         if (imp->imp_state != LUSTRE_IMP_FULL) {
725                 GOTO(out, 0);
726         }
727         spin_unlock_irqrestore(&imp->imp_lock, flags);
728
729         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc,
730                                   0, NULL, NULL);
731         if (request) {
732                 /* For non-replayable connections, don't attempt
733                    reconnect if this fails */
734                 if (!imp->imp_replayable) {
735                         request->rq_no_resend = 1;
736                         IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
737                         request->rq_send_state =  LUSTRE_IMP_CONNECTING;
738                 }
739                 request->rq_replen = lustre_msg_size(0, NULL);
740                 rc = ptlrpc_queue_wait(request);
741                 ptlrpc_req_finished(request);
742         }
743
744         spin_lock_irqsave(&imp->imp_lock, flags);
745 out:
746         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
747         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
748         imp->imp_conn_cnt = 0;
749         spin_unlock_irqrestore(&imp->imp_lock, flags);
750
751         RETURN(rc);
752 }
753