Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 #define DEBUG_SUBSYSTEM S_RPC
27 #ifndef __KERNEL__
28 # include <liblustre.h>
29 #endif
30
31 #include <obd_support.h>
32 #include <lustre_ha.h>
33 #include <lustre_net.h>
34 #include <lustre_import.h>
35 #include <lustre_export.h>
36 #include <obd.h>
37 #include <obd_class.h>
38
39 #include "ptlrpc_internal.h"
40
41 struct ptlrpc_connect_async_args {
42          __u64 pcaa_peer_committed;
43         int pcaa_initial_connect;
44 };
45
/*
 * Move @imp to @state without taking imp_lock (see IMPORT_SET_STATE for
 * the locking wrapper).  A CLOSED import should remain so: the transition
 * is skipped when the import is already LUSTRE_IMP_CLOSED.
 *
 * Every use of an argument is parenthesized so that an arbitrary
 * expression can be passed.  The arguments are still evaluated more than
 * once, so do not pass expressions with side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while(0)
57
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp_lock around the
 * state change.  Arguments are parenthesized on every use (including when
 * forwarded) so that any expression can be passed; they are evaluated
 * more than once, so avoid side effects.
 */
#define IMPORT_SET_STATE(imp, state)                    \
do {                                                    \
        spin_lock(&(imp)->imp_lock);                    \
        IMPORT_SET_STATE_NOLOCK((imp), (state));        \
        spin_unlock(&(imp)->imp_lock);                  \
} while(0)
64
65
/* Prototypes for routines called further down: the connect reply callback
 * installed by ptlrpc_connect_import(), and the recovery state machine
 * that the callback invokes. */
66 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
67                                     void * data, int rc);
68 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
69
70 /* Only this function is allowed to change the import state when it is
71  * CLOSED. I would rather refcount the import and free it after
72  * disconnection like we do with exports. To do that, the client_obd
73  * will need to save the peer info somewhere other than in the import,
74  * though. */
75 int ptlrpc_init_import(struct obd_import *imp)
76 {
77         spin_lock(&imp->imp_lock);
78
79         imp->imp_generation++;
80         imp->imp_state =  LUSTRE_IMP_NEW;
81
82         spin_unlock(&imp->imp_lock);
83
84         return 0;
85 }
86 EXPORT_SYMBOL(ptlrpc_init_import);
87
#define UUID_STR "_UUID"
/*
 * Find the human-readable part of a UUID string without copying it.
 *
 * @uuid       - NUL-terminated UUID string
 * @prefix     - optional leading text to skip; may be NULL
 * @uuid_start - out: @uuid, or @uuid advanced past @prefix when @uuid
 *               begins with it
 * @uuid_len   - out: length of the text at *@uuid_start, not counting a
 *               trailing "_UUID" if one is present
 *
 * The string itself is never modified; callers print the result with
 * "%.*s".
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const size_t suffix_len = strlen(UUID_STR);
        char *start = uuid;
        size_t len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start = uuid + prefix_len;
        }

        len = strlen(start);
        if (len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = (int)len;
}
104
105 /* Returns true if import was FULL, false if import was already not
106  * connected.
107  * @imp - import to be disconnected
108  * @conn_cnt - connection count (epoch) of the request that timed out
109  *             and caused the disconnection.  In some cases, multiple
110  *             inflight requests can fail to a single target (e.g. OST
111  *             bulk requests) and if one has already caused a reconnection
112  *             (increasing the import->conn_cnt) the older failure should
113  *             not also cause a reconnection.  If zero it forces a reconnect.
114  */
115 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
116 {
117         int rc = 0;
118
119         spin_lock(&imp->imp_lock);
120
121         if (imp->imp_state == LUSTRE_IMP_FULL &&
122             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
123                 char *target_start;
124                 int   target_len;
125
126                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
127                           &target_start, &target_len);
128                 if (imp->imp_replayable) {
129                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
130                                "%s was lost; in progress operations using this "
131                                "service will wait for recovery to complete.\n",
132                                imp->imp_obd->obd_name, target_len, target_start,
133                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
134                 } else {
135                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
136                                "%.*s via nid %s was lost; in progress "
137                                "operations using this service will fail.\n",
138                                imp->imp_obd->obd_name, target_len, target_start, 
139                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
140                 }
141                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
142                 spin_unlock(&imp->imp_lock);
143     
144                 if (obd_dump_on_timeout)
145                         libcfs_debug_dumplog();
146
147                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
148                 rc = 1;
149         } else {
150                 spin_unlock(&imp->imp_lock);
151                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
152                        imp->imp_client->cli_name, imp,
153                        (imp->imp_state == LUSTRE_IMP_FULL &&
154                         imp->imp_conn_cnt > conn_cnt) ?
155                        "reconnected" : "not connected", imp->imp_conn_cnt,
156                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
157         }
158
159         return rc;
160 }
161
162 /* Must be called with imp_lock held! */
163 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
164 {
165         ENTRY;
166         LASSERT_SPIN_LOCKED(&imp->imp_lock);
167
168         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
169         imp->imp_invalid = 1;
170         imp->imp_generation++;
171         spin_unlock(&imp->imp_lock);
172
173         ptlrpc_abort_inflight(imp);
174         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
175 }
176
177 /*
178  * This acts as a barrier; all existing requests are rejected, and
179  * no new requests will be accepted until the import is valid again.
180  */
181 void ptlrpc_deactivate_import(struct obd_import *imp)
182 {
183         spin_lock(&imp->imp_lock);
184         ptlrpc_deactivate_and_unlock_import(imp);
185 }
186
187 /*
188  * This function will invalidate the import, if necessary, then block
189  * for all the RPC completions, and finally notify the obd to
190  * invalidate its state (ie cancel locks, clear pending requests,
191  * etc).
192  */
193 void ptlrpc_invalidate_import(struct obd_import *imp)
194 {
195         struct list_head *tmp, *n;
196         struct ptlrpc_request *req;
197         struct l_wait_info lwi;
198         int rc;
199
200         atomic_inc(&imp->imp_inval_count);
201
202         if (!imp->imp_invalid)
203                 ptlrpc_deactivate_import(imp);
204
205         LASSERT(imp->imp_invalid);
206
207         /* wait for all requests to error out and call completion callbacks.
208            Cap it at obd_timeout -- these should all have been locally
209            cancelled by ptlrpc_abort_inflight. */
210         lwi = LWI_TIMEOUT_INTERVAL(
211                 cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
212                 cfs_time_seconds(1), NULL, NULL);
213         rc = l_wait_event(imp->imp_recovery_waitq,
214                           (atomic_read(&imp->imp_inflight) == 0), &lwi);
215
216         if (rc) {
217                 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
218                        obd2cli_tgt(imp->imp_obd), rc,
219                        atomic_read(&imp->imp_inflight));
220                 spin_lock(&imp->imp_lock);
221                 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
222                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
223                         DEBUG_REQ(D_ERROR, req, "still on sending list");
224                 }
225                 list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
226                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
227                         DEBUG_REQ(D_ERROR, req, "still on delayed list");
228                 }
229                 spin_unlock(&imp->imp_lock);
230         }
231
232         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
233
234         atomic_dec(&imp->imp_inval_count);
235         cfs_waitq_signal(&imp->imp_recovery_waitq);
236 }
237
238 /* unset imp_invalid */
239 void ptlrpc_activate_import(struct obd_import *imp)
240 {
241         struct obd_device *obd = imp->imp_obd;
242
243         spin_lock(&imp->imp_lock);
244         imp->imp_invalid = 0;
245         spin_unlock(&imp->imp_lock);
246
247         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
248 }
249
250 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
251 {
252         ENTRY;
253
254         LASSERT(!imp->imp_dlm_fake);
255
256         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
257                 if (!imp->imp_replayable) {
258                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
259                                "auto-deactivating\n",
260                                obd2cli_tgt(imp->imp_obd),
261                                imp->imp_connection->c_remote_uuid.uuid,
262                                imp->imp_obd->obd_name);
263                         ptlrpc_deactivate_import(imp);
264                 }
265
266                 CDEBUG(D_HA, "%s: waking up pinger\n",
267                        obd2cli_tgt(imp->imp_obd));
268
269                 spin_lock(&imp->imp_lock);
270                 imp->imp_force_verify = 1;
271                 spin_unlock(&imp->imp_lock);
272
273                 ptlrpc_pinger_wake_up();
274         }
275         EXIT;
276 }
277
278 int ptlrpc_reconnect_import(struct obd_import *imp)
279 {
280         
281         ptlrpc_set_import_discon(imp, 0); 
282         /* Force a new connect attempt */
283         ptlrpc_invalidate_import(imp);
284         /* Do a fresh connect next time by zeroing the handle */
285         ptlrpc_disconnect_import(imp, 1);
286         /* Wait for all invalidate calls to finish */
287         if (atomic_read(&imp->imp_inval_count) > 0) {
288                 int rc;
289                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
290                 rc = l_wait_event(imp->imp_recovery_waitq,
291                                   (atomic_read(&imp->imp_inval_count) == 0),
292                                   &lwi);
293                 if (rc)
294                         CERROR("Interrupted, inval=%d\n", 
295                                atomic_read(&imp->imp_inval_count));
296         }
297
298         /* 
299          * Allow reconnect attempts. Note: Currently, the function is
300          * only called by MGC. So assume this is a recoverable import,
301          * and force import to be recoverable. fix this if you need to 
302          */
303         
304         imp->imp_obd->obd_no_recov = 0;
305         /* Remove 'invalid' flag */
306         ptlrpc_activate_import(imp);
307         /* Attempt a new connect */
308         ptlrpc_recover_import(imp, NULL);
309         return 0;
310 }
311
312 EXPORT_SYMBOL(ptlrpc_reconnect_import);
313
/*
 * Choose which entry of imp_conn_list the next connect attempt will use
 * and make it the import's (and its DLM export's) current connection.
 *
 * Selection, all done under imp_lock:
 *  - entries attempted less than CONNECTION_SWITCH_MIN seconds ago are
 *    skipped;
 *  - the first entry never attempted, or not attempted since the last
 *    successful connect, wins immediately;
 *  - otherwise the least recently attempted entry is taken;
 *  - if every entry was skipped, imp_conn_current is reused.
 *
 * Returns 0 on success, -EINVAL if the import has no connections.
 */
314 static int import_select_connection(struct obd_import *imp)
315 {
316         struct obd_import_conn *imp_conn = NULL, *conn;
317         struct obd_export *dlmexp;
        /* cleared as soon as a not-yet-retried (or fallback) entry is used */
318         int tried_all = 1;
319         ENTRY;
320
321         spin_lock(&imp->imp_lock);
322
323         if (list_empty(&imp->imp_conn_list)) {
324                 CERROR("%s: no connections available\n",
325                         imp->imp_obd->obd_name);
326                 spin_unlock(&imp->imp_lock);
327                 RETURN(-EINVAL);
328         }
329
330         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
331                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
332                        imp->imp_obd->obd_name,
333                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
334                        conn->oic_last_attempt);
335                 
336                 /* Don't thrash connections */
337                 if (cfs_time_before_64(cfs_time_current_64(),
338                                      conn->oic_last_attempt + 
339                                      cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
340                         continue;
341                 }
342
343                 /* If we have not tried this connection since the
344                    last successful attempt, go with this one */
345                 if ((conn->oic_last_attempt == 0) ||
346                     cfs_time_beforeq_64(conn->oic_last_attempt,
347                                        imp->imp_last_success_conn)) {
348                         imp_conn = conn;
349                         tried_all = 0;
350                         break;
351                 }
352
353                 /* If all of the connections have already been tried
354                    since the last successful connection; just choose the
355                    least recently used */
356                 if (!imp_conn)
357                         imp_conn = conn;
358                 else if (cfs_time_before_64(conn->oic_last_attempt,
359                                             imp_conn->oic_last_attempt))
360                         imp_conn = conn;
361         }
362
363         /* if not found, simply choose the current one */
364         if (!imp_conn) {
365                 LASSERT(imp->imp_conn_current);
366                 imp_conn = imp->imp_conn_current;
367                 tried_all = 0;
368         }
369         LASSERT(imp_conn->oic_conn);
370
371         /* If we've tried everything, and we're back to the beginning of the
372            list, increase our timeout and try again. It will be reset when
373            we do finally connect. (FIXME: really we should wait for all network
374            state associated with the last connection attempt to drain before
375            trying to reconnect on it.) */
376         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
377             !imp->imp_recon_bk /* not retrying */) {
                /* grow the latency estimate by CONNECTION_SWITCH_INC until
                   it reaches CONNECTION_SWITCH_MAX */
378                 if (at_get(&imp->imp_at.iat_net_latency) <
379                     CONNECTION_SWITCH_MAX) {
380                         at_add(&imp->imp_at.iat_net_latency,
381                                at_get(&imp->imp_at.iat_net_latency) +
382                                CONNECTION_SWITCH_INC);
383                 }
384                 LASSERT(imp_conn->oic_last_attempt);
385                 CWARN("%s: tried all connections, increasing latency to %ds\n",
386                       imp->imp_obd->obd_name,
387                       at_get(&imp->imp_at.iat_net_latency));
388         }
389
        /* stamp the attempt; the throttling test above reads this */
390         imp_conn->oic_last_attempt = cfs_time_current_64();
391
392         /* switch connection, don't mind if it's same as the current one */
393         if (imp->imp_connection)
394                 ptlrpc_put_connection(imp->imp_connection);
395         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
396
        /* point the export behind imp_dlm_handle at the same connection,
           swapping its reference the same way */
397         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
398         LASSERT(dlmexp != NULL);
399         if (dlmexp->exp_connection)
400                 ptlrpc_put_connection(dlmexp->exp_connection);
401         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
402         class_export_put(dlmexp);
403
404         if (imp->imp_conn_current != imp_conn) {
                /* only announce a change, not the very first selection */
405                 if (imp->imp_conn_current)
406                         LCONSOLE_INFO("Changing connection for %s to %s/%s\n",
407                                       imp->imp_obd->obd_name,
408                                       imp_conn->oic_uuid.uuid,
409                                       libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
410                 imp->imp_conn_current = imp_conn;
411         }
412
413         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
414                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
415                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
416
417         spin_unlock(&imp->imp_lock);
418
419         RETURN(0);
420 }
421
/*
 * Start an asynchronous (re)connect of @imp.
 *
 * @imp      - import to connect
 * @new_uuid - optional UUID string passed to import_set_conn_priority()
 *             before a connection is selected; may be NULL
 *
 * The import is moved to CONNECTING, a connection is picked with
 * import_select_connection(), and a connect request is handed to
 * ptlrpcd_add_req(); the reply is processed by ptlrpc_connect_interpret().
 *
 * Returns 0 once the request has been queued (or if the import is already
 * FULL), -EINVAL for a CLOSED import, -EALREADY if a connect is already in
 * progress, -ENOMEM if no request could be allocated, or the error of the
 * failing helper.  On any failure after the state change the import is
 * put back into DISCON.
 */
422 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
423 {
424         struct obd_device *obd = imp->imp_obd;
425         int initial_connect = 0;
426         int rc;
427         __u64 committed_before_reconnect = 0;
428         struct ptlrpc_request *request;
        /* request buffers, in order: ptlrpc_body, target uuid, our own
           uuid, DLM handle, connect data; tmp[] holds the matching data */
429         int size[] = { sizeof(struct ptlrpc_body),
430                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
431                        sizeof(obd->obd_uuid),
432                        sizeof(imp->imp_dlm_handle),
433                        sizeof(imp->imp_connect_data) };
434         char *tmp[] = { NULL,
435                         obd2cli_tgt(imp->imp_obd),
436                         obd->obd_uuid.uuid,
437                         (char *)&imp->imp_dlm_handle,
438                         (char *)&imp->imp_connect_data };
439         struct ptlrpc_connect_async_args *aa;
440
441         ENTRY;
442         spin_lock(&imp->imp_lock);
443         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
444                 spin_unlock(&imp->imp_lock);
445                 CERROR("can't connect to a closed import\n");
446                 RETURN(-EINVAL);
447         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
448                 spin_unlock(&imp->imp_lock);
449                 CERROR("already connected\n");
450                 RETURN(0);
451         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
452                 spin_unlock(&imp->imp_lock);
453                 CERROR("already connecting\n");
454                 RETURN(-EALREADY);
455         }
456
457         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
458
        /* new connection epoch; see ptlrpc_set_import_discon() */
459         imp->imp_conn_cnt++;
460         imp->imp_resend_replay = 0;
461
        /* no remote handle yet means this is the first connect; otherwise
           remember the committed transno to compare against the reply */
462         if (!lustre_handle_is_used(&imp->imp_remote_handle))
463                 initial_connect = 1;
464         else
465                 committed_before_reconnect = imp->imp_peer_committed_transno;
466
467         spin_unlock(&imp->imp_lock);
468
469         if (new_uuid) {
470                 struct obd_uuid uuid;
471
472                 obd_str2uuid(&uuid, new_uuid);
473                 rc = import_set_conn_priority(imp, &uuid);
474                 if (rc)
475                         GOTO(out, rc);
476         }
477
478         rc = import_select_connection(imp);
479         if (rc)
480                 GOTO(out, rc);
481
482         /* last in connection list */
483         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
484                 if (imp->imp_initial_recov_bk && initial_connect) {
485                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
486                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
487                         /* Don't retry if connect fails */
                        /* rc doubles as the zero value handed to
                           KEY_INIT_RECOV; the call's result is not checked */
488                         rc = 0;
489                         obd_set_info_async(obd->obd_self_export,
490                                            strlen(KEY_INIT_RECOV),
491                                            KEY_INIT_RECOV,
492                                            sizeof(rc), &rc, NULL);
493                 }
494                 if (imp->imp_recon_bk) {
495                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
496                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
497                         spin_lock(&imp->imp_lock);
498                         imp->imp_last_recon = 1;
499                         spin_unlock(&imp->imp_lock);
500                 }
501         }
502
503         /* Reset connect flags to the originally requested flags, in case
504          * the server is updated on-the-fly we will get the new features. */
505         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
506         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
507                            &obd->obd_uuid, &imp->imp_connect_data);
508         if (rc)
509                 GOTO(out, rc);
510
        /* 5 == number of entries in size[] / tmp[] above */
511         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
512                                   5, size, tmp);
513         if (!request)
514                 GOTO(out, rc = -ENOMEM);
515
516 #ifndef __KERNEL__
517         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
518 #endif
519         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_NEXT_VER);
520
521         request->rq_send_state = LUSTRE_IMP_CONNECTING;
522         /* Allow a slightly larger reply for future growth compatibility */
523         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
524                               16 * sizeof(__u64);
525         ptlrpc_req_set_repsize(request, 2, size);
526         request->rq_interpret_reply = ptlrpc_connect_interpret;
527
        /* the callback's arguments live inside the request itself */
528         CLASSERT(sizeof (*aa) <= sizeof (request->rq_async_args));
529         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
530         memset(aa, 0, sizeof *aa);
531
532         aa->pcaa_peer_committed = committed_before_reconnect;
533         aa->pcaa_initial_connect = initial_connect;
534         if (aa->pcaa_initial_connect) {
                /* set replayable for now; ptlrpc_connect_interpret() sets
                   the real value from MSG_CONNECT_REPLAYABLE in the reply */
535                 spin_lock(&imp->imp_lock);
536                 imp->imp_replayable = 1;
537                 spin_unlock(&imp->imp_lock);
538                 if (AT_OFF)
539                         /* AT will use INITIAL_CONNECT_TIMEOUT the first
540                            time, adaptive after that. */
541                         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
542         }
543
544         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
545                   aa->pcaa_initial_connect ? "initial " : "re", 
546                   imp->imp_conn_cnt);
547         ptlrpcd_add_req(request);
548         rc = 0;
549 out:
        /* any failure after entering CONNECTING drops back to DISCON */
550         if (rc != 0) {
551                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
552         }
553
554         RETURN(rc);
555 }
556 EXPORT_SYMBOL(ptlrpc_connect_import);
557
558 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
559 {
560 #ifdef __KERNEL__
561         struct obd_import_conn *imp_conn;
562 #endif
563         int wake_pinger = 0;
564
565         ENTRY;
566
567         spin_lock(&imp->imp_lock);
568         if (list_empty(&imp->imp_conn_list))
569                 GOTO(unlock, 0);
570
571 #ifdef __KERNEL__
572         imp_conn = list_entry(imp->imp_conn_list.prev,
573                               struct obd_import_conn,
574                               oic_item);
575
576         /* XXX: When the failover node is the primary node, it is possible
577          * to have two identical connections in imp_conn_list. We must 
578          * compare not conn's pointers but NIDs, otherwise we can defeat
579          * connection throttling. (See bug 14774.) */
580         if (imp->imp_conn_current->oic_conn->c_self != 
581                                 imp_conn->oic_conn->c_self) {
582                 ptlrpc_ping_import_soon(imp);
583                 wake_pinger = 1;
584         }
585
586 #else
587         /* liblustre has no pinger thead, so we wakup pinger anyway */
588         wake_pinger = 1;
589 #endif 
590  unlock:
591         spin_unlock(&imp->imp_lock);
592
593         if (wake_pinger)
594                 ptlrpc_pinger_wake_up();
595
596         EXIT;
597 }
598
599 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
600                                     void * data, int rc)
601 {
602         struct ptlrpc_connect_async_args *aa = data;
603         struct obd_import *imp = request->rq_import;
604         struct client_obd *cli = &imp->imp_obd->u.cli;
605         struct lustre_handle old_hdl;
606         int msg_flags;
607         ENTRY;
608
609         spin_lock(&imp->imp_lock);
610         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
611                 spin_unlock(&imp->imp_lock);
612                 RETURN(0);
613         }
614         spin_unlock(&imp->imp_lock);
615
616         if (rc)
617                 GOTO(out, rc);
618
619         LASSERT(imp->imp_conn_current);
620
621         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
622
623         /* All imports are pingable */
624         spin_lock(&imp->imp_lock);
625         imp->imp_pingable = 1;
626
627         if (aa->pcaa_initial_connect) {
628                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
629                         imp->imp_replayable = 1;
630                         spin_unlock(&imp->imp_lock);
631                         CDEBUG(D_HA, "connected to replayable target: %s\n",
632                                obd2cli_tgt(imp->imp_obd));
633                 } else {
634                         imp->imp_replayable = 0;
635                         spin_unlock(&imp->imp_lock);
636                 }
637
638                 if (msg_flags & MSG_CONNECT_NEXT_VER) {
639                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
640                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
641                                obd2cli_tgt(imp->imp_obd));
642                 } else {
643                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
644                                obd2cli_tgt(imp->imp_obd));
645                 }
646
647                 imp->imp_remote_handle =
648                                 *lustre_msg_get_handle(request->rq_repmsg);
649
650                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
651                 ptlrpc_activate_import(imp);
652                 GOTO(finish, rc = 0);
653         } else {
654                 spin_unlock(&imp->imp_lock);
655         }
656
657         /* Determine what recovery state to move the import to. */
658         if (MSG_CONNECT_RECONNECT & msg_flags) {
659                 memset(&old_hdl, 0, sizeof(old_hdl));
660                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
661                             sizeof (old_hdl))) {
662                         CERROR("%s@%s didn't like our handle "LPX64
663                                ", failed\n", obd2cli_tgt(imp->imp_obd),
664                                imp->imp_connection->c_remote_uuid.uuid,
665                                imp->imp_dlm_handle.cookie);
666                         GOTO(out, rc = -ENOTCONN);
667                 }
668
669                 if (memcmp(&imp->imp_remote_handle,
670                            lustre_msg_get_handle(request->rq_repmsg),
671                            sizeof(imp->imp_remote_handle))) {
672                         int level = D_ERROR;
673                         /* Old MGC can reconnect to a restarted MGS */
674                         if (strcmp(imp->imp_obd->obd_type->typ_name,
675                                    LUSTRE_MGC_NAME) == 0) {
676                                 level = D_CONFIG;
677                         }
678                         CDEBUG(level, 
679                                "%s@%s changed handle from "LPX64" to "LPX64
680                                "; copying, but this may foreshadow disaster\n",
681                                obd2cli_tgt(imp->imp_obd),
682                                imp->imp_connection->c_remote_uuid.uuid,
683                                imp->imp_remote_handle.cookie,
684                                lustre_msg_get_handle(request->rq_repmsg)->
685                                         cookie);
686                         imp->imp_remote_handle =
687                                      *lustre_msg_get_handle(request->rq_repmsg);
688                 } else {
689                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
690                                obd2cli_tgt(imp->imp_obd),
691                                imp->imp_connection->c_remote_uuid.uuid);
692                 }
693
694                 if (imp->imp_invalid) {
695                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
696                                "marking evicted\n", imp->imp_obd->obd_name);
697                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
698                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
699                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
700                                imp->imp_obd->obd_name,
701                                obd2cli_tgt(imp->imp_obd));
702
703                         spin_lock(&imp->imp_lock);
704                         imp->imp_resend_replay = 1;
705                         spin_unlock(&imp->imp_lock);
706
707                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
708                 } else {
709                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
710                 }
711         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
712                 LASSERT(imp->imp_replayable);
713                 imp->imp_remote_handle =
714                                 *lustre_msg_get_handle(request->rq_repmsg);
715                 imp->imp_last_replay_transno = 0;
716                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
717         } else {
718                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
719                           "flags reconnect/recovering not set: %x)",msg_flags);
720                 imp->imp_remote_handle =
721                                 *lustre_msg_get_handle(request->rq_repmsg);
722                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
723         }
724
725         /* Sanity checks for a reconnected import. */
726         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
727                 CERROR("imp_replayable flag does not match server "
728                        "after reconnect. We should LBUG right here.\n");
729         }
730
731         if (lustre_msg_get_last_committed(request->rq_repmsg) <
732             aa->pcaa_peer_committed) {
733                 CERROR("%s went back in time (transno "LPD64
734                        " was previously committed, server now claims "LPD64
735                        ")!  See https://bugzilla.clusterfs.com/"
736                        "long_list.cgi?buglist=9646\n",
737                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
738                        lustre_msg_get_last_committed(request->rq_repmsg));
739         }
740
741 finish:
742         rc = ptlrpc_import_recovery_state_machine(imp);
743         if (rc != 0) {
744                 if (rc == -ENOTCONN) {
745                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
746                                "invalidating and reconnecting\n",
747                                obd2cli_tgt(imp->imp_obd),
748                                imp->imp_connection->c_remote_uuid.uuid);
749                         ptlrpc_connect_import(imp, NULL);
750                         RETURN(0);
751                 }
752         } else {
753                 struct obd_connect_data *ocd;
754                 struct obd_export *exp;
755
756                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
757                                          lustre_swab_connect);
758                 spin_lock(&imp->imp_lock);
759                 list_del(&imp->imp_conn_current->oic_item);
760                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
761                 imp->imp_last_success_conn =
762                         imp->imp_conn_current->oic_last_attempt;
763
764                 if (ocd == NULL) {
765                         spin_unlock(&imp->imp_lock);
766                         CERROR("Wrong connect data from server\n");
767                         rc = -EPROTO;
768                         GOTO(out, rc);
769                 }
770
771                 imp->imp_connect_data = *ocd;
772
773                 exp = class_conn2export(&imp->imp_dlm_handle);
774                 spin_unlock(&imp->imp_lock);
775
776                 /* check that server granted subset of flags we asked for. */
777                 LASSERTF((ocd->ocd_connect_flags &
778                           imp->imp_connect_flags_orig) ==
779                          ocd->ocd_connect_flags, LPX64" != "LPX64,
780                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
781
782                 if (!exp) {
783                         /* This could happen if export is cleaned during the 
784                            connect attempt */
785                         CERROR("Missing export for %s\n", 
786                                imp->imp_obd->obd_name);
787                         GOTO(out, rc = -ENODEV);
788                 }
789                 exp->exp_connect_flags = ocd->ocd_connect_flags;
790                 imp->imp_obd->obd_self_export->exp_connect_flags = ocd->ocd_connect_flags;
791                 class_export_put(exp);
792
793                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
794
795                 if (!ocd->ocd_ibits_known &&
796                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
797                         CERROR("Inodebits aware server returned zero compatible"
798                                " bits?\n");
799
800                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
801                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
802                                         LUSTRE_VERSION_OFFSET_WARN ||
803                      ocd->ocd_version < LUSTRE_VERSION_CODE -
804                                         LUSTRE_VERSION_OFFSET_WARN)) {
805                         /* Sigh, some compilers do not like #ifdef in the middle
806                            of macro arguments */
807 #ifdef __KERNEL__
808                         const char *older =
809                                 "older.  Consider upgrading this client";
810 #else
811                         const char *older =
812                                 "older.  Consider recompiling this application";
813 #endif
814                         const char *newer = "newer than client version";
815
816                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
817                                       "is much %s (%s)\n",
818                                       obd2cli_tgt(imp->imp_obd),
819                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
820                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
821                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
822                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
823                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
824                                       newer : older, LUSTRE_VERSION_STRING);
825                 }
826
827                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
828                         /* We sent to the server ocd_cksum_types with bits set
829                          * for algorithms we understand. The server masked off
830                          * the checksum types it doesn't support */
831                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
832                                 LCONSOLE_WARN("The negotiation of the checksum "
833                                               "alogrithm to use with server %s "
834                                               "failed (%x/%x), disabling "
835                                               "checksums\n",
836                                               obd2cli_tgt(imp->imp_obd),
837                                               ocd->ocd_cksum_types,
838                                               OBD_CKSUM_ALL);
839                                 cli->cl_checksum = 0;
840                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
841                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
842                         } else {
843                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
844
845                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
846                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
847                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
848                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
849                                 else
850                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
851                         }
852                 } else {
853                         /* The server does not support OBD_CONNECT_CKSUM.
854                          * Enforce CRC32 for backward compatibility*/
855                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
856                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
857                 }
858
859                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
860                         cli->cl_max_pages_per_rpc = 
861                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
862                 }
863
864                 imp->imp_obd->obd_namespace->ns_connect_flags = 
865                         ocd->ocd_connect_flags;
866                 imp->imp_obd->obd_namespace->ns_orig_connect_flags = 
867                         ocd->ocd_connect_flags;
868
869                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
870                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
871                         /* We need a per-message support flag, because 
872                            a. we don't know if the incoming connect reply
873                               supports AT or not (in reply_in_callback)
874                               until we unpack it.
875                            b. failovered server means export and flags are gone
876                               (in ptlrpc_send_reply).
877                            Can only be set when we know AT is supported at 
878                            both ends */
879                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
880                 else
881                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
882
883                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
884                         (cli->cl_max_pages_per_rpc > 0));
885         }
886
887  out:
888         if (rc != 0) {
889                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
890                 spin_lock(&imp->imp_lock);
891                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
892                     (request->rq_import_generation == imp->imp_generation))
893                         ptlrpc_deactivate_and_unlock_import(imp);
894                 else
895                         spin_unlock(&imp->imp_lock);
896
897                 if (imp->imp_recon_bk && imp->imp_last_recon) {
898                         /* Give up trying to reconnect */
899                         imp->imp_obd->obd_no_recov = 1;
900                         ptlrpc_deactivate_import(imp);
901                 }
902
903                 if (rc == -EPROTO) {
904                         struct obd_connect_data *ocd;
905                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
906                                                  sizeof *ocd,
907                                                  lustre_swab_connect);
908                         if (ocd &&
909                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
910                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
911                            /* Actually servers are only supposed to refuse
912                               connection from liblustre clients, so we should
913                               never see this from VFS context */
914                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
915                                         "(%d.%d.%d.%d)"
916                                         " refused connection from this client "
917                                         "with an incompatible version (%s).  "
918                                         "Client must be recompiled\n",
919                                         obd2cli_tgt(imp->imp_obd),
920                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
921                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
922                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
923                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
924                                         LUSTRE_VERSION_STRING);
925                                 ptlrpc_deactivate_import(imp);
926                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
927                         }
928                         RETURN(-EPROTO);
929                 }
930
931                 ptlrpc_maybe_ping_import_soon(imp);
932
933                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
934                        obd2cli_tgt(imp->imp_obd),
935                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
936         }
937         
938         spin_lock(&imp->imp_lock);
939         imp->imp_last_recon = 0;
940         spin_unlock(&imp->imp_lock);
941
942         cfs_waitq_signal(&imp->imp_recovery_waitq);
943         RETURN(rc);
944 }
945
946 static int completed_replay_interpret(struct ptlrpc_request *req,
947                                     void * data, int rc)
948 {
949         ENTRY;
950         atomic_dec(&req->rq_import->imp_replay_inflight);
951         if (req->rq_status == 0) {
952                 ptlrpc_import_recovery_state_machine(req->rq_import);
953         } else {
954                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
955                        "reconnecting\n",
956                        req->rq_import->imp_obd->obd_name, req->rq_status);
957                 ptlrpc_connect_import(req->rq_import, NULL);
958         }
959
960         RETURN(0);
961 }
962
963 static int signal_completed_replay(struct obd_import *imp)
964 {
965         struct ptlrpc_request *req;
966         ENTRY;
967
968         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
969         atomic_inc(&imp->imp_replay_inflight);
970
971         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
972         if (!req) {
973                 atomic_dec(&imp->imp_replay_inflight);
974                 RETURN(-ENOMEM);
975         }
976
977         ptlrpc_req_set_repsize(req, 1, NULL);
978         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
979         lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY);
980         req->rq_timeout *= 3;
981         req->rq_interpret_reply = completed_replay_interpret;
982
983         ptlrpcd_add_req(req);
984         RETURN(0);
985 }
986
987 #ifdef __KERNEL__
988 static int ptlrpc_invalidate_import_thread(void *data)
989 {
990         struct obd_import *imp = data;
991
992         ENTRY;
993
994         ptlrpc_daemonize("ll_imp_inval");
995         
996         CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
997                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
998                imp->imp_connection->c_remote_uuid.uuid);
999
1000         ptlrpc_invalidate_import(imp);
1001
1002         if (obd_dump_on_eviction) {
1003                 CERROR("dump the log upon eviction\n");
1004                 libcfs_debug_dumplog();
1005         }
1006
1007         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1008         ptlrpc_import_recovery_state_machine(imp);
1009
1010         RETURN(0);
1011 }
1012 #endif
1013
1014 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1015 {
1016         int rc = 0;
1017         int inflight;
1018         char *target_start;
1019         int target_len;
1020
1021         ENTRY;
1022         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1023                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1024                           &target_start, &target_len);
1025                 /* Don't care about MGC eviction */
1026                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1027                            LUSTRE_MGC_NAME) != 0) {
1028                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1029                                            "%.*s; in progress operations using "
1030                                            "this service will fail.\n",
1031                                            target_len, target_start);
1032                 }
1033                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1034                        obd2cli_tgt(imp->imp_obd),
1035                        imp->imp_connection->c_remote_uuid.uuid);
1036
1037 #ifdef __KERNEL__
1038                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1039                                    CLONE_VM | CLONE_FILES);
1040                 if (rc < 0)
1041                         CERROR("error starting invalidate thread: %d\n", rc);
1042                 else
1043                         rc = 0;
1044                 RETURN(rc);
1045 #else
1046                 ptlrpc_invalidate_import(imp);
1047
1048                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1049 #endif
1050         }
1051
1052         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1053                 CDEBUG(D_HA, "replay requested by %s\n",
1054                        obd2cli_tgt(imp->imp_obd));
1055                 rc = ptlrpc_replay_next(imp, &inflight);
1056                 if (inflight == 0 &&
1057                     atomic_read(&imp->imp_replay_inflight) == 0) {
1058                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1059                         rc = ldlm_replay_locks(imp);
1060                         if (rc)
1061                                 GOTO(out, rc);
1062                 }
1063                 rc = 0;
1064         }
1065
1066         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1067                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1068                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1069                         rc = signal_completed_replay(imp);
1070                         if (rc)
1071                                 GOTO(out, rc);
1072                 }
1073
1074         }
1075
1076         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1077                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1078                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1079                 }
1080         }
1081
1082         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1083                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1084                        obd2cli_tgt(imp->imp_obd),
1085                        imp->imp_connection->c_remote_uuid.uuid);
1086
1087                 rc = ptlrpc_resend(imp);
1088                 if (rc)
1089                         GOTO(out, rc);
1090                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1091                 ptlrpc_activate_import(imp);
1092
1093                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1094                           &target_start, &target_len);
1095                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1096                               "using nid %s.\n", imp->imp_obd->obd_name,
1097                               target_len, target_start,
1098                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1099         }
1100
1101         if (imp->imp_state == LUSTRE_IMP_FULL) {
1102                 cfs_waitq_signal(&imp->imp_recovery_waitq);
1103                 ptlrpc_wake_delayed(imp);
1104         }
1105
1106  out:
1107         RETURN(rc);
1108 }
1109
/* Callback passed to LWI_TIMEOUT_INTR() by ptlrpc_disconnect_import().
 * Ignores its argument and always returns 0. */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
1114
1115 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1116 {
1117         struct ptlrpc_request *req;
1118         int rq_opc, rc = 0;
1119         int nowait = imp->imp_obd->obd_force;
1120         ENTRY;
1121
1122         if (nowait)
1123                 GOTO(set_state, rc);
1124
1125         switch (imp->imp_connect_op) {
1126         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1127         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1128         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1129         default:
1130                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1131                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1132                 RETURN(-EINVAL);
1133         }
1134
1135         if (ptlrpc_import_in_recovery(imp)) {
1136                 struct l_wait_info lwi;
1137                 cfs_duration_t timeout;
1138
1139                 if (AT_OFF) {
1140                         timeout = cfs_time_seconds(obd_timeout);
1141                 } else {
1142                         int idx = import_at_get_index(imp, 
1143                                 imp->imp_client->cli_request_portal);
1144                         timeout = cfs_time_seconds(
1145                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1146                 }
1147                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout), 
1148                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1149                 rc = l_wait_event(imp->imp_recovery_waitq,
1150                                   !ptlrpc_import_in_recovery(imp), &lwi);
1151         }
1152
1153         spin_lock(&imp->imp_lock);
1154         if (imp->imp_state != LUSTRE_IMP_FULL)
1155                 GOTO(out, 0);
1156
1157         spin_unlock(&imp->imp_lock);
1158
1159         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1160         if (req) {
1161                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1162                  * it fails.  We can get through the above with a down server
1163                  * if the client doesn't know the server is gone yet. */
1164                 req->rq_no_resend = 1;
1165                 
1166 #ifndef CRAY_XT3
1167                 /* We want client umounts to happen quickly, no matter the 
1168                    server state... */
1169                 req->rq_timeout = min_t(int, req->rq_timeout,
1170                                         INITIAL_CONNECT_TIMEOUT);
1171 #else
1172                 /* ... but we always want liblustre clients to nicely 
1173                    disconnect, so only use the adaptive value. */
1174                 if (AT_OFF)
1175                         req->rq_timeout = obd_timeout / 3;
1176 #endif
1177
1178                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1179                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1180                 ptlrpc_req_set_repsize(req, 1, NULL);
1181                 rc = ptlrpc_queue_wait(req);
1182                 ptlrpc_req_finished(req);
1183         }
1184
1185 set_state:
1186         spin_lock(&imp->imp_lock);
1187 out:
1188         if (noclose) 
1189                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1190         else
1191                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1192         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1193         /* Try all connections in the future - bz 12758 */ 
1194         imp->imp_last_recon = 0;
1195         spin_unlock(&imp->imp_lock);
1196
1197         RETURN(rc);
1198 }
1199
1200 /* Sets maximal number of RPCs possible originating from other side of this
1201    import (server) to us and number of async RPC replies that we are not waiting
1202    for arriving */
1203 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1204 {
1205         LNetSetAsync(imp->imp_connection->c_peer, count);
1206 }
1207
1208
/* Adaptive Timeout utils */

/* Tunables defined outside this file and read by at_add(). */
extern unsigned int at_min;      /* lower clamp applied to at_current */
extern unsigned int at_max;      /* upper clamp on at_current; skipped when 0 */
extern unsigned int at_history;  /* divided by AT_BINS to size one bin */
1211
1212 /* Bin into timeslices using AT_BINS bins.
1213    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1214    but still smoothing out a return to normalcy from a slow response.
1215    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1216 int at_add(struct adaptive_timeout *at, unsigned int val) 
1217 {
1218         unsigned int old = at->at_current;
1219         time_t now = cfs_time_current_sec();
1220         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1221
1222         LASSERT(at);
1223 #if 0
1224         CDEBUG(D_INFO, "add %u to %p time=%lu v=%u (%u %u %u %u)\n", 
1225                val, at, now - at->at_binstart, at->at_current,
1226                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1227 #endif
1228         if (val == 0) 
1229                 /* 0's don't count, because we never want our timeout to 
1230                    drop to 0, and because 0 could mean an error */
1231                 return 0;
1232
1233         spin_lock(&at->at_lock);
1234
1235         if (unlikely(at->at_binstart == 0)) {
1236                 /* Special case to remove default from history */
1237                 at->at_current = val;
1238                 at->at_worst_ever = val;
1239                 at->at_worst_time = now;
1240                 at->at_hist[0] = val;
1241                 at->at_binstart = now;
1242         } else if (now - at->at_binstart < binlimit ) {
1243                 /* in bin 0 */
1244                 at->at_hist[0] = max(val, at->at_hist[0]);
1245                 at->at_current = max(val, at->at_current);
1246         } else {
1247                 int i, shift;
1248                 unsigned int maxv = val;
1249                 /* move bins over */
1250                 shift = (now - at->at_binstart) / binlimit;
1251                 LASSERT(shift > 0);
1252                 for(i = AT_BINS - 1; i >= 0; i--) {
1253                         if (i >= shift) {
1254                                 at->at_hist[i] = at->at_hist[i - shift];
1255                                 maxv = max(maxv, at->at_hist[i]);
1256                         } else {
1257                                 at->at_hist[i] = 0;
1258                         }
1259                 }
1260                 at->at_hist[0] = val;
1261                 at->at_current = maxv;
1262                 at->at_binstart += shift * binlimit;
1263         }
1264
1265         if (at->at_current > at->at_worst_ever) {
1266                 at->at_worst_ever = at->at_current;
1267                 at->at_worst_time = now;
1268         }
1269
1270         if (at->at_flags & AT_FLG_NOHIST)
1271                 /* Only keep last reported val; keeping the rest of the history
1272                    for proc only */
1273                 at->at_current = val;
1274
1275         if (at_max > 0)
1276                 at->at_current =  min(at->at_current, at_max);
1277         at->at_current =  max(at->at_current, at_min);
1278
1279 #if 0
1280         if (at->at_current != old)
1281                 CDEBUG(D_ADAPTTO, "AT %p change: old=%u new=%u delta=%d "
1282                        "(val=%u) hist %u %u %u %u\n", at,
1283                        old, at->at_current, at->at_current - old, val,
1284                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1285                        at->at_hist[3]);
1286 #endif
1287         
1288         /* if we changed, report the old value */
1289         old = (at->at_current != old) ? old : 0;
1290         
1291         spin_unlock(&at->at_lock);
1292         return old;
1293 }
1294
1295 /* Find the imp_at index for a given portal; assign if space available */
1296 int import_at_get_index(struct obd_import *imp, int portal) 
1297 {
1298         struct imp_at *at = &imp->imp_at;
1299         int i;
1300
1301         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1302                 if (at->iat_portal[i] == portal) 
1303                         return i;
1304                 if (at->iat_portal[i] == 0)
1305                         /* unused */
1306                         break;
1307         }
1308
1309         /* Not found in list, add it under a lock */
1310         spin_lock(&imp->imp_lock);
1311
1312         /* Check unused under lock */
1313         for (; i < IMP_AT_MAX_PORTALS; i++) {
1314                 if (at->iat_portal[i] == portal) 
1315                         goto out;
1316                 if (at->iat_portal[i] == 0)
1317                         /* unused */
1318                         break;
1319         }
1320         
1321         /* Not enough portals? */
1322         LASSERT(i < IMP_AT_MAX_PORTALS);
1323
1324         at->iat_portal[i] = portal;
1325 out:
1326         spin_unlock(&imp->imp_lock);
1327         return i;
1328 }
1329