Whamcloud - gitweb
b=15221
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 #define DEBUG_SUBSYSTEM S_RPC
27 #ifndef __KERNEL__
28 # include <liblustre.h>
29 #endif
30
31 #include <obd_support.h>
32 #include <lustre_ha.h>
33 #include <lustre_net.h>
34 #include <lustre_import.h>
35 #include <lustre_export.h>
36 #include <obd.h>
37 #include <obd_class.h>
38
39 #include "ptlrpc_internal.h"
40
/* Per-request arguments stored in rq_async_args by ptlrpc_connect_import()
 * and handed to ptlrpc_connect_interpret() when the connect reply arrives. */
struct ptlrpc_connect_async_args {
         /* imp_peer_committed_transno sampled before a reconnect was sent;
          * left at 0 for an initial connect */
         __u64 pcaa_peer_committed;
        /* non-zero when the import had no remote handle in use at the time
         * the connect request was built */
        int pcaa_initial_connect;
};
45
/*
 * Move @imp to @state and log the transition at D_HA.
 *
 * A CLOSED import should remain so: the assignment is skipped when the
 * import is already LUSTRE_IMP_CLOSED.
 *
 * The macro does no locking of its own.  Both arguments are expanded more
 * than once, so they must be free of side effects; they are parenthesized
 * so that arbitrary expressions can be passed safely.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while (0)
57
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK(): takes imp_lock around the
 * state change.  @imp is parenthesized so that an expression argument
 * expands correctly; it is evaluated more than once.
 */
#define IMPORT_SET_STATE(imp, state)            \
do {                                            \
        spin_lock(&(imp)->imp_lock);            \
        IMPORT_SET_STATE_NOLOCK(imp, state);    \
        spin_unlock(&(imp)->imp_lock);          \
} while (0)
64
65
/* Forward declarations: the connect reply handler is installed as
 * rq_interpret_reply by ptlrpc_connect_import() before its definition
 * appears further down in this file. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void * data, int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
69
70 /* Only this function is allowed to change the import state when it is
71  * CLOSED. I would rather refcount the import and free it after
72  * disconnection like we do with exports. To do that, the client_obd
73  * will need to save the peer info somewhere other than in the import,
74  * though. */
75 int ptlrpc_init_import(struct obd_import *imp)
76 {
77         spin_lock(&imp->imp_lock);
78
79         imp->imp_generation++;
80         imp->imp_state =  LUSTRE_IMP_NEW;
81
82         spin_unlock(&imp->imp_lock);
83
84         return 0;
85 }
86 EXPORT_SYMBOL(ptlrpc_init_import);
87
#define UUID_STR "_UUID"
/*
 * Locate the printable part of a UUID string without modifying it.
 *
 * @uuid       - NUL-terminated UUID string
 * @prefix     - optional leading string to skip; may be NULL
 * @uuid_start - out: @uuid itself, or @uuid advanced past @prefix when
 *               @uuid begins with @prefix
 * @uuid_len   - out: length of *uuid_start, not counting a trailing
 *               UUID_STR suffix if one is present
 *
 * The result is meant to be printed with "%.*s".
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const int suffix_len = strlen(UUID_STR);
        char *start = uuid;
        int len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start = uuid + prefix_len;
        }

        len = strlen(start);

        /* drop the suffix only when the string is long enough to hold it */
        if (len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = len;
}
104
105 /* Returns true if import was FULL, false if import was already not
106  * connected.
107  * @imp - import to be disconnected
108  * @conn_cnt - connection count (epoch) of the request that timed out
109  *             and caused the disconnection.  In some cases, multiple
110  *             inflight requests can fail to a single target (e.g. OST
111  *             bulk requests) and if one has already caused a reconnection
112  *             (increasing the import->conn_cnt) the older failure should
113  *             not also cause a reconnection.  If zero it forces a reconnect.
114  */
115 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
116 {
117         int rc = 0;
118
119         spin_lock(&imp->imp_lock);
120
121         if (imp->imp_state == LUSTRE_IMP_FULL &&
122             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
123                 char *target_start;
124                 int   target_len;
125
126                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
127                           &target_start, &target_len);
128                 if (imp->imp_replayable) {
129                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
130                                "%s was lost; in progress operations using this "
131                                "service will wait for recovery to complete.\n",
132                                imp->imp_obd->obd_name, target_len, target_start,
133                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
134                 } else {
135                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
136                                "%.*s via nid %s was lost; in progress "
137                                "operations using this service will fail.\n",
138                                imp->imp_obd->obd_name, target_len, target_start, 
139                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
140                 }
141                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
142                 spin_unlock(&imp->imp_lock);
143     
144                 if (obd_dump_on_timeout)
145                         libcfs_debug_dumplog();
146
147                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
148                 rc = 1;
149         } else {
150                 spin_unlock(&imp->imp_lock);
151                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
152                        imp->imp_client->cli_name, imp,
153                        (imp->imp_state == LUSTRE_IMP_FULL &&
154                         imp->imp_conn_cnt > conn_cnt) ?
155                        "reconnected" : "not connected", imp->imp_conn_cnt,
156                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
157         }
158
159         return rc;
160 }
161
162 /* Must be called with imp_lock held! */
163 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
164 {
165         ENTRY;
166         LASSERT_SPIN_LOCKED(&imp->imp_lock);
167
168         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
169         imp->imp_invalid = 1;
170         imp->imp_generation++;
171         spin_unlock(&imp->imp_lock);
172
173         ptlrpc_abort_inflight(imp);
174         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
175 }
176
177 /*
178  * This acts as a barrier; all existing requests are rejected, and
179  * no new requests will be accepted until the import is valid again.
180  */
181 void ptlrpc_deactivate_import(struct obd_import *imp)
182 {
183         spin_lock(&imp->imp_lock);
184         ptlrpc_deactivate_and_unlock_import(imp);
185 }
186
187 /*
188  * This function will invalidate the import, if necessary, then block
189  * for all the RPC completions, and finally notify the obd to
190  * invalidate its state (ie cancel locks, clear pending requests,
191  * etc).
192  */
193 void ptlrpc_invalidate_import(struct obd_import *imp)
194 {
195         struct list_head *tmp, *n;
196         struct ptlrpc_request *req;
197         struct l_wait_info lwi;
198         int rc;
199
200         atomic_inc(&imp->imp_inval_count);
201
202         if (!imp->imp_invalid)
203                 ptlrpc_deactivate_import(imp);
204
205         LASSERT(imp->imp_invalid);
206
207         /* wait for all requests to error out and call completion callbacks.
208            Cap it at obd_timeout -- these should all have been locally
209            cancelled by ptlrpc_abort_inflight. */
210         lwi = LWI_TIMEOUT_INTERVAL(
211                 cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
212                 cfs_time_seconds(1), NULL, NULL);
213         rc = l_wait_event(imp->imp_recovery_waitq,
214                           (atomic_read(&imp->imp_inflight) == 0), &lwi);
215
216         if (rc) {
217                 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
218                        obd2cli_tgt(imp->imp_obd), rc,
219                        atomic_read(&imp->imp_inflight));
220                 spin_lock(&imp->imp_lock);
221                 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
222                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
223                         DEBUG_REQ(D_ERROR, req, "still on sending list");
224                 }
225                 list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
226                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
227                         DEBUG_REQ(D_ERROR, req, "still on delayed list");
228                 }
229                 spin_unlock(&imp->imp_lock);
230         }
231
232         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
233
234         atomic_dec(&imp->imp_inval_count);
235         cfs_waitq_signal(&imp->imp_recovery_waitq);
236 }
237
238 /* unset imp_invalid */
239 void ptlrpc_activate_import(struct obd_import *imp)
240 {
241         struct obd_device *obd = imp->imp_obd;
242
243         spin_lock(&imp->imp_lock);
244         imp->imp_invalid = 0;
245         spin_unlock(&imp->imp_lock);
246
247         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
248 }
249
250 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
251 {
252         ENTRY;
253
254         LASSERT(!imp->imp_dlm_fake);
255
256         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
257                 if (!imp->imp_replayable) {
258                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
259                                "auto-deactivating\n",
260                                obd2cli_tgt(imp->imp_obd),
261                                imp->imp_connection->c_remote_uuid.uuid,
262                                imp->imp_obd->obd_name);
263                         ptlrpc_deactivate_import(imp);
264                 }
265
266                 CDEBUG(D_HA, "%s: waking up pinger\n",
267                        obd2cli_tgt(imp->imp_obd));
268
269                 spin_lock(&imp->imp_lock);
270                 imp->imp_force_verify = 1;
271                 spin_unlock(&imp->imp_lock);
272
273                 ptlrpc_pinger_wake_up();
274         }
275         EXIT;
276 }
277
278 int ptlrpc_reconnect_import(struct obd_import *imp)
279 {
280         
281         ptlrpc_set_import_discon(imp, 0); 
282         /* Force a new connect attempt */
283         ptlrpc_invalidate_import(imp);
284         /* Do a fresh connect next time by zeroing the handle */
285         ptlrpc_disconnect_import(imp, 1);
286         /* Wait for all invalidate calls to finish */
287         if (atomic_read(&imp->imp_inval_count) > 0) {
288                 int rc;
289                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
290                 rc = l_wait_event(imp->imp_recovery_waitq,
291                                   (atomic_read(&imp->imp_inval_count) == 0),
292                                   &lwi);
293                 if (rc)
294                         CERROR("Interrupted, inval=%d\n", 
295                                atomic_read(&imp->imp_inval_count));
296         }
297
298         /* 
299          * Allow reconnect attempts. Note: Currently, the function is
300          * only called by MGC. So assume this is a recoverable import,
301          * and force import to be recoverable. fix this if you need to 
302          */
303         
304         imp->imp_obd->obd_no_recov = 0;
305         /* Remove 'invalid' flag */
306         ptlrpc_activate_import(imp);
307         /* Attempt a new connect */
308         ptlrpc_recover_import(imp, NULL);
309         return 0;
310 }
311
312 EXPORT_SYMBOL(ptlrpc_reconnect_import);
313
314 static int import_select_connection(struct obd_import *imp)
315 {
316         struct obd_import_conn *imp_conn = NULL, *conn;
317         struct obd_export *dlmexp;
318         int tried_all = 1;
319         ENTRY;
320
321         spin_lock(&imp->imp_lock);
322
323         if (list_empty(&imp->imp_conn_list)) {
324                 CERROR("%s: no connections available\n",
325                         imp->imp_obd->obd_name);
326                 spin_unlock(&imp->imp_lock);
327                 RETURN(-EINVAL);
328         }
329
330         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
331                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
332                        imp->imp_obd->obd_name,
333                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
334                        conn->oic_last_attempt);
335                 
336                 /* Don't thrash connections */
337                 if (cfs_time_before_64(cfs_time_current_64(),
338                                      conn->oic_last_attempt + 
339                                      cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
340                         continue;
341                 }
342
343                 /* If we have not tried this connection since the
344                    the last successful attempt, go with this one */
345                 if ((conn->oic_last_attempt == 0) ||
346                     cfs_time_beforeq_64(conn->oic_last_attempt,
347                                        imp->imp_last_success_conn)) {
348                         imp_conn = conn;
349                         tried_all = 0;
350                         break;
351                 }
352
353                 /* If all of the connections have already been tried
354                    since the last successful connection; just choose the
355                    least recently used */
356                 if (!imp_conn)
357                         imp_conn = conn;
358                 else if (cfs_time_before_64(conn->oic_last_attempt,
359                                             imp_conn->oic_last_attempt))
360                         imp_conn = conn;
361         }
362
363         /* if not found, simply choose the current one */
364         if (!imp_conn) {
365                 LASSERT(imp->imp_conn_current);
366                 imp_conn = imp->imp_conn_current;
367                 tried_all = 0;
368         }
369         LASSERT(imp_conn->oic_conn);
370
371         /* If we've tried everything, and we're back to the beginning of the
372            list, increase our timeout and try again. It will be reset when
373            we do finally connect. (FIXME: really we should wait for all network
374            state associated with the last connection attempt to drain before
375            trying to reconnect on it.) */
376         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
377             !imp->imp_recon_bk /* not retrying */) {
378                 if (at_get(&imp->imp_at.iat_net_latency) <
379                     CONNECTION_SWITCH_MAX) {
380                         at_add(&imp->imp_at.iat_net_latency,
381                                at_get(&imp->imp_at.iat_net_latency) +
382                                CONNECTION_SWITCH_INC);
383                 }
384                 LASSERT(imp_conn->oic_last_attempt);
385                 CWARN("%s: tried all connections, increasing latency to %ds\n",
386                       imp->imp_obd->obd_name,
387                       at_get(&imp->imp_at.iat_net_latency));
388         }
389
390         imp_conn->oic_last_attempt = cfs_time_current_64();
391
392         /* switch connection, don't mind if it's same as the current one */
393         if (imp->imp_connection)
394                 ptlrpc_put_connection(imp->imp_connection);
395         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
396
397         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
398         LASSERT(dlmexp != NULL);
399         if (dlmexp->exp_connection)
400                 ptlrpc_put_connection(dlmexp->exp_connection);
401         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
402         class_export_put(dlmexp);
403
404         if (imp->imp_conn_current != imp_conn) {
405                 if (imp->imp_conn_current)
406                         LCONSOLE_INFO("Changing connection for %s to %s/%s\n",
407                                       imp->imp_obd->obd_name,
408                                       imp_conn->oic_uuid.uuid,
409                                       libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
410                 imp->imp_conn_current = imp_conn;
411         }
412
413         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
414                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
415                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
416
417         spin_unlock(&imp->imp_lock);
418
419         RETURN(0);
420 }
421
422 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
423 {
424         struct obd_device *obd = imp->imp_obd;
425         int initial_connect = 0;
426         int rc;
427         __u64 committed_before_reconnect = 0;
428         struct ptlrpc_request *request;
429         int size[] = { sizeof(struct ptlrpc_body),
430                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
431                        sizeof(obd->obd_uuid),
432                        sizeof(imp->imp_dlm_handle),
433                        sizeof(imp->imp_connect_data) };
434         char *tmp[] = { NULL,
435                         obd2cli_tgt(imp->imp_obd),
436                         obd->obd_uuid.uuid,
437                         (char *)&imp->imp_dlm_handle,
438                         (char *)&imp->imp_connect_data };
439         struct ptlrpc_connect_async_args *aa;
440
441         ENTRY;
442         spin_lock(&imp->imp_lock);
443         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
444                 spin_unlock(&imp->imp_lock);
445                 CERROR("can't connect to a closed import\n");
446                 RETURN(-EINVAL);
447         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
448                 spin_unlock(&imp->imp_lock);
449                 CERROR("already connected\n");
450                 RETURN(0);
451         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
452                 spin_unlock(&imp->imp_lock);
453                 CERROR("already connecting\n");
454                 RETURN(-EALREADY);
455         }
456
457         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
458
459         imp->imp_conn_cnt++;
460         imp->imp_resend_replay = 0;
461
462         if (!lustre_handle_is_used(&imp->imp_remote_handle))
463                 initial_connect = 1;
464         else
465                 committed_before_reconnect = imp->imp_peer_committed_transno;
466
467         spin_unlock(&imp->imp_lock);
468
469         if (new_uuid) {
470                 struct obd_uuid uuid;
471
472                 obd_str2uuid(&uuid, new_uuid);
473                 rc = import_set_conn_priority(imp, &uuid);
474                 if (rc)
475                         GOTO(out, rc);
476         }
477
478         rc = import_select_connection(imp);
479         if (rc)
480                 GOTO(out, rc);
481
482         /* last in connection list */
483         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
484                 if (imp->imp_initial_recov_bk && initial_connect) {
485                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
486                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
487                         /* Don't retry if connect fails */
488                         rc = 0;
489                         obd_set_info_async(obd->obd_self_export,
490                                            strlen(KEY_INIT_RECOV),
491                                            KEY_INIT_RECOV,
492                                            sizeof(rc), &rc, NULL);
493                 }
494                 if (imp->imp_recon_bk) {
495                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
496                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
497                         spin_lock(&imp->imp_lock);
498                         imp->imp_last_recon = 1;
499                         spin_unlock(&imp->imp_lock);
500                 }
501         }
502
503         /* Reset connect flags to the originally requested flags, in case
504          * the server is updated on-the-fly we will get the new features. */
505         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
506         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
507                            &obd->obd_uuid, &imp->imp_connect_data);
508         if (rc)
509                 GOTO(out, rc);
510
511         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
512                                   5, size, tmp);
513         if (!request)
514                 GOTO(out, rc = -ENOMEM);
515
516 #ifndef __KERNEL__
517         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
518 #endif
519         if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
520                 lustre_msg_add_op_flags(request->rq_reqmsg,
521                                         MSG_CONNECT_NEXT_VER);
522
523         request->rq_send_state = LUSTRE_IMP_CONNECTING;
524         /* Allow a slightly larger reply for future growth compatibility */
525         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
526                               16 * sizeof(__u64);
527         ptlrpc_req_set_repsize(request, 2, size);
528         request->rq_interpret_reply = ptlrpc_connect_interpret;
529
530         CLASSERT(sizeof (*aa) <= sizeof (request->rq_async_args));
531         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
532         memset(aa, 0, sizeof *aa);
533
534         aa->pcaa_peer_committed = committed_before_reconnect;
535         aa->pcaa_initial_connect = initial_connect;
536         if (aa->pcaa_initial_connect) {
537                 spin_lock(&imp->imp_lock);
538                 imp->imp_replayable = 1;
539                 spin_unlock(&imp->imp_lock);
540                 if (AT_OFF)
541                         /* AT will use INITIAL_CONNECT_TIMEOUT the first
542                            time, adaptive after that. */
543                         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
544         }
545
546         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
547                   aa->pcaa_initial_connect ? "initial " : "re", 
548                   imp->imp_conn_cnt);
549         ptlrpcd_add_req(request);
550         rc = 0;
551 out:
552         if (rc != 0) {
553                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
554         }
555
556         RETURN(rc);
557 }
558 EXPORT_SYMBOL(ptlrpc_connect_import);
559
560 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
561 {
562 #ifdef __KERNEL__
563         struct obd_import_conn *imp_conn;
564 #endif
565         int wake_pinger = 0;
566
567         ENTRY;
568
569         spin_lock(&imp->imp_lock);
570         if (list_empty(&imp->imp_conn_list))
571                 GOTO(unlock, 0);
572
573 #ifdef __KERNEL__
574         imp_conn = list_entry(imp->imp_conn_list.prev,
575                               struct obd_import_conn,
576                               oic_item);
577
578         /* XXX: When the failover node is the primary node, it is possible
579          * to have two identical connections in imp_conn_list. We must 
580          * compare not conn's pointers but NIDs, otherwise we can defeat
581          * connection throttling. (See bug 14774.) */
582         if (imp->imp_conn_current->oic_conn->c_self != 
583                                 imp_conn->oic_conn->c_self) {
584                 ptlrpc_ping_import_soon(imp);
585                 wake_pinger = 1;
586         }
587
588 #else
589         /* liblustre has no pinger thead, so we wakup pinger anyway */
590         wake_pinger = 1;
591 #endif 
592  unlock:
593         spin_unlock(&imp->imp_lock);
594
595         if (wake_pinger)
596                 ptlrpc_pinger_wake_up();
597
598         EXIT;
599 }
600
601 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
602                                     void * data, int rc)
603 {
604         struct ptlrpc_connect_async_args *aa = data;
605         struct obd_import *imp = request->rq_import;
606         struct client_obd *cli = &imp->imp_obd->u.cli;
607         struct lustre_handle old_hdl;
608         int msg_flags;
609         ENTRY;
610
611         spin_lock(&imp->imp_lock);
612         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
613                 spin_unlock(&imp->imp_lock);
614                 RETURN(0);
615         }
616         spin_unlock(&imp->imp_lock);
617
618         if (rc)
619                 GOTO(out, rc);
620
621         LASSERT(imp->imp_conn_current);
622
623         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
624
625         /* All imports are pingable */
626         spin_lock(&imp->imp_lock);
627         imp->imp_pingable = 1;
628
629         if (aa->pcaa_initial_connect) {
630                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
631                         imp->imp_replayable = 1;
632                         spin_unlock(&imp->imp_lock);
633                         CDEBUG(D_HA, "connected to replayable target: %s\n",
634                                obd2cli_tgt(imp->imp_obd));
635                 } else {
636                         imp->imp_replayable = 0;
637                         spin_unlock(&imp->imp_lock);
638                 }
639
640                 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
641                      msg_flags & MSG_CONNECT_NEXT_VER) ||
642                     request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
643                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
644                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
645                                obd2cli_tgt(imp->imp_obd));
646                 } else {
647                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
648                                obd2cli_tgt(imp->imp_obd));
649                 }
650
651                 imp->imp_remote_handle =
652                                 *lustre_msg_get_handle(request->rq_repmsg);
653
654                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
655                 ptlrpc_activate_import(imp);
656                 GOTO(finish, rc = 0);
657         } else {
658                 spin_unlock(&imp->imp_lock);
659         }
660
661         /* Determine what recovery state to move the import to. */
662         if (MSG_CONNECT_RECONNECT & msg_flags) {
663                 memset(&old_hdl, 0, sizeof(old_hdl));
664                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
665                             sizeof (old_hdl))) {
666                         CERROR("%s@%s didn't like our handle "LPX64
667                                ", failed\n", obd2cli_tgt(imp->imp_obd),
668                                imp->imp_connection->c_remote_uuid.uuid,
669                                imp->imp_dlm_handle.cookie);
670                         GOTO(out, rc = -ENOTCONN);
671                 }
672
673                 if (memcmp(&imp->imp_remote_handle,
674                            lustre_msg_get_handle(request->rq_repmsg),
675                            sizeof(imp->imp_remote_handle))) {
676
677                         CWARN("%s@%s changed server handle from "
678                                LPX64" to "LPX64" - evicting.\n",
679                                obd2cli_tgt(imp->imp_obd),
680                                imp->imp_connection->c_remote_uuid.uuid,
681                                imp->imp_remote_handle.cookie,
682                                lustre_msg_get_handle(request->rq_repmsg)->
683                                          cookie);
684                         imp->imp_remote_handle =
685                                      *lustre_msg_get_handle(request->rq_repmsg);
686
687                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
688                         GOTO(finish, rc = 0);
689                 } else {
690                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
691                                obd2cli_tgt(imp->imp_obd),
692                                imp->imp_connection->c_remote_uuid.uuid);
693                 }
694
695                 if (imp->imp_invalid) {
696                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
697                                "marking evicted\n", imp->imp_obd->obd_name);
698                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
699                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
700                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
701                                imp->imp_obd->obd_name,
702                                obd2cli_tgt(imp->imp_obd));
703
704                         spin_lock(&imp->imp_lock);
705                         imp->imp_resend_replay = 1;
706                         spin_unlock(&imp->imp_lock);
707
708                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
709                 } else {
710                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
711                 }
712         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
713                 LASSERT(imp->imp_replayable);
714                 imp->imp_remote_handle =
715                                 *lustre_msg_get_handle(request->rq_repmsg);
716                 imp->imp_last_replay_transno = 0;
717                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
718         } else {
719                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
720                           "flags reconnect/recovering not set: %x)",msg_flags);
721                 imp->imp_remote_handle =
722                                 *lustre_msg_get_handle(request->rq_repmsg);
723                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
724         }
725
726         /* Sanity checks for a reconnected import. */
727         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
728                 CERROR("imp_replayable flag does not match server "
729                        "after reconnect. We should LBUG right here.\n");
730         }
731
732         if (lustre_msg_get_last_committed(request->rq_repmsg) <
733             aa->pcaa_peer_committed) {
734                 CERROR("%s went back in time (transno "LPD64
735                        " was previously committed, server now claims "LPD64
736                        ")!  See https://bugzilla.clusterfs.com/"
737                        "long_list.cgi?buglist=9646\n",
738                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
739                        lustre_msg_get_last_committed(request->rq_repmsg));
740         }
741
742 finish:
743         rc = ptlrpc_import_recovery_state_machine(imp);
744         if (rc != 0) {
745                 if (rc == -ENOTCONN) {
746                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
747                                "invalidating and reconnecting\n",
748                                obd2cli_tgt(imp->imp_obd),
749                                imp->imp_connection->c_remote_uuid.uuid);
750                         ptlrpc_connect_import(imp, NULL);
751                         RETURN(0);
752                 }
753         } else {
754                 struct obd_connect_data *ocd;
755                 struct obd_export *exp;
756
757                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
758                                          lustre_swab_connect);
759                 spin_lock(&imp->imp_lock);
760                 list_del(&imp->imp_conn_current->oic_item);
761                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
762                 imp->imp_last_success_conn =
763                         imp->imp_conn_current->oic_last_attempt;
764
765                 if (ocd == NULL) {
766                         spin_unlock(&imp->imp_lock);
767                         CERROR("Wrong connect data from server\n");
768                         rc = -EPROTO;
769                         GOTO(out, rc);
770                 }
771
772                 imp->imp_connect_data = *ocd;
773
774                 exp = class_conn2export(&imp->imp_dlm_handle);
775                 spin_unlock(&imp->imp_lock);
776
777                 /* check that server granted subset of flags we asked for. */
778                 LASSERTF((ocd->ocd_connect_flags &
779                           imp->imp_connect_flags_orig) ==
780                          ocd->ocd_connect_flags, LPX64" != "LPX64,
781                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
782
783                 if (!exp) {
784                         /* This could happen if export is cleaned during the 
785                            connect attempt */
786                         CERROR("Missing export for %s\n", 
787                                imp->imp_obd->obd_name);
788                         GOTO(out, rc = -ENODEV);
789                 }
790                 exp->exp_connect_flags = ocd->ocd_connect_flags;
791                 imp->imp_obd->obd_self_export->exp_connect_flags = ocd->ocd_connect_flags;
792                 class_export_put(exp);
793
794                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
795
796                 if (!ocd->ocd_ibits_known &&
797                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
798                         CERROR("Inodebits aware server returned zero compatible"
799                                " bits?\n");
800
801                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
802                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
803                                         LUSTRE_VERSION_OFFSET_WARN ||
804                      ocd->ocd_version < LUSTRE_VERSION_CODE -
805                                         LUSTRE_VERSION_OFFSET_WARN)) {
806                         /* Sigh, some compilers do not like #ifdef in the middle
807                            of macro arguments */
808 #ifdef __KERNEL__
809                         const char *older =
810                                 "older.  Consider upgrading this client";
811 #else
812                         const char *older =
813                                 "older.  Consider recompiling this application";
814 #endif
815                         const char *newer = "newer than client version";
816
817                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
818                                       "is much %s (%s)\n",
819                                       obd2cli_tgt(imp->imp_obd),
820                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
821                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
822                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
823                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
824                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
825                                       newer : older, LUSTRE_VERSION_STRING);
826                 }
827
828                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
829                         /* We sent to the server ocd_cksum_types with bits set
830                          * for algorithms we understand. The server masked off
831                          * the checksum types it doesn't support */
832                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
833                                 LCONSOLE_WARN("The negotiation of the checksum "
834                                               "alogrithm to use with server %s "
835                                               "failed (%x/%x), disabling "
836                                               "checksums\n",
837                                               obd2cli_tgt(imp->imp_obd),
838                                               ocd->ocd_cksum_types,
839                                               OBD_CKSUM_ALL);
840                                 cli->cl_checksum = 0;
841                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
842                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
843                         } else {
844                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
845
846                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
847                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
848                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
849                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
850                                 else
851                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
852                         }
853                 } else {
854                         /* The server does not support OBD_CONNECT_CKSUM.
855                          * Enforce CRC32 for backward compatibility*/
856                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
857                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
858                 }
859
860                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
861                         cli->cl_max_pages_per_rpc = 
862                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
863                 }
864
865                 imp->imp_obd->obd_namespace->ns_connect_flags = 
866                         ocd->ocd_connect_flags;
867                 imp->imp_obd->obd_namespace->ns_orig_connect_flags = 
868                         ocd->ocd_connect_flags;
869
870                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
871                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
872                         /* We need a per-message support flag, because 
873                            a. we don't know if the incoming connect reply
874                               supports AT or not (in reply_in_callback)
875                               until we unpack it.
876                            b. failovered server means export and flags are gone
877                               (in ptlrpc_send_reply).
878                            Can only be set when we know AT is supported at 
879                            both ends */
880                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
881                 else
882                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
883
884                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
885                         (cli->cl_max_pages_per_rpc > 0));
886         }
887
888  out:
889         if (rc != 0) {
890                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
891                 spin_lock(&imp->imp_lock);
892                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
893                     (request->rq_import_generation == imp->imp_generation))
894                         ptlrpc_deactivate_and_unlock_import(imp);
895                 else
896                         spin_unlock(&imp->imp_lock);
897
898                 if (imp->imp_recon_bk && imp->imp_last_recon) {
899                         /* Give up trying to reconnect */
900                         imp->imp_obd->obd_no_recov = 1;
901                         ptlrpc_deactivate_import(imp);
902                 }
903
904                 if (rc == -EPROTO) {
905                         struct obd_connect_data *ocd;
906                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
907                                                  sizeof *ocd,
908                                                  lustre_swab_connect);
909                         if (ocd &&
910                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
911                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
912                            /* Actually servers are only supposed to refuse
913                               connection from liblustre clients, so we should
914                               never see this from VFS context */
915                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
916                                         "(%d.%d.%d.%d)"
917                                         " refused connection from this client "
918                                         "with an incompatible version (%s).  "
919                                         "Client must be recompiled\n",
920                                         obd2cli_tgt(imp->imp_obd),
921                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
922                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
923                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
924                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
925                                         LUSTRE_VERSION_STRING);
926                                 ptlrpc_deactivate_import(imp);
927                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
928                         }
929                         RETURN(-EPROTO);
930                 }
931
932                 ptlrpc_maybe_ping_import_soon(imp);
933
934                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
935                        obd2cli_tgt(imp->imp_obd),
936                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
937         }
938         
939         spin_lock(&imp->imp_lock);
940         imp->imp_last_recon = 0;
941         spin_unlock(&imp->imp_lock);
942
943         cfs_waitq_signal(&imp->imp_recovery_waitq);
944         RETURN(rc);
945 }
946
947 static int completed_replay_interpret(struct ptlrpc_request *req,
948                                     void * data, int rc)
949 {
950         ENTRY;
951         atomic_dec(&req->rq_import->imp_replay_inflight);
952         if (req->rq_status == 0) {
953                 ptlrpc_import_recovery_state_machine(req->rq_import);
954         } else {
955                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
956                        "reconnecting\n",
957                        req->rq_import->imp_obd->obd_name, req->rq_status);
958                 ptlrpc_connect_import(req->rq_import, NULL);
959         }
960
961         RETURN(0);
962 }
963
964 static int signal_completed_replay(struct obd_import *imp)
965 {
966         struct ptlrpc_request *req;
967         ENTRY;
968
969         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
970         atomic_inc(&imp->imp_replay_inflight);
971
972         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
973         if (!req) {
974                 atomic_dec(&imp->imp_replay_inflight);
975                 RETURN(-ENOMEM);
976         }
977
978         ptlrpc_req_set_repsize(req, 1, NULL);
979         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
980         lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY);
981         req->rq_timeout *= 3;
982         req->rq_interpret_reply = completed_replay_interpret;
983
984         ptlrpcd_add_req(req);
985         RETURN(0);
986 }
987
#ifdef __KERNEL__
/*
 * Body of the "ll_imp_inval" kernel thread started by
 * ptlrpc_import_recovery_state_machine() for an evicted import.
 *
 * Invalidates the import, optionally dumps the debug log (when
 * obd_dump_on_eviction is set), moves the import to RECOVER and re-enters
 * the recovery state machine.
 *
 * \param data  the struct obd_import being invalidated
 *
 * \retval 0 always
 */
static int ptlrpc_invalidate_import_thread(void *data)
{
        struct obd_import *imp = data;
        ENTRY;

        ptlrpc_daemonize("ll_imp_inval");

        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               imp->imp_obd->obd_name,
               obd2cli_tgt(imp->imp_obd),
               imp->imp_connection->c_remote_uuid.uuid);

        ptlrpc_invalidate_import(imp);

        if (obd_dump_on_eviction != 0) {
                CERROR("dump the log upon eviction\n");
                libcfs_debug_dumplog();
        }

        /* Invalidation is done; let the state machine finish recovery. */
        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
        ptlrpc_import_recovery_state_machine(imp);

        RETURN(0);
}
#endif
1014
1015 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1016 {
1017         int rc = 0;
1018         int inflight;
1019         char *target_start;
1020         int target_len;
1021
1022         ENTRY;
1023         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1024                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1025                           &target_start, &target_len);
1026                 /* Don't care about MGC eviction */
1027                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1028                            LUSTRE_MGC_NAME) != 0) {
1029                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1030                                            "%.*s; in progress operations using "
1031                                            "this service will fail.\n",
1032                                            target_len, target_start);
1033                 }
1034                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1035                        obd2cli_tgt(imp->imp_obd),
1036                        imp->imp_connection->c_remote_uuid.uuid);
1037
1038 #ifdef __KERNEL__
1039                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1040                                    CLONE_VM | CLONE_FILES);
1041                 if (rc < 0)
1042                         CERROR("error starting invalidate thread: %d\n", rc);
1043                 else
1044                         rc = 0;
1045                 RETURN(rc);
1046 #else
1047                 ptlrpc_invalidate_import(imp);
1048
1049                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1050 #endif
1051         }
1052
1053         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1054                 CDEBUG(D_HA, "replay requested by %s\n",
1055                        obd2cli_tgt(imp->imp_obd));
1056                 rc = ptlrpc_replay_next(imp, &inflight);
1057                 if (inflight == 0 &&
1058                     atomic_read(&imp->imp_replay_inflight) == 0) {
1059                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1060                         rc = ldlm_replay_locks(imp);
1061                         if (rc)
1062                                 GOTO(out, rc);
1063                 }
1064                 rc = 0;
1065         }
1066
1067         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1068                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1069                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1070                         rc = signal_completed_replay(imp);
1071                         if (rc)
1072                                 GOTO(out, rc);
1073                 }
1074
1075         }
1076
1077         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1078                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1079                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1080                 }
1081         }
1082
1083         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1084                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1085                        obd2cli_tgt(imp->imp_obd),
1086                        imp->imp_connection->c_remote_uuid.uuid);
1087
1088                 rc = ptlrpc_resend(imp);
1089                 if (rc)
1090                         GOTO(out, rc);
1091                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1092                 ptlrpc_activate_import(imp);
1093
1094                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1095                           &target_start, &target_len);
1096                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1097                               "using nid %s.\n", imp->imp_obd->obd_name,
1098                               target_len, target_start,
1099                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1100         }
1101
1102         if (imp->imp_state == LUSTRE_IMP_FULL) {
1103                 cfs_waitq_signal(&imp->imp_recovery_waitq);
1104                 ptlrpc_wake_delayed(imp);
1105         }
1106
1107  out:
1108         RETURN(rc);
1109 }
1110
/*
 * Timeout callback passed to LWI_TIMEOUT_INTR() by
 * ptlrpc_disconnect_import().  Ignores its argument and always returns 0.
 * NOTE(review): presumably a 0 return tells the waiter to keep sleeping, as
 * the name suggests - confirm against l_wait_event().
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
1115
1116 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1117 {
1118         struct ptlrpc_request *req;
1119         int rq_opc, rc = 0;
1120         int nowait = imp->imp_obd->obd_force;
1121         ENTRY;
1122
1123         if (nowait)
1124                 GOTO(set_state, rc);
1125
1126         switch (imp->imp_connect_op) {
1127         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1128         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1129         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1130         default:
1131                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1132                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1133                 RETURN(-EINVAL);
1134         }
1135
1136         if (ptlrpc_import_in_recovery(imp)) {
1137                 struct l_wait_info lwi;
1138                 cfs_duration_t timeout;
1139
1140                 if (AT_OFF) {
1141                         timeout = cfs_time_seconds(obd_timeout);
1142                 } else {
1143                         int idx = import_at_get_index(imp, 
1144                                 imp->imp_client->cli_request_portal);
1145                         timeout = cfs_time_seconds(
1146                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1147                 }
1148                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout), 
1149                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1150                 rc = l_wait_event(imp->imp_recovery_waitq,
1151                                   !ptlrpc_import_in_recovery(imp), &lwi);
1152         }
1153
1154         spin_lock(&imp->imp_lock);
1155         if (imp->imp_state != LUSTRE_IMP_FULL)
1156                 GOTO(out, 0);
1157
1158         spin_unlock(&imp->imp_lock);
1159
1160         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1161         if (req) {
1162                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1163                  * it fails.  We can get through the above with a down server
1164                  * if the client doesn't know the server is gone yet. */
1165                 req->rq_no_resend = 1;
1166                 
1167 #ifndef CRAY_XT3
1168                 /* We want client umounts to happen quickly, no matter the 
1169                    server state... */
1170                 req->rq_timeout = min_t(int, req->rq_timeout,
1171                                         INITIAL_CONNECT_TIMEOUT);
1172 #else
1173                 /* ... but we always want liblustre clients to nicely 
1174                    disconnect, so only use the adaptive value. */
1175                 if (AT_OFF)
1176                         req->rq_timeout = obd_timeout / 3;
1177 #endif
1178
1179                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1180                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1181                 ptlrpc_req_set_repsize(req, 1, NULL);
1182                 rc = ptlrpc_queue_wait(req);
1183                 ptlrpc_req_finished(req);
1184         }
1185
1186 set_state:
1187         spin_lock(&imp->imp_lock);
1188 out:
1189         if (noclose) 
1190                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1191         else
1192                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1193         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1194         /* Try all connections in the future - bz 12758 */ 
1195         imp->imp_last_recon = 0;
1196         spin_unlock(&imp->imp_lock);
1197
1198         RETURN(rc);
1199 }
1200
1201 /* Sets maximal number of RPCs possible originating from other side of this
1202    import (server) to us and number of async RPC replies that we are not waiting
1203    for arriving */
1204 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1205 {
1206         LNetSetAsync(imp->imp_connection->c_peer, count);
1207 }
1208
1209
1210 /* Adaptive Timeout utils */
1211 extern unsigned int at_min, at_max, at_history;
1212
1213 /* Bin into timeslices using AT_BINS bins.
1214    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1215    but still smoothing out a return to normalcy from a slow response.
1216    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1217 int at_add(struct adaptive_timeout *at, unsigned int val) 
1218 {
1219         unsigned int old = at->at_current;
1220         time_t now = cfs_time_current_sec();
1221         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1222
1223         LASSERT(at);
1224 #if 0
1225         CDEBUG(D_INFO, "add %u to %p time=%lu v=%u (%u %u %u %u)\n", 
1226                val, at, now - at->at_binstart, at->at_current,
1227                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1228 #endif
1229         if (val == 0) 
1230                 /* 0's don't count, because we never want our timeout to 
1231                    drop to 0, and because 0 could mean an error */
1232                 return 0;
1233
1234         spin_lock(&at->at_lock);
1235
1236         if (unlikely(at->at_binstart == 0)) {
1237                 /* Special case to remove default from history */
1238                 at->at_current = val;
1239                 at->at_worst_ever = val;
1240                 at->at_worst_time = now;
1241                 at->at_hist[0] = val;
1242                 at->at_binstart = now;
1243         } else if (now - at->at_binstart < binlimit ) {
1244                 /* in bin 0 */
1245                 at->at_hist[0] = max(val, at->at_hist[0]);
1246                 at->at_current = max(val, at->at_current);
1247         } else {
1248                 int i, shift;
1249                 unsigned int maxv = val;
1250                 /* move bins over */
1251                 shift = (now - at->at_binstart) / binlimit;
1252                 LASSERT(shift > 0);
1253                 for(i = AT_BINS - 1; i >= 0; i--) {
1254                         if (i >= shift) {
1255                                 at->at_hist[i] = at->at_hist[i - shift];
1256                                 maxv = max(maxv, at->at_hist[i]);
1257                         } else {
1258                                 at->at_hist[i] = 0;
1259                         }
1260                 }
1261                 at->at_hist[0] = val;
1262                 at->at_current = maxv;
1263                 at->at_binstart += shift * binlimit;
1264         }
1265
1266         if (at->at_current > at->at_worst_ever) {
1267                 at->at_worst_ever = at->at_current;
1268                 at->at_worst_time = now;
1269         }
1270
1271         if (at->at_flags & AT_FLG_NOHIST)
1272                 /* Only keep last reported val; keeping the rest of the history
1273                    for proc only */
1274                 at->at_current = val;
1275
1276         if (at_max > 0)
1277                 at->at_current =  min(at->at_current, at_max);
1278         at->at_current =  max(at->at_current, at_min);
1279
1280 #if 0
1281         if (at->at_current != old)
1282                 CDEBUG(D_ADAPTTO, "AT %p change: old=%u new=%u delta=%d "
1283                        "(val=%u) hist %u %u %u %u\n", at,
1284                        old, at->at_current, at->at_current - old, val,
1285                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1286                        at->at_hist[3]);
1287 #endif
1288         
1289         /* if we changed, report the old value */
1290         old = (at->at_current != old) ? old : 0;
1291         
1292         spin_unlock(&at->at_lock);
1293         return old;
1294 }
1295
1296 /* Find the imp_at index for a given portal; assign if space available */
1297 int import_at_get_index(struct obd_import *imp, int portal) 
1298 {
1299         struct imp_at *at = &imp->imp_at;
1300         int i;
1301
1302         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1303                 if (at->iat_portal[i] == portal) 
1304                         return i;
1305                 if (at->iat_portal[i] == 0)
1306                         /* unused */
1307                         break;
1308         }
1309
1310         /* Not found in list, add it under a lock */
1311         spin_lock(&imp->imp_lock);
1312
1313         /* Check unused under lock */
1314         for (; i < IMP_AT_MAX_PORTALS; i++) {
1315                 if (at->iat_portal[i] == portal) 
1316                         goto out;
1317                 if (at->iat_portal[i] == 0)
1318                         /* unused */
1319                         break;
1320         }
1321         
1322         /* Not enough portals? */
1323         LASSERT(i < IMP_AT_MAX_PORTALS);
1324
1325         at->iat_portal[i] = portal;
1326 out:
1327         spin_unlock(&imp->imp_lock);
1328         return i;
1329 }
1330