Whamcloud - gitweb
cf5e0bef6aac8f0d0c84742bb28d4de5af2daa5e
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifdef __KERNEL__
25 # include <linux/config.h>
26 # include <linux/module.h>
27 # include <linux/kmod.h>
28 #else
29 # include <liblustre.h>
30 #endif
31
32 #include <linux/obd_support.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_net.h>
35 #include <linux/lustre_import.h>
36 #include <linux/lustre_export.h>
37 #include <linux/obd.h>
38 #include <linux/obd_class.h>
39
40 #include "ptlrpc_internal.h"
41
42 struct ptlrpc_connect_async_args {
43          __u64 pcaa_peer_committed;
44         int pcaa_initial_connect;
45 };
46
47 /* A CLOSED import should remain so. */
48 #define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
49 do {                                                                           \
50         if (imp->imp_state != LUSTRE_IMP_CLOSED) {                             \
51                CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
52                       imp, imp->imp_target_uuid.uuid,                          \
53                       ptlrpc_import_state_name(imp->imp_state),                \
54                       ptlrpc_import_state_name(state));                        \
55                imp->imp_state = state;                                         \
56         }                                                                      \
57 } while(0)
58
59 #define IMPORT_SET_STATE(imp, state)                    \
60 do {                                                    \
61         unsigned long flags;                            \
62                                                         \
63         spin_lock_irqsave(&imp->imp_lock, flags);       \
64         IMPORT_SET_STATE_NOLOCK(imp, state);            \
65         spin_unlock_irqrestore(&imp->imp_lock, flags);  \
66 } while(0)
67
68
69 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
70                                     void * data, int rc);
71 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
72
73 /* Only this function is allowed to change the import state when it is
74  * CLOSED. I would rather refcount the import and free it after
75  * disconnection like we do with exports. To do that, the client_obd
76  * will need to save the peer info somewhere other than in the import,
77  * though. */
78 int ptlrpc_init_import(struct obd_import *imp)
79 {
80         unsigned long flags;
81
82         spin_lock_irqsave(&imp->imp_lock, flags);
83
84         imp->imp_generation++;
85         imp->imp_state =  LUSTRE_IMP_NEW;
86
87         spin_unlock_irqrestore(&imp->imp_lock, flags);
88
89         return 0;
90 }
91
92 /* Returns true if import was FULL, false if import was already not
93  * connected.
94  */
95 int ptlrpc_set_import_discon(struct obd_import *imp)
96 {
97         unsigned long flags;
98         int rc = 0;
99
100         spin_lock_irqsave(&imp->imp_lock, flags);
101
102         if (imp->imp_state == LUSTRE_IMP_FULL) {
103                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
104                 spin_unlock_irqrestore(&imp->imp_lock, flags);
105                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
106                 rc = 1;
107         } else {
108                 spin_unlock_irqrestore(&imp->imp_lock, flags);
109                 CDEBUG(D_HA, "%p %s: import already not connected: %s\n",
110                        imp,imp->imp_client->cli_name,
111                        ptlrpc_import_state_name(imp->imp_state));
112         }
113
114         return rc;
115 }
116
117 /*
118  * This acts as a barrier; all existing requests are rejected, and
119  * no new requests will be accepted until the import is valid again.
120  */
121 void ptlrpc_deactivate_import(struct obd_import *imp)
122 {
123         unsigned long flags;
124         ENTRY;
125
126         spin_lock_irqsave(&imp->imp_lock, flags);
127         CDEBUG(D_HA, "setting import %s INVALID\n",
128                imp->imp_target_uuid.uuid);
129         imp->imp_invalid = 1;
130         imp->imp_generation++;
131         spin_unlock_irqrestore(&imp->imp_lock, flags);
132
133         ptlrpc_abort_inflight(imp);
134         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
135 }
136
137 /*
138  * This function will invalidate the import, if necessary, then block
139  * for all the RPC completions, and finally notify the obd to
140  * invalidate its state (ie cancel locks, clear pending requests,
141  * etc).
142  *
143  * in_rpc: true if this is called while processing an rpc, like
144  *    CONNECT. It will allow for one RPC to be inflight while
145  *    waiting for requests to complete. Ugly, yes, but I don't see an
146  *    cleaner way right now.
147  */
148 void ptlrpc_invalidate_import(struct obd_import *imp, int in_rpc)
149 {
150         struct l_wait_info lwi;
151         unsigned long timeout;
152         int inflight = 0;
153         int rc;
154
155         if (!imp->imp_invalid)
156                 ptlrpc_deactivate_import(imp);
157
158         LASSERT(imp->imp_invalid);
159
160         if (in_rpc)
161                 inflight = 1;
162
163         /* wait for all requests to error out and call completion 
164            callbacks */
165         if (imp->imp_server_timeout)
166                 timeout = obd_timeout / 2;
167         else
168                 timeout = obd_timeout;
169         timeout = MAX(timeout * HZ, 1);
170         lwi = LWI_TIMEOUT_INTR(timeout, NULL, NULL, NULL);
171         rc = l_wait_event(imp->imp_recovery_waitq, 
172                           (atomic_read(&imp->imp_inflight) == inflight), 
173                           &lwi);
174
175         if (rc)
176                 CERROR("%s: rc = %d waiting for callback (%d != %d)\n",
177                        imp->imp_target_uuid.uuid, rc,
178                        atomic_read(&imp->imp_inflight), inflight);
179
180         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
181 }
182
183 void ptlrpc_activate_import(struct obd_import *imp)
184 {
185         struct obd_device *obd = imp->imp_obd;
186         unsigned long flags;
187
188         spin_lock_irqsave(&imp->imp_lock, flags);
189         imp->imp_invalid = 0;
190         spin_unlock_irqrestore(&imp->imp_lock, flags);
191
192         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
193 }
194
195 void ptlrpc_fail_import(struct obd_import *imp, int generation)
196 {
197         ENTRY;
198
199         LASSERT (!imp->imp_dlm_fake);
200
201         if (ptlrpc_set_import_discon(imp)) {
202                 unsigned long flags;
203
204                 if (!imp->imp_replayable) {
205                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
206                                "auto-deactivating\n",
207                                imp->imp_target_uuid.uuid,
208                                imp->imp_connection->c_remote_uuid.uuid,
209                                imp->imp_obd->obd_name);
210                         ptlrpc_deactivate_import(imp);
211                 }
212
213                 CDEBUG(D_HA, "%s: waking up pinger\n",
214                        imp->imp_target_uuid.uuid);
215
216                 spin_lock_irqsave(&imp->imp_lock, flags);
217                 imp->imp_force_verify = 1;
218                 spin_unlock_irqrestore(&imp->imp_lock, flags);
219
220                 ptlrpc_pinger_wake_up();
221         }
222         EXIT;
223 }
224
225 int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
226 {
227         struct obd_device *obd = imp->imp_obd;
228         int initial_connect = 0;
229         int rc;
230         __u64 committed_before_reconnect = 0;
231         struct ptlrpc_request *request;
232         int size[] = {sizeof(imp->imp_target_uuid),
233                                  sizeof(obd->obd_uuid),
234                                  sizeof(imp->imp_dlm_handle)};
235         char *tmp[] = {imp->imp_target_uuid.uuid,
236                        obd->obd_uuid.uuid,
237                        (char *)&imp->imp_dlm_handle};
238         struct ptlrpc_connect_async_args *aa;
239         unsigned long flags;
240
241         spin_lock_irqsave(&imp->imp_lock, flags);
242         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
243                 spin_unlock_irqrestore(&imp->imp_lock, flags);
244                 CERROR("can't connect to a closed import\n");
245                 RETURN(-EINVAL);
246         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
247                 spin_unlock_irqrestore(&imp->imp_lock, flags);
248                 CERROR("already connected\n");
249                 RETURN(0);
250         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
251                 spin_unlock_irqrestore(&imp->imp_lock, flags);
252                 CERROR("already connecting\n");
253                 RETURN(-EALREADY);
254         }
255
256         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
257
258         imp->imp_resend_replay = 0;
259
260         if (imp->imp_remote_handle.cookie == 0) {
261                 initial_connect = 1;
262         } else {
263                 committed_before_reconnect = imp->imp_peer_committed_transno;;
264                 imp->imp_conn_cnt++;
265         }
266
267
268         spin_unlock_irqrestore(&imp->imp_lock, flags);
269
270         if (new_uuid) {
271                 struct ptlrpc_connection *conn;
272                 struct obd_uuid uuid;
273                 struct obd_export *dlmexp;
274
275                 obd_str2uuid(&uuid, new_uuid);
276
277                 conn = ptlrpc_uuid_to_connection(&uuid);
278                 if (!conn)
279                         GOTO(out, rc = -ENOENT);
280
281                 CDEBUG(D_HA, "switching import %s/%s from %s to %s\n",
282                        imp->imp_target_uuid.uuid, imp->imp_obd->obd_name,
283                        imp->imp_connection->c_remote_uuid.uuid,
284                        conn->c_remote_uuid.uuid);
285
286                 /* Switch the import's connection and the DLM export's
287                  * connection (which are almost certainly the same, but we
288                  * keep distinct refs just to make things clearer. I think. */
289                 if (imp->imp_connection)
290                         ptlrpc_put_connection(imp->imp_connection);
291                 /* We hand off the ref from ptlrpc_get_connection. */
292                 imp->imp_connection = conn;
293
294                 dlmexp = class_conn2export(&imp->imp_dlm_handle);
295
296                 LASSERT(dlmexp != NULL);
297
298                 if (dlmexp->exp_connection)
299                         ptlrpc_put_connection(dlmexp->exp_connection);
300                 dlmexp->exp_connection = ptlrpc_connection_addref(conn);
301                 class_export_put(dlmexp);
302
303         }
304
305         request = ptlrpc_prep_req(imp, imp->imp_connect_op, 3, size, tmp);
306         if (!request)
307                 GOTO(out, rc = -ENOMEM);
308
309 #ifndef __KERNEL__
310         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
311 #endif
312
313         request->rq_send_state = LUSTRE_IMP_CONNECTING;
314         request->rq_replen = lustre_msg_size(0, NULL);
315         request->rq_interpret_reply = ptlrpc_connect_interpret;
316
317         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
318         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
319         memset(aa, 0, sizeof *aa);
320
321         aa->pcaa_peer_committed = committed_before_reconnect;
322         aa->pcaa_initial_connect = initial_connect;
323
324         if (aa->pcaa_initial_connect) {
325                 lustre_msg_add_op_flags(request->rq_reqmsg, 
326                                         MSG_CONNECT_INITIAL);
327                 imp->imp_replayable = 1; 
328         }
329
330         ptlrpcd_add_req(request);
331         rc = 0;
332         imp->imp_connect_start = jiffies;
333 out:
334         if (rc != 0) {
335                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
336         }
337
338         RETURN(rc);
339 }
340
341 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
342                                     void * data, int rc)
343 {
344         struct ptlrpc_connect_async_args *aa = data;
345         struct obd_import *imp = request->rq_import;
346         struct lustre_handle old_hdl;
347         unsigned long flags;
348         int msg_flags;
349         ENTRY;
350
351         spin_lock_irqsave(&imp->imp_lock, flags);
352         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
353                 spin_unlock_irqrestore(&imp->imp_lock, flags);
354                 RETURN(0);
355         }
356         spin_unlock_irqrestore(&imp->imp_lock, flags);
357
358         if (rc)
359                 GOTO(out, rc);
360
361         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
362
363         if (aa->pcaa_initial_connect) {
364                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
365                         CDEBUG(D_HA, "connected to replayable target: %s\n",
366                                imp->imp_target_uuid.uuid);
367                         imp->imp_pingable = imp->imp_replayable = 1;
368                 } else {
369                         imp->imp_replayable = 0;
370                 }
371                 LASSERTF(imp->imp_conn_cnt < request->rq_repmsg->conn_cnt,
372                          "imp conn_cnt %d req conn_cnt %d", 
373                          imp->imp_conn_cnt, request->rq_repmsg->conn_cnt);
374                 imp->imp_conn_cnt = request->rq_repmsg->conn_cnt;
375                 imp->imp_remote_handle = request->rq_repmsg->handle;
376                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
377                 GOTO(finish, rc = 0);
378         }
379
380         /* Determine what recovery state to move the import to. */
381         if (MSG_CONNECT_RECONNECT & msg_flags) {
382                 memset(&old_hdl, 0, sizeof(old_hdl));
383                 if (!memcmp(&old_hdl, &request->rq_repmsg->handle,
384                             sizeof (old_hdl))) {
385                         CERROR("%s@%s didn't like our handle "LPX64
386                                ", failed\n", imp->imp_target_uuid.uuid,
387                                imp->imp_connection->c_remote_uuid.uuid,
388                                imp->imp_dlm_handle.cookie);
389                         GOTO(out, rc = -ENOTCONN);
390                 }
391
392                 if (memcmp(&imp->imp_remote_handle, &request->rq_repmsg->handle,
393                            sizeof(imp->imp_remote_handle))) {
394                         CERROR("%s@%s changed handle from "LPX64" to "LPX64
395                                "; copying, but this may foreshadow disaster\n",
396                                imp->imp_target_uuid.uuid,
397                                imp->imp_connection->c_remote_uuid.uuid,
398                                imp->imp_remote_handle.cookie,
399                                request->rq_repmsg->handle.cookie);
400                         imp->imp_remote_handle = request->rq_repmsg->handle;
401                 } else {
402                         CERROR("reconnected to %s@%s after partition\n",
403                                imp->imp_target_uuid.uuid,
404                                imp->imp_connection->c_remote_uuid.uuid);
405                 }
406
407                 if (imp->imp_invalid) {
408                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
409                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
410                         imp->imp_resend_replay = 1;
411                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
412                 } else {
413                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
414                 }
415         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
416                 LASSERT(imp->imp_replayable);
417                 imp->imp_remote_handle = request->rq_repmsg->handle;
418                 imp->imp_last_replay_transno = 0;
419                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
420         } else {
421                 CWARN("oops! we get evicted from %s\n", imp->imp_target_uuid.uuid);
422                 imp->imp_remote_handle = request->rq_repmsg->handle;
423                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
424         }
425
426         /* Sanity checks for a reconnected import. */
427         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
428                 CERROR("imp_replayable flag does not match server "
429                        "after reconnect. We should LBUG right here.\n");
430         }
431
432         if (request->rq_repmsg->last_committed < aa->pcaa_peer_committed) {
433                 CERROR("%s went back in time (transno "LPD64
434                        " was previously committed, server now claims "LPD64
435                        ")! is shared storage not coherent?\n",
436                        imp->imp_target_uuid.uuid,
437                        aa->pcaa_peer_committed,
438                        request->rq_repmsg->last_committed);
439         }
440
441 finish:
442         rc = ptlrpc_import_recovery_state_machine(imp);
443         if (rc != 0) {
444                 if (rc == -ENOTCONN) {
445                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
446                                "invalidating and reconnecting\n",
447                                imp->imp_target_uuid.uuid,
448                                imp->imp_connection->c_remote_uuid.uuid);
449                         ptlrpc_connect_import(imp, NULL);
450                         RETURN(0);
451                 }
452         }
453  out:
454         if (rc != 0) {
455                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
456                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov) {
457                         ptlrpc_deactivate_import(imp);
458                 }
459                 if (rc == -ETIMEDOUT && (jiffies - imp->imp_connect_start) > HZ) {
460                         CDEBUG(D_ERROR, "recovery of %s on %s failed (timeout)\n",
461                                imp->imp_target_uuid.uuid,
462                                (char *)imp->imp_connection->c_remote_uuid.uuid);
463                         ptlrpc_connect_import(imp, NULL);
464                         RETURN(0);
465                 }
466                 CDEBUG(D_ERROR, "recovery of %s on %s failed (%d)\n",
467                        imp->imp_target_uuid.uuid,
468                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
469         }
470
471         wake_up(&imp->imp_recovery_waitq);
472         RETURN(rc);
473 }
474
475 static int completed_replay_interpret(struct ptlrpc_request *req,
476                                     void * data, int rc)
477 {
478         atomic_dec(&req->rq_import->imp_replay_inflight);
479         ptlrpc_import_recovery_state_machine(req->rq_import);
480         RETURN(0);
481 }
482
483 static int signal_completed_replay(struct obd_import *imp)
484  {
485         struct ptlrpc_request *req;
486         ENTRY;
487
488         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
489         atomic_inc(&imp->imp_replay_inflight);
490
491         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
492         if (!req)
493                 RETURN(-ENOMEM);
494
495         req->rq_replen = lustre_msg_size(0, NULL);
496         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
497         req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
498         req->rq_timeout *= 3;
499         req->rq_interpret_reply = completed_replay_interpret;
500
501         ptlrpcd_add_req(req);
502         RETURN(0);
503 }
504
505 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
506 {
507         int rc = 0;
508         int inflight;
509
510         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
511                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
512                        imp->imp_target_uuid.uuid,
513                        imp->imp_connection->c_remote_uuid.uuid);
514
515                 ptlrpc_invalidate_import(imp, 1);
516
517                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
518         }
519
520         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
521                 CDEBUG(D_HA, "replay requested by %s\n",
522                        imp->imp_target_uuid.uuid);
523                 rc = ptlrpc_replay_next(imp, &inflight);
524                 if (inflight == 0 &&
525                     atomic_read(&imp->imp_replay_inflight) == 0) {
526                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
527                         rc = ldlm_replay_locks(imp);
528                         if (rc)
529                                 GOTO(out, rc);
530                 }
531                 rc = 0;
532         }
533
534         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
535                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
536                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
537                         rc = signal_completed_replay(imp);
538                         if (rc)
539                                 GOTO(out, rc);
540                 }
541
542         }
543
544         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
545                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
546                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
547                 }
548         }
549
550         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
551                 CDEBUG(D_HA, "reconnected to %s@%s\n",
552                        imp->imp_target_uuid.uuid,
553                        imp->imp_connection->c_remote_uuid.uuid);
554
555                 rc = ptlrpc_resend(imp);
556                 if (rc)
557                         GOTO(out, rc);
558                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
559                 ptlrpc_activate_import(imp);
560         }
561
562         if (imp->imp_state == LUSTRE_IMP_FULL) {
563                 wake_up(&imp->imp_recovery_waitq);
564                 ptlrpc_wake_delayed(imp);
565         }
566
567  out:
568         RETURN(rc);
569 }
570
571 static int back_to_sleep(void *unused)
572 {
573         return 0;
574 }
575
576 int ptlrpc_disconnect_import(struct obd_import *imp)
577 {
578         struct ptlrpc_request *request;
579         int rq_opc;
580         int rc = 0;
581         unsigned long flags;
582         ENTRY;
583
584         switch (imp->imp_connect_op) {
585         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
586         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
587         case MGMT_CONNECT:rq_opc = MGMT_DISCONNECT;break;
588         default:
589                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
590                        imp->imp_target_uuid.uuid, imp->imp_connect_op);
591                 RETURN(-EINVAL);
592         }
593
594
595         if (ptlrpc_import_in_recovery(imp)) {
596                 struct l_wait_info lwi;
597                 unsigned long timeout;
598                 if (imp->imp_server_timeout)
599                         timeout = obd_timeout / 2;
600                 else
601                         timeout = obd_timeout;
602                 timeout = MAX(timeout * HZ, 1);
603                 lwi = LWI_TIMEOUT_INTR(obd_timeout, back_to_sleep, NULL, NULL);
604                 rc = l_wait_event(imp->imp_recovery_waitq, 
605                                   !ptlrpc_import_in_recovery(imp), &lwi);
606
607         }
608
609         spin_lock_irqsave(&imp->imp_lock, flags);
610         if (imp->imp_state != LUSTRE_IMP_FULL) {
611                 GOTO(out, 0);
612         }
613         spin_unlock_irqrestore(&imp->imp_lock, flags);
614
615         request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
616         if (request) {
617                 /* For non-replayable connections, don't attempt
618                    reconnect if this fails */
619                 if (!imp->imp_replayable) {
620                         request->rq_no_resend = 1;
621                         IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
622                         request->rq_send_state =  LUSTRE_IMP_CONNECTING;
623                 }
624                 request->rq_replen = lustre_msg_size(0, NULL);
625                 rc = ptlrpc_queue_wait(request);
626                 ptlrpc_req_finished(request);
627         }
628
629         spin_lock_irqsave(&imp->imp_lock, flags);
630 out:
631         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
632         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
633         imp->imp_conn_cnt = 0;
634         spin_unlock_irqrestore(&imp->imp_lock, flags);
635
636         RETURN(rc);
637 }
638