Whamcloud - gitweb
Branch b1_4_newconfig2
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28
29 #ifdef __KERNEL__
30 # include <linux/config.h>
31 # include <linux/module.h>
32 # include <linux/kmod.h>
33 #else
34 # include <liblustre.h>
35 #endif
36
37 #include <linux/obd_support.h>
38 #include <linux/lustre_ha.h>
39 #include <linux/lustre_net.h>
40 #include <linux/lustre_import.h>
41 #include <linux/lustre_export.h>
42 #include <linux/obd.h>
43 #include <linux/obd_class.h>
44
45 #include "ptlrpc_internal.h"
46
47 struct ptlrpc_connect_async_args {
48          __u64 pcaa_peer_committed;
49         int pcaa_initial_connect;
50 };
51
/*
 * Change the state of @imp to @state and log the transition at D_HA.
 *
 * A CLOSED import should remain so: if the import is already
 * LUSTRE_IMP_CLOSED the request is silently ignored.
 *
 * No locking is done here; the caller is expected to hold imp->imp_lock
 * (see IMPORT_SET_STATE for the locked variant).
 *
 * Both arguments are parenthesised so that expressions such as "&import"
 * expand correctly.  Note that @imp and @state are evaluated more than
 * once, so they must be free of side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), (imp)->imp_target_uuid.uuid,                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while(0)
63
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp->imp_lock with
 * interrupts saved/disabled around the state change.
 *
 * The saved-flags local has a macro-private name so that it cannot shadow
 * a caller variable called "flags" that might appear in @imp or @state,
 * and @imp is parenthesised so that "&import" style arguments work.
 */
#define IMPORT_SET_STATE(imp, state)                                    \
do {                                                                    \
        unsigned long imp_set_state_irqflags_;                          \
                                                                        \
        spin_lock_irqsave(&(imp)->imp_lock, imp_set_state_irqflags_);   \
        IMPORT_SET_STATE_NOLOCK(imp, state);                            \
        spin_unlock_irqrestore(&(imp)->imp_lock,                        \
                               imp_set_state_irqflags_);                \
} while(0)
72
73
/* Forward declarations: both functions are defined further down in this
 * file but are referenced before their definitions. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request, void *data,
                                    int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
77
78 /* Only this function is allowed to change the import state when it is
79  * CLOSED. I would rather refcount the import and free it after
80  * disconnection like we do with exports. To do that, the client_obd
81  * will need to save the peer info somewhere other than in the import,
82  * though. */
83 int ptlrpc_init_import(struct obd_import *imp)
84 {
85         unsigned long flags;
86
87         spin_lock_irqsave(&imp->imp_lock, flags);
88
89         imp->imp_generation++;
90         imp->imp_state =  LUSTRE_IMP_NEW;
91
92         spin_unlock_irqrestore(&imp->imp_lock, flags);
93
94         return 0;
95 }
96 EXPORT_SYMBOL(ptlrpc_init_import);
97
#define UUID_STR "_UUID"
/*
 * Compute a printable window into @uuid without modifying it.
 *
 * If @prefix is non-NULL and @uuid begins with it, the window starts just
 * past the prefix; otherwise it starts at @uuid.  If what remains ends in
 * "_UUID", that suffix is excluded from the reported length.
 *
 * @uuid_start receives the start of the window, @uuid_len its length in
 * bytes (suitable for a "%.*s" format).
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len)
{
        const int suffix_len = (int)strlen(UUID_STR);
        char *start = uuid;
        int len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start = uuid + prefix_len;
        }

        len = (int)strlen(start);

        /* only strip when the remainder is long enough to hold the suffix */
        if (len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = len;
}
113
114 /* Returns true if import was FULL, false if import was already not
115  * connected.
116  */
117 int ptlrpc_set_import_discon(struct obd_import *imp)
118 {
119         unsigned long flags;
120         int rc = 0;
121
122         spin_lock_irqsave(&imp->imp_lock, flags);
123
124         if (imp->imp_state == LUSTRE_IMP_FULL) {
125                 char *target_start;
126                 int   target_len;
127
128                 deuuidify(imp->imp_target_uuid.uuid, NULL,
129                           &target_start, &target_len);
130
131                 LCONSOLE_ERROR("Connection to service %.*s via nid %s was "
132                                "lost; in progress operations using this "
133                                "service will %s.\n",
134                                target_len, target_start,
135                                libcfs_nid2str(imp->imp_connection->c_peer.nid),
136                                imp->imp_replayable 
137                                ? "wait for recovery to complete"
138                                : "fail");
139
140                 CWARN("%s: connection lost to %s@%s\n",
141                       imp->imp_obd->obd_name,
142                       imp->imp_target_uuid.uuid,
143                       imp->imp_connection->c_remote_uuid.uuid);
144                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
145                 spin_unlock_irqrestore(&imp->imp_lock, flags);
146                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
147                 rc = 1;
148         } else {
149                 spin_unlock_irqrestore(&imp->imp_lock, flags);
150                 CDEBUG(D_HA, "%p %s: import already not connected: %s\n",
151                        imp,imp->imp_client->cli_name,
152                        ptlrpc_import_state_name(imp->imp_state));
153         }
154
155         return rc;
156 }
157
158 /*
159  * This acts as a barrier; all existing requests are rejected, and
160  * no new requests will be accepted until the import is valid again.
161  */
162 void ptlrpc_deactivate_import(struct obd_import *imp)
163 {
164         unsigned long flags;
165         ENTRY;
166
167         spin_lock_irqsave(&imp->imp_lock, flags);
168         CDEBUG(D_HA, "setting import %s INVALID\n", imp->imp_target_uuid.uuid);
169         imp->imp_invalid = 1;
170         imp->imp_generation++;
171         spin_unlock_irqrestore(&imp->imp_lock, flags);
172
173         ptlrpc_abort_inflight(imp);
174         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
175 }
176
177 /*
178  * This function will invalidate the import, if necessary, then block
179  * for all the RPC completions, and finally notify the obd to
180  * invalidate its state (ie cancel locks, clear pending requests,
181  * etc).
182  */
183 void ptlrpc_invalidate_import(struct obd_import *imp)
184 {
185         struct l_wait_info lwi;
186         int rc;
187
188         if (!imp->imp_invalid)
189                 ptlrpc_deactivate_import(imp);
190
191         LASSERT(imp->imp_invalid);
192
193         /* wait for all requests to error out and call completion callbacks */
194         lwi = LWI_TIMEOUT_INTR(MAX(obd_timeout * HZ, 1), NULL,
195                                NULL, NULL);
196         rc = l_wait_event(imp->imp_recovery_waitq,
197                           (atomic_read(&imp->imp_inflight) == 0),
198                           &lwi);
199
200         if (rc)
201                 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
202                        imp->imp_target_uuid.uuid, rc,
203                        atomic_read(&imp->imp_inflight));
204
205         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
206 }
207
208 void ptlrpc_activate_import(struct obd_import *imp)
209 {
210         struct obd_device *obd = imp->imp_obd;
211         unsigned long flags;
212
213         spin_lock_irqsave(&imp->imp_lock, flags);
214         imp->imp_invalid = 0;
215         spin_unlock_irqrestore(&imp->imp_lock, flags);
216
217         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
218 }
219
220 void ptlrpc_fail_import(struct obd_import *imp, int generation)
221 {
222         ENTRY;
223
224         LASSERT (!imp->imp_dlm_fake);
225
226         if (ptlrpc_set_import_discon(imp)) {
227                 unsigned long flags;
228
229                 if (!imp->imp_replayable) {
230                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
231                                "auto-deactivating\n",
232                                imp->imp_target_uuid.uuid,
233                                imp->imp_connection->c_remote_uuid.uuid,
234                                imp->imp_obd->obd_name);
235                         ptlrpc_deactivate_import(imp);
236                 }
237
238                 CDEBUG(D_HA, "%s: waking up pinger\n",
239                        imp->imp_target_uuid.uuid);
240
241                 spin_lock_irqsave(&imp->imp_lock, flags);
242                 imp->imp_force_verify = 1;
243                 spin_unlock_irqrestore(&imp->imp_lock, flags);
244
245                 ptlrpc_pinger_wake_up();
246         }
247         EXIT;
248 }
249
250 static int import_select_connection(struct obd_import *imp)
251 {
252         struct obd_import_conn *imp_conn;
253         struct obd_export *dlmexp;
254         ENTRY;
255
256         spin_lock(&imp->imp_lock);
257
258         if (list_empty(&imp->imp_conn_list)) {
259                 CERROR("%s: no connections available\n",
260                         imp->imp_obd->obd_name);
261                 spin_unlock(&imp->imp_lock);
262                 RETURN(-EINVAL);
263         }
264
265         if (imp->imp_conn_current && 
266             !(imp->imp_conn_current->oic_item.next == &imp->imp_conn_list)) {
267                 imp_conn = list_entry(imp->imp_conn_current->oic_item.next,
268                                   struct obd_import_conn, oic_item);
269         } else {
270                 imp_conn = list_entry(imp->imp_conn_list.next,
271                                       struct obd_import_conn, oic_item);
272         }
273
274         /* switch connection, don't mind if it's same as the current one */
275         if (imp->imp_connection)
276                 ptlrpc_put_connection(imp->imp_connection);
277         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
278
279         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
280         LASSERT(dlmexp != NULL);
281         if (dlmexp->exp_connection)
282                 ptlrpc_put_connection(dlmexp->exp_connection);
283         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
284         class_export_put(dlmexp);
285
286         imp->imp_conn_current = imp_conn;
287         CDEBUG(D_HA, "%s: import %p using connection %s\n",
288                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid);
289         spin_unlock(&imp->imp_lock);
290
291         RETURN(0);
292 }
293
294 int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
295 {
296         struct obd_device *obd = imp->imp_obd;
297         int initial_connect = 0;
298         int rc;
299         __u64 committed_before_reconnect = 0;
300         struct ptlrpc_request *request;
301         int size[] = {sizeof(imp->imp_target_uuid),
302                       sizeof(obd->obd_uuid),
303                       sizeof(imp->imp_dlm_handle),
304                       sizeof(imp->imp_connect_data)};
305         char *tmp[] = {imp->imp_target_uuid.uuid,
306                        obd->obd_uuid.uuid,
307                        (char *)&imp->imp_dlm_handle,
308                        (char *)&imp->imp_connect_data};
309         struct ptlrpc_connect_async_args *aa;
310         unsigned long flags;
311
312         spin_lock_irqsave(&imp->imp_lock, flags);
313         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
314                 spin_unlock_irqrestore(&imp->imp_lock, flags);
315                 CERROR("can't connect to a closed import\n");
316                 RETURN(-EINVAL);
317         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
318                 spin_unlock_irqrestore(&imp->imp_lock, flags);
319                 CERROR("already connected\n");
320                 RETURN(0);
321         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
322                 spin_unlock_irqrestore(&imp->imp_lock, flags);
323                 CERROR("already connecting\n");
324                 RETURN(-EALREADY);
325         }
326
327         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
328
329         imp->imp_conn_cnt++;
330         imp->imp_resend_replay = 0;
331
332         if (imp->imp_remote_handle.cookie == 0) {
333                 initial_connect = 1;
334         } else {
335                 committed_before_reconnect = imp->imp_peer_committed_transno;
336         }
337
338         spin_unlock_irqrestore(&imp->imp_lock, flags);
339
340         if (new_uuid) {
341                 struct obd_uuid uuid;
342
343                 obd_str2uuid(&uuid, new_uuid);
344                 rc = import_set_conn_priority(imp, &uuid);
345                 if (rc)
346                         GOTO(out, rc);
347         }
348
349         rc = import_select_connection(imp);
350         if (rc)
351                 GOTO(out, rc);
352
353         request = ptlrpc_prep_req(imp, imp->imp_connect_op, 4, size, tmp);
354         if (!request)
355                 GOTO(out, rc = -ENOMEM);
356
357 #ifndef __KERNEL__
358         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
359 #endif
360
361         request->rq_send_state = LUSTRE_IMP_CONNECTING;
362         size[0] = sizeof(struct obd_connect_data);
363         request->rq_replen = lustre_msg_size(1, size);
364         request->rq_interpret_reply = ptlrpc_connect_interpret;
365
366         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
367         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
368         memset(aa, 0, sizeof *aa);
369
370         aa->pcaa_peer_committed = committed_before_reconnect;
371         aa->pcaa_initial_connect = initial_connect;
372
373         if (aa->pcaa_initial_connect)
374                 imp->imp_replayable = 1;
375
376         DEBUG_REQ(D_RPCTRACE, request, "(re)connect request");
377         ptlrpcd_add_req(request);
378         rc = 0;
379 out:
380         if (rc != 0) {
381                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
382         }
383
384         RETURN(rc);
385 }
386 EXPORT_SYMBOL(ptlrpc_connect_import);
387
388 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
389 {
390         struct obd_import_conn *imp_conn;
391         unsigned long flags;
392         int wake_pinger = 0;
393
394         ENTRY;
395
396         spin_lock_irqsave(&imp->imp_lock, flags);
397         if (list_empty(&imp->imp_conn_list))
398                 GOTO(unlock, 0);
399
400         imp_conn = list_entry(imp->imp_conn_list.prev,
401                               struct obd_import_conn,
402                               oic_item);
403
404         if (imp->imp_conn_current != imp_conn) {
405                 ptlrpc_ping_import_soon(imp);
406                 wake_pinger = 1;
407         }
408
409  unlock:
410         spin_unlock_irqrestore(&imp->imp_lock, flags);
411
412         if (wake_pinger)
413                 ptlrpc_pinger_wake_up();
414
415         EXIT;
416 }
417
418 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
419                                     void * data, int rc)
420 {
421         struct ptlrpc_connect_async_args *aa = data;
422         struct obd_import *imp = request->rq_import;
423         struct lustre_handle old_hdl;
424         unsigned long flags;
425         int msg_flags;
426         ENTRY;
427
428         spin_lock_irqsave(&imp->imp_lock, flags);
429         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
430                 spin_unlock_irqrestore(&imp->imp_lock, flags);
431                 RETURN(0);
432         }
433         spin_unlock_irqrestore(&imp->imp_lock, flags);
434
435         if (rc)
436                 GOTO(out, rc);
437
438         LASSERT(imp->imp_conn_current);
439
440         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
441
442         /* All imports are pingable */
443         imp->imp_pingable = 1;
444         
445         if (aa->pcaa_initial_connect) {
446                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
447                         CDEBUG(D_HA, "connected to replayable target: %s\n",
448                                imp->imp_target_uuid.uuid);
449                         imp->imp_replayable = 1;
450                 } else {
451                         imp->imp_replayable = 0;
452                 }
453                 imp->imp_remote_handle = request->rq_repmsg->handle;
454
455                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
456                 GOTO(finish, rc = 0);
457         }
458
459         /* Determine what recovery state to move the import to. */
460         if (MSG_CONNECT_RECONNECT & msg_flags) {
461                 memset(&old_hdl, 0, sizeof(old_hdl));
462                 if (!memcmp(&old_hdl, &request->rq_repmsg->handle,
463                             sizeof (old_hdl))) {
464                         CERROR("%s@%s didn't like our handle "LPX64
465                                ", failed\n", imp->imp_target_uuid.uuid,
466                                imp->imp_connection->c_remote_uuid.uuid,
467                                imp->imp_dlm_handle.cookie);
468                         GOTO(out, rc = -ENOTCONN);
469                 }
470
471                 if (memcmp(&imp->imp_remote_handle, &request->rq_repmsg->handle,
472                            sizeof(imp->imp_remote_handle))) {
473                         CERROR("%s@%s changed handle from "LPX64" to "LPX64
474                                "; copying, but this may foreshadow disaster\n",
475                                imp->imp_target_uuid.uuid,
476                                imp->imp_connection->c_remote_uuid.uuid,
477                                imp->imp_remote_handle.cookie,
478                                request->rq_repmsg->handle.cookie);
479                         imp->imp_remote_handle = request->rq_repmsg->handle;
480                 } else {
481                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
482                                imp->imp_target_uuid.uuid,
483                                imp->imp_connection->c_remote_uuid.uuid);
484                 }
485
486                 if (imp->imp_invalid) {
487                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
488                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
489                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
490                                imp->imp_obd->obd_name,
491                                imp->imp_target_uuid.uuid);
492                         imp->imp_resend_replay = 1;
493                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
494                 } else {
495                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
496                 }
497         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
498                 LASSERT(imp->imp_replayable);
499                 imp->imp_remote_handle = request->rq_repmsg->handle;
500                 imp->imp_last_replay_transno = 0;
501                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
502         } else {
503                 imp->imp_remote_handle = request->rq_repmsg->handle;
504                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
505         }
506
507         /* Sanity checks for a reconnected import. */
508         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
509                 CERROR("imp_replayable flag does not match server "
510                        "after reconnect. We should LBUG right here.\n");
511         }
512
513         if (request->rq_repmsg->last_committed < aa->pcaa_peer_committed) {
514                 CERROR("%s went back in time (transno "LPD64
515                        " was previously committed, server now claims "LPD64
516                        ")! is shared storage not coherent?\n",
517                        imp->imp_target_uuid.uuid,
518                        aa->pcaa_peer_committed,
519                        request->rq_repmsg->last_committed);
520         }
521
522 finish:
523         rc = ptlrpc_import_recovery_state_machine(imp);
524         if (rc != 0) {
525                 if (rc == -ENOTCONN) {
526                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
527                                "invalidating and reconnecting\n",
528                                imp->imp_target_uuid.uuid,
529                                imp->imp_connection->c_remote_uuid.uuid);
530                         ptlrpc_connect_import(imp, NULL);
531                         RETURN(0);
532                 }
533         } else {
534                 list_del(&imp->imp_conn_current->oic_item);
535                 list_add(&imp->imp_conn_current->oic_item,
536                          &imp->imp_conn_list);
537                 imp->imp_conn_current = NULL;
538         }
539
540  out:
541         if (rc != 0) {
542                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
543                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov) {
544                         ptlrpc_deactivate_import(imp);
545                 }
546
547                 ptlrpc_maybe_ping_import_soon(imp);
548                 
549                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
550                        imp->imp_target_uuid.uuid,
551                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
552         }
553
554         wake_up(&imp->imp_recovery_waitq);
555         RETURN(rc);
556 }
557
558 static int completed_replay_interpret(struct ptlrpc_request *req,
559                                     void * data, int rc)
560 {
561         atomic_dec(&req->rq_import->imp_replay_inflight);
562         if (req->rq_status == 0) {
563                 ptlrpc_import_recovery_state_machine(req->rq_import);
564         } else {
565                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
566                        "reconnecting\n", 
567                        req->rq_import->imp_obd->obd_name, req->rq_status);
568                 ptlrpc_connect_import(req->rq_import, NULL);
569         }
570
571         RETURN(0);
572 }
573
574 static int signal_completed_replay(struct obd_import *imp)
575 {
576         struct ptlrpc_request *req;
577         ENTRY;
578
579         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
580         atomic_inc(&imp->imp_replay_inflight);
581
582         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
583         if (!req) {
584                 atomic_dec(&imp->imp_replay_inflight);
585                 RETURN(-ENOMEM);
586         }
587
588         req->rq_replen = lustre_msg_size(0, NULL);
589         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
590         req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
591         req->rq_timeout *= 3;
592         req->rq_interpret_reply = completed_replay_interpret;
593
594         ptlrpcd_add_req(req);
595         RETURN(0);
596 }
597
#ifdef __KERNEL__
/*
 * Kernel-thread body started by ptlrpc_import_recovery_state_machine()
 * for an evicted import (@data is the struct obd_import).
 *
 * After daemonizing, blocking all signals and naming itself
 * "ll_imp_inval", the thread invalidates the import, moves it to RECOVER
 * and re-runs the recovery state machine.  Always returns 0.
 */
static int ptlrpc_invalidate_import_thread(void *data)
{
        struct obd_import *evicted_imp = data;
        unsigned long sig_flags;

        ENTRY;

        lock_kernel();
        ptlrpc_daemonize();

        SIGNAL_MASK_LOCK(current, sig_flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, sig_flags);
        THREAD_NAME(current->comm, sizeof(current->comm), "ll_imp_inval");
        unlock_kernel();

        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               evicted_imp->imp_obd->obd_name,
               evicted_imp->imp_target_uuid.uuid,
               evicted_imp->imp_connection->c_remote_uuid.uuid);

        ptlrpc_invalidate_import(evicted_imp);

        IMPORT_SET_STATE(evicted_imp, LUSTRE_IMP_RECOVER);
        ptlrpc_import_recovery_state_machine(evicted_imp);

        RETURN(0);
}
#endif
628
629 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
630 {
631         int rc = 0;
632         int inflight;
633         char *target_start;
634         int target_len;
635
636         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
637                 deuuidify(imp->imp_target_uuid.uuid, NULL,
638                           &target_start, &target_len);
639                 LCONSOLE_ERROR("This client was evicted by %.*s; in progress "
640                                "operations using this service will fail.\n",
641                                target_len, target_start);
642                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
643                        imp->imp_target_uuid.uuid,
644                        imp->imp_connection->c_remote_uuid.uuid);
645
646 #ifdef __KERNEL__
647                 rc = kernel_thread(ptlrpc_invalidate_import_thread, imp,
648                                    CLONE_VM | CLONE_FILES);
649                 if (rc < 0)
650                         CERROR("error starting invalidate thread: %d\n", rc);
651                 else
652                         rc = 0;
653                 RETURN(rc);
654 #else
655                 ptlrpc_invalidate_import(imp);
656
657                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
658 #endif
659         }
660
661         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
662                 CDEBUG(D_HA, "replay requested by %s\n",
663                        imp->imp_target_uuid.uuid);
664                 rc = ptlrpc_replay_next(imp, &inflight);
665                 if (inflight == 0 &&
666                     atomic_read(&imp->imp_replay_inflight) == 0) {
667                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
668                         rc = ldlm_replay_locks(imp);
669                         if (rc)
670                                 GOTO(out, rc);
671                 }
672                 rc = 0;
673         }
674
675         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
676                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
677                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
678                         rc = signal_completed_replay(imp);
679                         if (rc)
680                                 GOTO(out, rc);
681                 }
682
683         }
684
685         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
686                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
687                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
688                 }
689         }
690
691         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
692                 char   *nidstr;
693
694                 CDEBUG(D_HA, "reconnected to %s@%s\n",
695                        imp->imp_target_uuid.uuid,
696                        imp->imp_connection->c_remote_uuid.uuid);
697
698                 rc = ptlrpc_resend(imp);
699                 if (rc)
700                         GOTO(out, rc);
701                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
702                 ptlrpc_activate_import(imp);
703
704                 deuuidify(imp->imp_target_uuid.uuid, NULL,
705                           &target_start, &target_len);
706                 nidstr = libcfs_nid2str(imp->imp_connection->c_peer.nid);
707
708                 LCONSOLE_INFO("Connection restored to service %.*s using nid "
709                               "%s.\n", target_len, target_start, nidstr);
710
711                 CWARN("%s: connection restored to %s@%s\n",
712                       imp->imp_obd->obd_name,
713                       imp->imp_target_uuid.uuid,
714                       imp->imp_connection->c_remote_uuid.uuid);
715         }
716
717         if (imp->imp_state == LUSTRE_IMP_FULL) {
718                 wake_up(&imp->imp_recovery_waitq);
719                 ptlrpc_wake_delayed(imp);
720         }
721
722  out:
723         RETURN(rc);
724 }
725
/*
 * Callback passed to LWI_TIMEOUT_INTR by ptlrpc_disconnect_import().
 * It ignores its argument and always returns 0.
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
730
731 int ptlrpc_disconnect_import(struct obd_import *imp)
732 {
733         struct ptlrpc_request *request;
734         int rq_opc;
735         int rc = 0;
736         unsigned long flags;
737         ENTRY;
738
739         switch (imp->imp_connect_op) {
740         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
741         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
742         case MGMT_CONNECT:rq_opc = MGMT_DISCONNECT;break;
743         default:
744                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
745                        imp->imp_target_uuid.uuid, imp->imp_connect_op);
746                 RETURN(-EINVAL);
747         }
748
749
750         if (ptlrpc_import_in_recovery(imp)) {
751                 struct l_wait_info lwi;
752                 lwi = LWI_TIMEOUT_INTR(MAX(obd_timeout * HZ, 1), back_to_sleep,
753                                        NULL, NULL);
754                 rc = l_wait_event(imp->imp_recovery_waitq,
755                                   !ptlrpc_import_in_recovery(imp), &lwi);
756
757         }
758
759         spin_lock_irqsave(&imp->imp_lock, flags);
760         if (imp->imp_state != LUSTRE_IMP_FULL) {
761                 GOTO(out, 0);
762         }
763         spin_unlock_irqrestore(&imp->imp_lock, flags);
764
765         request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
766         if (request) {
767                 /* For non-replayable connections, don't attempt
768                    reconnect if this fails */
769                 if (!imp->imp_replayable) {
770                         request->rq_no_resend = 1;
771                         IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
772                         request->rq_send_state =  LUSTRE_IMP_CONNECTING;
773                 }
774                 request->rq_replen = lustre_msg_size(0, NULL);
775                 rc = ptlrpc_queue_wait(request);
776                 ptlrpc_req_finished(request);
777         }
778
779         spin_lock_irqsave(&imp->imp_lock, flags);
780 out:
781         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
782         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
783         spin_unlock_irqrestore(&imp->imp_lock, flags);
784
785         RETURN(rc);
786 }
787