Whamcloud - gitweb
- many gcc4 compilation fixes (warnings)
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 # include <liblustre.h>
26 #endif
27
28 #include <linux/obd_support.h>
29 #include <linux/lustre_ha.h>
30 #include <linux/lustre_net.h>
31 #include <linux/lustre_import.h>
32 #include <linux/lustre_export.h>
33 #include <linux/obd.h>
34 #include <linux/obd_class.h>
35 #include <linux/lustre_sec.h>
36
37 #include "ptlrpc_internal.h"
38
39 struct ptlrpc_connect_async_args {
40          __u64 pcaa_peer_committed;
41         int pcaa_initial_connect;
42 };
43
/*
 * Move an import to a new state without doing any locking here (see
 * IMPORT_SET_STATE for the variant that takes imp_lock itself).
 *
 * A CLOSED import should remain so: once imp_state is LUSTRE_IMP_CLOSED
 * the request is ignored.
 *
 * imp   - pointer to the obd_import to update
 * state - new LUSTRE_IMP_* value
 *
 * Both arguments are parenthesized in the expansion so that expressions
 * such as "p + 1" or "&obj" can be passed.  They are still evaluated more
 * than once, so they must not have side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), (imp)->imp_target_uuid.uuid,                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while(0)
55
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp->imp_lock with
 * spin_lock_irqsave() around the state change.
 *
 * The arguments are parenthesized both where they are used and where they
 * are forwarded, so pointer expressions such as "p + 1" or "&obj" expand
 * correctly.  imp is evaluated more than once and must be side-effect free.
 */
#define IMPORT_SET_STATE(imp, state)                            \
do {                                                            \
        unsigned long flags;                                    \
                                                                \
        spin_lock_irqsave(&(imp)->imp_lock, flags);             \
        IMPORT_SET_STATE_NOLOCK((imp), (state));                \
        spin_unlock_irqrestore(&(imp)->imp_lock, flags);        \
} while(0)
64
65
/* Both are referenced in this file before their definitions appear. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request, void *data,
                                    int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
69
70 /* Only this function is allowed to change the import state when it is
71  * CLOSED. I would rather refcount the import and free it after
72  * disconnection like we do with exports. To do that, the client_obd
73  * will need to save the peer info somewhere other than in the import,
74  * though. */
75 int ptlrpc_init_import(struct obd_import *imp)
76 {
77         unsigned long flags;
78
79         spin_lock_irqsave(&imp->imp_lock, flags);
80
81         imp->imp_generation++;
82         imp->imp_state =  LUSTRE_IMP_NEW;
83
84         spin_unlock_irqrestore(&imp->imp_lock, flags);
85
86         return 0;
87 }
88
89 /* Returns true if import was FULL, false if import was already not
90  * connected.
91  */
92 int ptlrpc_set_import_discon(struct obd_import *imp)
93 {
94         unsigned long flags;
95         int rc = 0;
96
97         spin_lock_irqsave(&imp->imp_lock, flags);
98
99         if (imp->imp_state == LUSTRE_IMP_FULL) {
100                 CWARN("%s: connection lost to %s@%s\n",
101                       imp->imp_obd->obd_name, 
102                       imp->imp_target_uuid.uuid,
103                       imp->imp_connection->c_remote_uuid.uuid);
104                 ptlrpc_deactivate_timeouts();
105                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
106                 spin_unlock_irqrestore(&imp->imp_lock, flags);
107                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
108                 rc = 1;
109         } else {
110                 spin_unlock_irqrestore(&imp->imp_lock, flags);
111                 CDEBUG(D_HA, "%p %s: import already not connected: %s\n",
112                        imp,imp->imp_client->cli_name,
113                        ptlrpc_import_state_name(imp->imp_state));
114         }
115
116         return rc;
117 }
118
119 /*
120  * This acts as a barrier; all existing requests are rejected, and
121  * no new requests will be accepted until the import is valid again.
122  */
123 void ptlrpc_deactivate_import(struct obd_import *imp)
124 {
125         unsigned long flags;
126         ENTRY;
127
128         spin_lock_irqsave(&imp->imp_lock, flags);
129         CDEBUG(D_HA, "setting import %s INVALID\n",
130                imp->imp_target_uuid.uuid);
131         imp->imp_invalid = 1;
132         imp->imp_generation++;
133         spin_unlock_irqrestore(&imp->imp_lock, flags);
134
135         ptlrpc_abort_inflight(imp);
136         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
137 }
138
139 /*
140  * This function will invalidate the import, if necessary, then block
141  * for all the RPC completions, and finally notify the obd to
142  * invalidate its state (ie cancel locks, clear pending requests,
143  * etc).
144  *
145  * in_rpc: true if this is called while processing an rpc, like
146  *    CONNECT. It will allow for one RPC to be inflight while
147  *    waiting for requests to complete. Ugly, yes, but I don't see an
148  *    cleaner way right now.
149  */
150 void ptlrpc_invalidate_import(struct obd_import *imp, int in_rpc)
151 {
152         struct l_wait_info lwi;
153         unsigned long timeout;
154         int inflight = 0;
155         int rc;
156
157         if (!imp->imp_invalid)
158                 ptlrpc_deactivate_import(imp);
159
160         LASSERT(imp->imp_invalid);
161
162         if (in_rpc)
163                 inflight = 1;
164
165         /* wait for all requests to error out and call completion 
166            callbacks */
167         if (imp->imp_server_timeout)
168                 timeout = obd_timeout / 2;
169         else
170                 timeout = obd_timeout;
171         timeout = MAX(timeout * HZ, 1);
172         lwi = LWI_TIMEOUT_INTR(timeout, NULL, NULL, NULL);
173         rc = l_wait_event(imp->imp_recovery_waitq, 
174                           (atomic_read(&imp->imp_inflight) == inflight), 
175                           &lwi);
176
177         if (rc)
178                 CERROR("%s: rc = %d waiting for callback (%d != %d)\n",
179                        imp->imp_target_uuid.uuid, rc,
180                        atomic_read(&imp->imp_inflight), !!in_rpc);
181
182         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
183 }
184
185 void ptlrpc_activate_import(struct obd_import *imp)
186 {
187         struct obd_device *obd = imp->imp_obd;
188         unsigned long flags;
189
190         spin_lock_irqsave(&imp->imp_lock, flags);
191         imp->imp_invalid = 0;
192         spin_unlock_irqrestore(&imp->imp_lock, flags);
193
194         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
195         ptlrpc_activate_timeouts();
196 }
197
198 void ptlrpc_fail_import(struct obd_import *imp, int generation)
199 {
200         ENTRY;
201
202         LASSERT (!imp->imp_dlm_fake);
203
204         if (ptlrpc_set_import_discon(imp)) {
205                 unsigned long flags;
206
207                 if (!imp->imp_replayable) {
208                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
209                                "auto-deactivating\n",
210                                imp->imp_target_uuid.uuid,
211                                imp->imp_connection->c_remote_uuid.uuid,
212                                imp->imp_obd->obd_name);
213                         ptlrpc_deactivate_import(imp);
214                 }
215
216                 CDEBUG(D_HA, "%s: waking up pinger\n",
217                        imp->imp_target_uuid.uuid);
218
219                 spin_lock_irqsave(&imp->imp_lock, flags);
220                 imp->imp_force_verify = 1;
221                 spin_unlock_irqrestore(&imp->imp_lock, flags);
222
223                 ptlrpc_pinger_wake_up();
224         }
225         EXIT;
226 }
227
/*
 * Non-zero when "last" (a jiffies stamp of a previous attempt) is set and
 * no more than obd_timeout * 2 * HZ jiffies old.  A stamp of 0 is never
 * "too soon".  The age is compared as a signed long.
 */
#define ATTEMPT_TOO_SOON(last)                                          \
        ((last) != 0 &&                                                 \
         (long)(jiffies - (last)) <= (long)(obd_timeout * 2 * HZ))
230
231 static int import_select_connection(struct obd_import *imp)
232 {
233         struct obd_import_conn *imp_conn, *tmp;
234         struct obd_export *dlmexp;
235         int found = 0;
236         ENTRY;
237
238         spin_lock(&imp->imp_lock);
239
240         if (list_empty(&imp->imp_conn_list)) {
241                 CERROR("no available connections on imp %p@%s\n",
242                         imp, imp->imp_obd->obd_name);
243                 spin_unlock(&imp->imp_lock);
244                 RETURN(-EINVAL);
245         }
246
247         list_for_each_entry(imp_conn, &imp->imp_conn_list, oic_item) {
248                 if (!ATTEMPT_TOO_SOON(imp_conn->oic_last_attempt)) {
249                         found = 1;
250                         break;
251                 }
252         }
253
254         /* if not found, simply choose the current one */
255         if (!found) {
256                 CWARN("obd %s imp 0x%p: all connections have been "
257                       "tried recently\n", imp->imp_obd->obd_name, imp);
258                 LASSERT(imp->imp_conn_current);
259                 imp_conn = imp->imp_conn_current;
260         }
261         LASSERT(imp_conn->oic_conn);
262
263         imp_conn->oic_last_attempt = jiffies;
264
265         /* move the items ahead of the selected one to list tail */
266         while (1) {
267                 tmp= list_entry(imp->imp_conn_list.next,
268                                 struct obd_import_conn, oic_item);
269                 if (tmp == imp_conn)
270                         break;
271                 list_del(&tmp->oic_item);
272                 list_add_tail(&tmp->oic_item, &imp->imp_conn_list);
273         }
274
275         /* switch connection if we chose a new one */
276         if (imp->imp_connection != imp_conn->oic_conn) {
277                 if (imp->imp_connection) {
278                         ptlrpcs_sec_invalidate_cache(imp->imp_sec);
279                         ptlrpc_put_connection(imp->imp_connection);
280                 }
281                 imp->imp_connection =
282                         ptlrpc_connection_addref(imp_conn->oic_conn);
283         }
284
285         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
286         LASSERT(dlmexp != NULL);
287         if (dlmexp->exp_connection)
288                 ptlrpc_put_connection(imp->imp_connection);
289         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
290         class_export_put(dlmexp);
291
292         imp->imp_conn_current = imp_conn;
293         CWARN("obd %s imp 0x%p: select conn %s\n",
294                imp->imp_obd->obd_name, imp,
295                imp_conn->oic_uuid.uuid);
296         spin_unlock(&imp->imp_lock);
297
298         RETURN(0);
299 }
300
301 /*
302  * must be called under imp_lock
303  */
304 int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
305 {
306         struct ptlrpc_request *req;
307         struct list_head *tmp;
308         
309         if (list_empty(&imp->imp_replay_list))
310                 return 0;
311         tmp = imp->imp_replay_list.next;
312         req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
313         *transno = req->rq_transno;
314         return 1;
315 }
316
317 int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
318 {
319         struct obd_device *obd = imp->imp_obd;
320         int initial_connect = 0;
321         int set_transno = 0;
322         int rc;
323         __u64 committed_before_reconnect = 0;
324         struct ptlrpc_request *request;
325         int size[] = {0,
326                       sizeof(imp->imp_target_uuid),
327                       sizeof(obd->obd_uuid),
328                       sizeof(imp->imp_dlm_handle),
329                       sizeof(imp->imp_connect_flags),
330                       sizeof(imp->imp_connect_data)};
331         char *tmp[] = {NULL,
332                        (char *)imp->imp_target_uuid.uuid,
333                        (char *)obd->obd_uuid.uuid,
334                        (char *)&imp->imp_dlm_handle,
335                        (char *)&imp->imp_connect_flags, /* XXX: make this portable! */
336                        (char*) &imp->imp_connect_data};
337         int repsize = sizeof(struct obd_connect_data);
338                         
339         struct ptlrpc_connect_async_args *aa;
340         unsigned long flags;
341
342         spin_lock_irqsave(&imp->imp_lock, flags);
343         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
344                 spin_unlock_irqrestore(&imp->imp_lock, flags);
345                 CERROR("can't connect to a closed import\n");
346                 RETURN(-EINVAL);
347         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
348                 spin_unlock_irqrestore(&imp->imp_lock, flags);
349                 CERROR("already connected\n");
350                 RETURN(0);
351         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
352                 spin_unlock_irqrestore(&imp->imp_lock, flags);
353                 CERROR("already connecting\n");
354                 RETURN(-EALREADY);
355         }
356
357         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
358
359         imp->imp_resend_replay = 0;
360
361         if (imp->imp_remote_handle.cookie == 0) {
362                 initial_connect = 1;
363         } else {
364                 committed_before_reconnect = imp->imp_peer_committed_transno;;
365                 imp->imp_conn_cnt++;
366         }
367
368         set_transno = ptlrpc_first_transno(imp, &imp->imp_connect_data.transno);
369
370         spin_unlock_irqrestore(&imp->imp_lock, flags);
371
372         if (new_uuid) {
373                 struct obd_uuid uuid;
374
375                 obd_str2uuid(&uuid, new_uuid);
376
377                 rc = import_set_conn_priority(imp, &uuid);
378                 if (rc)
379                         GOTO(out, rc);
380         }
381         rc = import_select_connection(imp);
382         if (rc)
383                 GOTO(out, rc);
384
385         LASSERT(imp->imp_sec);
386
387         size[0] = lustre_secdesc_size();
388         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION,
389                                   imp->imp_connect_op, 6, size, tmp);
390         if (!request)
391                 GOTO(out, rc = -ENOMEM);
392
393         lustre_pack_secdesc(request, size[0]);
394
395 #ifndef __KERNEL__
396         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
397 #endif
398         if (obd->u.cli.cl_async) {
399                 lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_ASYNC);
400         }
401
402         request->rq_send_state = LUSTRE_IMP_CONNECTING;
403         request->rq_replen = lustre_msg_size(1, &repsize);
404         request->rq_interpret_reply = ptlrpc_connect_interpret;
405
406         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
407         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
408         memset(aa, 0, sizeof *aa);
409
410         aa->pcaa_peer_committed = committed_before_reconnect;
411         aa->pcaa_initial_connect = initial_connect;
412
413         if (aa->pcaa_initial_connect) {
414                 lustre_msg_add_op_flags(request->rq_reqmsg, 
415                                         MSG_CONNECT_INITIAL);
416                 imp->imp_replayable = 1; 
417         }
418         if (set_transno)
419                 lustre_msg_add_op_flags(request->rq_reqmsg, 
420                                         MSG_CONNECT_TRANSNO);
421         
422         imp->imp_reqs_replayed = imp->imp_locks_replayed = 0;
423
424         ptlrpcd_add_req(request);
425         rc = 0;
426         imp->imp_connect_start = jiffies;
427 out:
428         if (rc != 0) {
429                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
430         }
431
432         RETURN(rc);
433 }
434
435 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
436                                     void *data, int rc)
437 {
438         struct ptlrpc_connect_async_args *aa = data;
439         struct obd_import *imp = request->rq_import;
440         struct lustre_handle old_hdl;
441         unsigned long flags;
442         int msg_flags;
443         ENTRY;
444
445         spin_lock_irqsave(&imp->imp_lock, flags);
446         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
447                 spin_unlock_irqrestore(&imp->imp_lock, flags);
448                 RETURN(0);
449         }
450         spin_unlock_irqrestore(&imp->imp_lock, flags);
451
452         if (rc)
453                 GOTO(out, rc);
454         LASSERT(imp->imp_conn_current);
455         imp->imp_conn_current->oic_last_attempt = 0;
456 /*
457         remote_flag = lustre_msg_buf(request->rq_repmsg, 0, sizeof(int));
458         LASSERT(remote_flag != NULL);
459         imp->imp_obd->u.cli.cl_remote = *remote_flag;
460 */
461         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
462
463         if (aa->pcaa_initial_connect) {
464                 struct obd_connect_data *conn_data;
465
466                 conn_data = lustre_swab_repbuf(request, 0, sizeof(*conn_data),
467                                                lustre_swab_connect);
468                 LASSERT(conn_data);
469                 imp->imp_connect_data.ocd_connect_flags =
470                                         conn_data->ocd_connect_flags;
471
472                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
473                         CDEBUG(D_HA, "connected to replayable target: %s\n",
474                                imp->imp_target_uuid.uuid);
475                         imp->imp_pingable = imp->imp_replayable = 1;
476                 } else {
477                         imp->imp_replayable = 0;
478                 }
479                 LASSERTF(imp->imp_conn_cnt < request->rq_repmsg->conn_cnt,
480                          "imp conn_cnt %d req conn_cnt %d", 
481                          imp->imp_conn_cnt, request->rq_repmsg->conn_cnt);
482                 imp->imp_conn_cnt = request->rq_repmsg->conn_cnt;
483                 imp->imp_remote_handle = request->rq_repmsg->handle;
484                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
485                 ptlrpc_pinger_sending_on_import(imp);
486                 GOTO(finish, rc = 0);
487         }
488
489         /* Determine what recovery state to move the import to. */
490         if (MSG_CONNECT_RECONNECT & msg_flags) {
491                 memset(&old_hdl, 0, sizeof(old_hdl));
492                 if (!memcmp(&old_hdl, &request->rq_repmsg->handle,
493                             sizeof (old_hdl))) {
494                         CERROR("%s@%s didn't like our handle "LPX64
495                                ", failed\n", imp->imp_target_uuid.uuid,
496                                imp->imp_connection->c_remote_uuid.uuid,
497                                imp->imp_dlm_handle.cookie);
498                         GOTO(out, rc = -ENOTCONN);
499                 }
500
501                 if (memcmp(&imp->imp_remote_handle, &request->rq_repmsg->handle,
502                            sizeof(imp->imp_remote_handle))) {
503                         CERROR("%s@%s changed handle from "LPX64" to "LPX64
504                                "; copying, but this may foreshadow disaster\n",
505                                imp->imp_target_uuid.uuid,
506                                imp->imp_connection->c_remote_uuid.uuid,
507                                imp->imp_remote_handle.cookie,
508                                request->rq_repmsg->handle.cookie);
509                         imp->imp_remote_handle = request->rq_repmsg->handle;
510                 } else {
511                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
512                                imp->imp_target_uuid.uuid,
513                                imp->imp_connection->c_remote_uuid.uuid);
514                 }
515
516                 if (imp->imp_invalid) {
517                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
518                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
519                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
520                                imp->imp_obd->obd_name, 
521                                imp->imp_target_uuid.uuid);
522                         imp->imp_resend_replay = 1;
523                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
524                 } else {
525                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
526                 }
527         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
528                 LASSERT(imp->imp_replayable);
529                 imp->imp_remote_handle = request->rq_repmsg->handle;
530                 imp->imp_last_replay_transno = 0;
531                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
532         } else {
533                 CDEBUG(D_HA, "oops! we get evicted from %s\n", imp->imp_target_uuid.uuid);
534                 imp->imp_remote_handle = request->rq_repmsg->handle;
535                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
536         }
537
538         /* Sanity checks for a reconnected import. */
539         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
540                 CERROR("imp_replayable flag does not match server "
541                        "after reconnect. We should LBUG right here.\n");
542         }
543
544         if (request->rq_repmsg->last_committed < aa->pcaa_peer_committed) {
545                 CERROR("%s went back in time (transno "LPD64
546                        " was previously committed, server now claims "LPD64
547                        ")! is shared storage not coherent?\n",
548                        imp->imp_target_uuid.uuid,
549                        aa->pcaa_peer_committed,
550                        request->rq_repmsg->last_committed);
551         }
552
553 finish:
554         rc = ptlrpc_import_recovery_state_machine(imp);
555         if (rc != 0) {
556                 if (rc == -ENOTCONN) {
557                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
558                                "invalidating and reconnecting\n",
559                                imp->imp_target_uuid.uuid,
560                                imp->imp_connection->c_remote_uuid.uuid);
561                         ptlrpc_connect_import(imp, NULL);
562                         RETURN(0);
563                 }
564         }
565  out:
566         if (rc != 0) {
567                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
568                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov)
569                         ptlrpc_deactivate_import(imp);
570                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
571                        imp->imp_target_uuid.uuid,
572                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
573         }
574
575         wake_up(&imp->imp_recovery_waitq);
576         RETURN(rc);
577 }
578
579 static int completed_replay_interpret(struct ptlrpc_request *req,
580                                       void *data, int rc)
581 {
582         atomic_dec(&req->rq_import->imp_replay_inflight);
583         if (req->rq_status == 0) {
584                 ptlrpc_import_recovery_state_machine(req->rq_import);
585         } else {
586                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
587                        "reconnecting\n", 
588                        req->rq_import->imp_obd->obd_name, req->rq_status);
589                 ptlrpc_connect_import(req->rq_import, NULL);
590         }
591
592         RETURN(0);
593 }
594
595 static int signal_completed_replay(struct obd_import *imp)
596  {
597         struct ptlrpc_request *req;
598         ENTRY;
599
600         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
601         atomic_inc(&imp->imp_replay_inflight);
602
603         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 0, NULL, NULL);
604         if (!req) {
605                 atomic_dec(&imp->imp_replay_inflight);
606                 RETURN(-ENOMEM);
607         }
608
609         req->rq_replen = lustre_msg_size(0, NULL);
610         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
611         req->rq_reqmsg->flags |= MSG_LOCK_REPLAY_DONE | MSG_REQ_REPLAY_DONE;
612         req->rq_timeout *= 3;
613         req->rq_interpret_reply = completed_replay_interpret;
614
615         ptlrpcd_add_req(req);
616         RETURN(0);
617 }
618
#ifdef __KERNEL__
/*
 * Kernel-thread body started by ptlrpc_import_recovery_state_machine() for
 * an evicted import.
 *
 * data is the struct obd_import to work on.  The thread daemonizes itself,
 * blocks all signals, names itself "ll_imp_inval", invalidates the import
 * (not in RPC context), moves it to RECOVER and runs the recovery state
 * machine once more.
 *
 * Always returns 0.
 */
static int ptlrpc_invalidate_import_thread(void *data)
{
        struct obd_import *imp = data;
        unsigned long sigflags;

        ENTRY;

        /* thread setup is done under the big kernel lock */
        lock_kernel();
        ptlrpc_daemonize();

        SIGNAL_MASK_LOCK(current, sigflags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, sigflags);
        THREAD_NAME(current->comm, sizeof(current->comm), "ll_imp_inval");
        unlock_kernel();

        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               imp->imp_obd->obd_name, imp->imp_target_uuid.uuid,
               imp->imp_connection->c_remote_uuid.uuid);

        ptlrpc_invalidate_import(imp, 0);
        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);

        ptlrpc_import_recovery_state_machine(imp);

        RETURN(0);
}
#endif
649
650 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
651 {
652         int rc = 0;
653         int inflight;
654
655         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
656                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
657                        imp->imp_target_uuid.uuid,
658                        imp->imp_connection->c_remote_uuid.uuid);
659
660 #ifdef __KERNEL__
661                 rc = kernel_thread(ptlrpc_invalidate_import_thread, imp,
662                                    CLONE_VM | CLONE_FILES);
663                 if (rc < 0)
664                         CERROR("error starting invalidate thread: %d\n", rc);
665                 RETURN(rc < 0 ? rc : 0);
666 #else
667                 ptlrpc_invalidate_import(imp, 1);
668
669                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
670 #endif
671         }
672
673         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
674                 CDEBUG(D_HA, "replay requested by %s\n",
675                        imp->imp_target_uuid.uuid);
676                 rc = ptlrpc_replay_next(imp, &inflight);
677                 if (inflight == 0 &&
678                     atomic_read(&imp->imp_replay_inflight) == 0) {
679                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
680                         rc = ldlm_replay_locks(imp);
681                         if (rc)
682                                 GOTO(out, rc);
683                 }
684                 rc = 0;
685         }
686
687         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
688                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
689                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
690                         rc = signal_completed_replay(imp);
691                         if (rc)
692                                 GOTO(out, rc);
693                 }
694
695         }
696
697         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
698                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
699                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
700                 }
701         }
702
703         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
704                 CDEBUG(D_HA, "reconnected to %s@%s\n",
705                        imp->imp_target_uuid.uuid,
706                        imp->imp_connection->c_remote_uuid.uuid);
707
708                 rc = ptlrpc_resend(imp);
709                 if (rc)
710                         GOTO(out, rc);
711                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
712                 ptlrpc_activate_import(imp);
713                 CWARN("%s: connection restored to %s@%s, "
714                        "%d/%d req/lock replayed\n",
715                       imp->imp_obd->obd_name, 
716                       imp->imp_target_uuid.uuid,
717                       imp->imp_connection->c_remote_uuid.uuid,
718                       imp->imp_reqs_replayed,
719                       imp->imp_locks_replayed);
720         }
721
722         if (imp->imp_state == LUSTRE_IMP_FULL) {
723                 wake_up(&imp->imp_recovery_waitq);
724                 ptlrpc_wake_delayed(imp);
725         }
726
727  out:
728         RETURN(rc);
729 }
730
/*
 * Timeout callback passed to LWI_TIMEOUT_INTR by
 * ptlrpc_disconnect_import(); ignores its argument and returns 0
 * (presumably telling the waiter to keep sleeping, as the name suggests).
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
735
736 int ptlrpc_disconnect_import(struct obd_import *imp)
737 {
738         struct ptlrpc_request *request;
739         int rq_opc;
740         int rc = 0;
741         unsigned long flags;
742         ENTRY;
743
744         switch (imp->imp_connect_op) {
745         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
746         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
747         case MGMT_CONNECT: rq_opc = MGMT_DISCONNECT; break;
748         default:
749                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
750                        imp->imp_target_uuid.uuid, imp->imp_connect_op);
751                 RETURN(-EINVAL);
752         }
753
754
755         if (ptlrpc_import_in_recovery(imp)) {
756                 struct l_wait_info lwi;
757                 unsigned long timeout;
758                 if (imp->imp_server_timeout)
759                         timeout = obd_timeout / 2;
760                 else
761                         timeout = obd_timeout;
762                 timeout = MAX(timeout * HZ, 1);
763                 lwi = LWI_TIMEOUT_INTR(obd_timeout, back_to_sleep, NULL, NULL);
764                 rc = l_wait_event(imp->imp_recovery_waitq, 
765                                   !ptlrpc_import_in_recovery(imp), &lwi);
766
767         }
768
769         spin_lock_irqsave(&imp->imp_lock, flags);
770         if (imp->imp_state != LUSTRE_IMP_FULL) {
771                 GOTO(out, 0);
772         }
773         spin_unlock_irqrestore(&imp->imp_lock, flags);
774
775         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc,
776                                   0, NULL, NULL);
777         if (request) {
778                 /* For non-replayable connections, don't attempt
779                    reconnect if this fails */
780                 if (!imp->imp_replayable) {
781                         request->rq_no_resend = 1;
782                         IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
783                         request->rq_send_state =  LUSTRE_IMP_CONNECTING;
784                 }
785                 request->rq_replen = lustre_msg_size(0, NULL);
786                 rc = ptlrpc_queue_wait(request);
787                 ptlrpc_req_finished(request);
788         }
789
790         spin_lock_irqsave(&imp->imp_lock, flags);
791 out:
792         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
793         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
794         imp->imp_conn_cnt = 0;
795         spin_unlock_irqrestore(&imp->imp_lock, flags);
796
797         RETURN(rc);
798 }
799