Whamcloud - gitweb
b575270f6a467b1c17da41c2946f06f089fdea2f
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/import.c
37  *
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #ifndef __KERNEL__
43 # include <liblustre.h>
44 #endif
45
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
51 #include <obd.h>
52 #include <obd_cksum.h>
53 #include <obd_class.h>
54
55 #include "ptlrpc_internal.h"
56
57 struct ptlrpc_connect_async_args {
58          __u64 pcaa_peer_committed;
59         int pcaa_initial_connect;
60 };
61
/*
 * Move @imp to @state and log the transition, unless the import is
 * already CLOSED: a CLOSED import should remain so.
 *
 * This variant takes no lock itself; IMPORT_SET_STATE() is the wrapper
 * that brackets it with imp_lock.
 *
 * Both arguments are expanded more than once, so callers should pass
 * expressions without side effects.  Every use of an argument is
 * parenthesized so that compound expressions expand correctly.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while (0)
73
/*
 * Locked form of IMPORT_SET_STATE_NOLOCK(): takes imp_lock, performs the
 * state change (a no-op for a CLOSED import), and releases the lock.
 *
 * @imp is expanded several times and is parenthesized where it is
 * dereferenced so that compound expressions expand correctly; pass an
 * expression without side effects.
 */
#define IMPORT_SET_STATE(imp, state)            \
do {                                            \
        spin_lock(&(imp)->imp_lock);            \
        IMPORT_SET_STATE_NOLOCK(imp, state);    \
        spin_unlock(&(imp)->imp_lock);          \
} while (0)
80
81
/* Forward declarations: the connect reply callback is defined further
 * down in this file but is referenced by ptlrpc_connect_import(). */
static int ptlrpc_connect_interpret(const struct lu_env *env,
                                    struct ptlrpc_request *request,
                                    void *data, int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
86
87 /* Only this function is allowed to change the import state when it is
88  * CLOSED. I would rather refcount the import and free it after
89  * disconnection like we do with exports. To do that, the client_obd
90  * will need to save the peer info somewhere other than in the import,
91  * though. */
92 int ptlrpc_init_import(struct obd_import *imp)
93 {
94         spin_lock(&imp->imp_lock);
95
96         imp->imp_generation++;
97         imp->imp_state =  LUSTRE_IMP_NEW;
98
99         spin_unlock(&imp->imp_lock);
100
101         return 0;
102 }
103 EXPORT_SYMBOL(ptlrpc_init_import);
104
#define UUID_STR "_UUID"
/*
 * Find the printable core of a UUID string without modifying it.
 *
 * @uuid       - NUL-terminated string to examine (must not be NULL)
 * @prefix     - optional leading string to skip; may be NULL
 * @uuid_start - out: @uuid itself, or @uuid advanced past @prefix when
 *               @uuid begins with @prefix
 * @uuid_len   - out: length of the string at *@uuid_start, not counting a
 *               trailing "_UUID" if one is present
 *
 * The result is meant to be printed with "%.*s".  Lengths are computed
 * once and kept in size_t so no signed/unsigned comparison takes place.
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const size_t suffix_len = sizeof(UUID_STR) - 1;
        char *start = uuid;
        size_t len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start += prefix_len;
        }

        len = strlen(start);

        /* Only strip the suffix when the string is long enough to hold it */
        if (len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = (int)len;
}
121
122 /* Returns true if import was FULL, false if import was already not
123  * connected.
124  * @imp - import to be disconnected
125  * @conn_cnt - connection count (epoch) of the request that timed out
126  *             and caused the disconnection.  In some cases, multiple
127  *             inflight requests can fail to a single target (e.g. OST
128  *             bulk requests) and if one has already caused a reconnection
129  *             (increasing the import->conn_cnt) the older failure should
130  *             not also cause a reconnection.  If zero it forces a reconnect.
131  */
132 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
133 {
134         int rc = 0;
135
136         spin_lock(&imp->imp_lock);
137
138         if (imp->imp_state == LUSTRE_IMP_FULL &&
139             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
140                 char *target_start;
141                 int   target_len;
142
143                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
144                           &target_start, &target_len);
145
146                 if (imp->imp_replayable) {
147                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
148                                "%s was lost; in progress operations using this "
149                                "service will wait for recovery to complete.\n",
150                                imp->imp_obd->obd_name, target_len, target_start,
151                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
152                 } else {
153                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
154                                            "%.*s via nid %s was lost; in progress"
155                                            "operations using this service will"
156                                            "fail.\n",
157                                            imp->imp_obd->obd_name,
158                                            target_len, target_start,
159                                  libcfs_nid2str(imp->imp_connection->c_peer.nid));
160                 }
161                 ptlrpc_deactivate_timeouts(imp);
162                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
163                 spin_unlock(&imp->imp_lock);
164
165                 if (obd_dump_on_timeout)
166                         libcfs_debug_dumplog();
167
168                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
169                 rc = 1;
170         } else {
171                 spin_unlock(&imp->imp_lock);
172                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
173                        imp->imp_client->cli_name, imp,
174                        (imp->imp_state == LUSTRE_IMP_FULL &&
175                         imp->imp_conn_cnt > conn_cnt) ?
176                        "reconnected" : "not connected", imp->imp_conn_cnt,
177                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
178         }
179
180         return rc;
181 }
182
183 /* Must be called with imp_lock held! */
184 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
185 {
186         ENTRY;
187         LASSERT_SPIN_LOCKED(&imp->imp_lock);
188
189         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
190         imp->imp_invalid = 1;
191         imp->imp_generation++;
192         spin_unlock(&imp->imp_lock);
193
194         ptlrpc_abort_inflight(imp);
195         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
196
197         EXIT;
198 }
199
200 /*
201  * This acts as a barrier; all existing requests are rejected, and
202  * no new requests will be accepted until the import is valid again.
203  */
204 void ptlrpc_deactivate_import(struct obd_import *imp)
205 {
206         spin_lock(&imp->imp_lock);
207         ptlrpc_deactivate_and_unlock_import(imp);
208 }
209
210 /*
211  * This function will invalidate the import, if necessary, then block
212  * for all the RPC completions, and finally notify the obd to
213  * invalidate its state (ie cancel locks, clear pending requests,
214  * etc).
215  */
216 void ptlrpc_invalidate_import(struct obd_import *imp)
217 {
218         struct list_head *tmp, *n;
219         struct ptlrpc_request *req;
220         struct l_wait_info lwi;
221         int rc;
222
223         atomic_inc(&imp->imp_inval_count);
224
225         /*
226          * If this is an invalid MGC connection, then don't bother
227          * waiting for imp_inflight to drop to 0.
228          */
229         if (imp->imp_invalid && imp->imp_recon_bk && !imp->imp_obd->obd_no_recov)
230                 goto out;
231
232         if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
233                 ptlrpc_deactivate_import(imp);
234
235         LASSERT(imp->imp_invalid);
236
237         /* wait for all requests to error out and call completion callbacks.
238            Cap it at obd_timeout -- these should all have been locally
239            cancelled by ptlrpc_abort_inflight. */
240         lwi = LWI_TIMEOUT_INTERVAL(
241                 cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
242                 cfs_time_seconds(1), NULL, NULL);
243         rc = l_wait_event(imp->imp_recovery_waitq,
244                           (atomic_read(&imp->imp_inflight) == 0), &lwi);
245
246         if (rc) {
247                 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
248                          obd2cli_tgt(imp->imp_obd), rc,
249                          atomic_read(&imp->imp_inflight));
250                 spin_lock(&imp->imp_lock);
251                 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
252                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
253                         DEBUG_REQ(D_ERROR, req, "still on sending list");
254                 }
255                 list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
256                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
257                         DEBUG_REQ(D_ERROR, req, "still on delayed list");
258                 }
259                 spin_unlock(&imp->imp_lock);
260                 LASSERT(atomic_read(&imp->imp_inflight) == 0);
261         }
262
263 out:
264         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
265         sptlrpc_import_flush_all_ctx(imp);
266
267         atomic_dec(&imp->imp_inval_count);
268         cfs_waitq_signal(&imp->imp_recovery_waitq);
269 }
270
271 /* unset imp_invalid */
272 void ptlrpc_activate_import(struct obd_import *imp)
273 {
274         struct obd_device *obd = imp->imp_obd;
275
276         spin_lock(&imp->imp_lock);
277         imp->imp_invalid = 0;
278         ptlrpc_activate_timeouts(imp);
279         spin_unlock(&imp->imp_lock);
280         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
281 }
282
283 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
284 {
285         ENTRY;
286
287         LASSERT(!imp->imp_dlm_fake);
288
289         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
290                 if (!imp->imp_replayable) {
291                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
292                                "auto-deactivating\n",
293                                obd2cli_tgt(imp->imp_obd),
294                                imp->imp_connection->c_remote_uuid.uuid,
295                                imp->imp_obd->obd_name);
296                         ptlrpc_deactivate_import(imp);
297                 }
298
299                 CDEBUG(D_HA, "%s: waking up pinger\n",
300                        obd2cli_tgt(imp->imp_obd));
301
302                 spin_lock(&imp->imp_lock);
303                 imp->imp_force_verify = 1;
304                 spin_unlock(&imp->imp_lock);
305
306                 ptlrpc_pinger_wake_up();
307         }
308         EXIT;
309 }
310
311 int ptlrpc_reconnect_import(struct obd_import *imp)
312 {
313         ptlrpc_set_import_discon(imp, 0);
314         /* Force a new connect attempt */
315         ptlrpc_invalidate_import(imp);
316         /* Do a fresh connect next time by zeroing the handle */
317         ptlrpc_disconnect_import(imp, 1);
318         /* Wait for all invalidate calls to finish */
319         if (atomic_read(&imp->imp_inval_count) > 0) {
320                 int rc;
321                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
322                 rc = l_wait_event(imp->imp_recovery_waitq,
323                                   (atomic_read(&imp->imp_inval_count) == 0),
324                                   &lwi);
325                 if (rc)
326                         CERROR("Interrupted, inval=%d\n",
327                                atomic_read(&imp->imp_inval_count));
328         }
329
330         /* Allow reconnect attempts */
331         imp->imp_obd->obd_no_recov = 0;
332         /* Remove 'invalid' flag */
333         ptlrpc_activate_import(imp);
334         /* Attempt a new connect */
335         ptlrpc_recover_import(imp, NULL);
336         return 0;
337 }
338
339 EXPORT_SYMBOL(ptlrpc_reconnect_import);
340
341 static int import_select_connection(struct obd_import *imp)
342 {
343         struct obd_import_conn *imp_conn = NULL, *conn;
344         struct obd_export *dlmexp;
345         int tried_all = 1;
346         ENTRY;
347
348         spin_lock(&imp->imp_lock);
349
350         if (list_empty(&imp->imp_conn_list)) {
351                 CERROR("%s: no connections available\n",
352                         imp->imp_obd->obd_name);
353                 spin_unlock(&imp->imp_lock);
354                 RETURN(-EINVAL);
355         }
356
357         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
358                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
359                        imp->imp_obd->obd_name,
360                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
361                        conn->oic_last_attempt);
362                 /* Don't thrash connections */
363                 if (cfs_time_before_64(cfs_time_current_64(),
364                                      conn->oic_last_attempt +
365                                      cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
366                         continue;
367                 }
368
369                 /* If we have not tried this connection since the
370                    the last successful attempt, go with this one */
371                 if ((conn->oic_last_attempt == 0) ||
372                     cfs_time_beforeq_64(conn->oic_last_attempt,
373                                        imp->imp_last_success_conn)) {
374                         imp_conn = conn;
375                         tried_all = 0;
376                         break;
377                 }
378
379                 /* If all of the connections have already been tried
380                    since the last successful connection; just choose the
381                    least recently used */
382                 if (!imp_conn)
383                         imp_conn = conn;
384                 else if (cfs_time_before_64(conn->oic_last_attempt,
385                                             imp_conn->oic_last_attempt))
386                         imp_conn = conn;
387         }
388
389         /* if not found, simply choose the current one */
390         if (!imp_conn) {
391                 LASSERT(imp->imp_conn_current);
392                 imp_conn = imp->imp_conn_current;
393                 tried_all = 0;
394         }
395         LASSERT(imp_conn->oic_conn);
396
397         /* If we've tried everything, and we're back to the beginning of the
398            list, increase our timeout and try again. It will be reset when
399            we do finally connect. (FIXME: really we should wait for all network
400            state associated with the last connection attempt to drain before
401            trying to reconnect on it.) */
402         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
403             !imp->imp_recon_bk /* not retrying */) {
404                 if (at_get(&imp->imp_at.iat_net_latency) <
405                     CONNECTION_SWITCH_MAX) {
406                         at_add(&imp->imp_at.iat_net_latency,
407                                at_get(&imp->imp_at.iat_net_latency) +
408                                CONNECTION_SWITCH_INC);
409                 }
410                 LASSERT(imp_conn->oic_last_attempt);
411                 CWARN("%s: tried all connections, increasing latency to %ds\n",
412                       imp->imp_obd->obd_name,
413                       at_get(&imp->imp_at.iat_net_latency));
414         }
415
416         imp_conn->oic_last_attempt = cfs_time_current_64();
417
418         /* switch connection, don't mind if it's same as the current one */
419         if (imp->imp_connection)
420                 ptlrpc_connection_put(imp->imp_connection);
421         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
422
423         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
424         LASSERT(dlmexp != NULL);
425         if (dlmexp->exp_connection)
426                 ptlrpc_connection_put(dlmexp->exp_connection);
427         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
428         class_export_put(dlmexp);
429
430         if (imp->imp_conn_current != imp_conn) {
431                 if (imp->imp_conn_current)
432                         LCONSOLE_INFO("Changing connection for %s to %s/%s\n",
433                                       imp->imp_obd->obd_name,
434                                       imp_conn->oic_uuid.uuid,
435                                       libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
436                 imp->imp_conn_current = imp_conn;
437         }
438
439         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
440                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
441                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
442
443         spin_unlock(&imp->imp_lock);
444
445         RETURN(0);
446 }
447
448 /*
449  * must be called under imp_lock
450  */
451 int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
452 {
453         struct ptlrpc_request *req;
454         struct list_head *tmp;
455
456         if (list_empty(&imp->imp_replay_list))
457                 return 0;
458         tmp = imp->imp_replay_list.next;
459         req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
460         *transno = req->rq_transno;
461         if (req->rq_transno == 0) {
462                 DEBUG_REQ(D_ERROR, req, "zero transno in replay");
463                 LBUG();
464         }
465
466         return 1;
467 }
468
469 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
470 {
471         struct obd_device *obd = imp->imp_obd;
472         int initial_connect = 0;
473         int set_transno = 0;
474         __u64 committed_before_reconnect = 0;
475         struct ptlrpc_request *request;
476         char *bufs[] = { NULL,
477                          obd2cli_tgt(imp->imp_obd),
478                          obd->obd_uuid.uuid,
479                          (char *)&imp->imp_dlm_handle,
480                          (char *)&imp->imp_connect_data };
481         struct ptlrpc_connect_async_args *aa;
482         int rc;
483         ENTRY;
484
485         spin_lock(&imp->imp_lock);
486         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
487                 spin_unlock(&imp->imp_lock);
488                 CERROR("can't connect to a closed import\n");
489                 RETURN(-EINVAL);
490         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
491                 spin_unlock(&imp->imp_lock);
492                 CERROR("already connected\n");
493                 RETURN(0);
494         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
495                 spin_unlock(&imp->imp_lock);
496                 CERROR("already connecting\n");
497                 RETURN(-EALREADY);
498         }
499
500         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
501
502         imp->imp_conn_cnt++;
503         imp->imp_resend_replay = 0;
504
505         if (!lustre_handle_is_used(&imp->imp_remote_handle))
506                 initial_connect = 1;
507         else
508                 committed_before_reconnect = imp->imp_peer_committed_transno;
509
510         set_transno = ptlrpc_first_transno(imp, &imp->imp_connect_data.ocd_transno);
511         spin_unlock(&imp->imp_lock);
512
513         if (new_uuid) {
514                 struct obd_uuid uuid;
515
516                 obd_str2uuid(&uuid, new_uuid);
517                 rc = import_set_conn_priority(imp, &uuid);
518                 if (rc)
519                         GOTO(out, rc);
520         }
521
522         rc = import_select_connection(imp);
523         if (rc)
524                 GOTO(out, rc);
525
526         /* last in connection list */
527         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
528                 if (imp->imp_initial_recov_bk && initial_connect) {
529                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
530                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
531                         /* Don't retry if connect fails */
532                         rc = 0;
533                         obd_set_info_async(obd->obd_self_export,
534                                            sizeof(KEY_INIT_RECOV),
535                                            KEY_INIT_RECOV,
536                                            sizeof(rc), &rc, NULL);
537                 }
538                 if (imp->imp_recon_bk) {
539                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
540                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
541                         spin_lock(&imp->imp_lock);
542                         imp->imp_last_recon = 1;
543                         spin_unlock(&imp->imp_lock);
544                 }
545         }
546
547         rc = sptlrpc_import_sec_adapt(imp, NULL, 0);
548         if (rc)
549                 GOTO(out, rc);
550
551         /* Reset connect flags to the originally requested flags, in case
552          * the server is updated on-the-fly we will get the new features. */
553         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
554         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
555
556         rc = obd_reconnect(NULL, imp->imp_obd->obd_self_export, obd,
557                            &obd->obd_uuid, &imp->imp_connect_data, NULL);
558         if (rc)
559                 GOTO(out, rc);
560
561         request = ptlrpc_request_alloc(imp, &RQF_MDS_CONNECT);
562         if (request == NULL)
563                 GOTO(out, rc = -ENOMEM);
564
565         rc = ptlrpc_request_bufs_pack(request, LUSTRE_OBD_VERSION,
566                                       imp->imp_connect_op, bufs, NULL);
567         if (rc) {
568                 ptlrpc_request_free(request);
569                 GOTO(out, rc);
570         }
571
572 #ifndef __KERNEL__
573         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
574 #endif
575         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_NEXT_VER);
576
577         request->rq_send_state = LUSTRE_IMP_CONNECTING;
578         /* Allow a slightly larger reply for future growth compatibility */
579         req_capsule_set_size(&request->rq_pill, &RMF_CONNECT_DATA, RCL_SERVER,
580                              sizeof(struct obd_connect_data)+16*sizeof(__u64));
581         ptlrpc_request_set_replen(request);
582         request->rq_interpret_reply = ptlrpc_connect_interpret;
583
584         CLASSERT(sizeof (*aa) <= sizeof (request->rq_async_args));
585         aa = ptlrpc_req_async_args(request);
586         memset(aa, 0, sizeof *aa);
587
588         aa->pcaa_peer_committed = committed_before_reconnect;
589         aa->pcaa_initial_connect = initial_connect;
590
591         if (aa->pcaa_initial_connect) {
592                 spin_lock(&imp->imp_lock);
593                 imp->imp_replayable = 1;
594                 spin_unlock(&imp->imp_lock);
595                 lustre_msg_add_op_flags(request->rq_reqmsg,
596                                         MSG_CONNECT_INITIAL);
597                 if (AT_OFF)
598                         /* AT will use INITIAL_CONNECT_TIMEOUT the first
599                            time, adaptive after that. */
600                         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
601         }
602
603         if (set_transno)
604                 lustre_msg_add_op_flags(request->rq_reqmsg,
605                                         MSG_CONNECT_TRANSNO);
606
607         DEBUG_REQ(D_RPCTRACE, request, "(re)connect request");
608         ptlrpcd_add_req(request);
609         rc = 0;
610 out:
611         if (rc != 0) {
612                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
613         }
614
615         RETURN(rc);
616 }
617 EXPORT_SYMBOL(ptlrpc_connect_import);
618
619 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
620 {
621 #ifdef __KERNEL__
622         struct obd_import_conn *imp_conn;
623 #endif
624         int wake_pinger = 0;
625
626         ENTRY;
627
628         spin_lock(&imp->imp_lock);
629         if (list_empty(&imp->imp_conn_list))
630                 GOTO(unlock, 0);
631
632 #ifdef __KERNEL__
633         imp_conn = list_entry(imp->imp_conn_list.prev,
634                               struct obd_import_conn,
635                               oic_item);
636
637         /* XXX: When the failover node is the primary node, it is possible
638          * to have two identical connections in imp_conn_list. We must
639          * compare not conn's pointers but NIDs, otherwise we can defeat
640          * connection throttling. (See bug 14774.) */
641         if (imp->imp_conn_current->oic_conn->c_self !=
642                                 imp_conn->oic_conn->c_self) {
643                 ptlrpc_ping_import_soon(imp);
644                 wake_pinger = 1;
645         }
646 #else
647         /* liblustre has no pinger thead, so we wakup pinger anyway */
648         wake_pinger = 1;
649 #endif
650
651  unlock:
652         spin_unlock(&imp->imp_lock);
653
654         if (wake_pinger)
655                 ptlrpc_pinger_wake_up();
656
657         EXIT;
658 }
659
660 static int ptlrpc_connect_interpret(const struct lu_env *env,
661                                     struct ptlrpc_request *request,
662                                     void * data, int rc)
663 {
664         struct ptlrpc_connect_async_args *aa = data;
665         struct obd_import *imp = request->rq_import;
666         struct client_obd *cli = &imp->imp_obd->u.cli;
667         struct lustre_handle old_hdl;
668         int msg_flags;
669         ENTRY;
670
671         spin_lock(&imp->imp_lock);
672         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
673                 spin_unlock(&imp->imp_lock);
674                 RETURN(0);
675         }
676         spin_unlock(&imp->imp_lock);
677
678         if (rc)
679                 GOTO(out, rc);
680
681         LASSERT(imp->imp_conn_current);
682
683         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
684
685         /* All imports are pingable */
686         spin_lock(&imp->imp_lock);
687         imp->imp_pingable = 1;
688
689         if (aa->pcaa_initial_connect) {
690                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
691                         imp->imp_replayable = 1;
692                         spin_unlock(&imp->imp_lock);
693                         CDEBUG(D_HA, "connected to replayable target: %s\n",
694                                obd2cli_tgt(imp->imp_obd));
695                 } else {
696                         imp->imp_replayable = 0;
697                         spin_unlock(&imp->imp_lock);
698                 }
699
700                 /* if applies, adjust the imp->imp_msg_magic here
701                  * according to reply flags */
702
703                 imp->imp_remote_handle =
704                                 *lustre_msg_get_handle(request->rq_repmsg);
705
706                 /* Initial connects are allowed for clients with non-random
707                  * uuids when servers are in recovery.  Simply signal the
708                  * servers replay is complete and wait in REPLAY_WAIT. */
709                 if (msg_flags & MSG_CONNECT_RECOVERING) {
710                         CDEBUG(D_HA, "connect to %s during recovery\n",
711                                obd2cli_tgt(imp->imp_obd));
712                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
713                 } else {
714                         IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
715                 }
716
717                 spin_lock(&imp->imp_lock);
718                 if (imp->imp_invalid) {
719                         spin_unlock(&imp->imp_lock);
720                         ptlrpc_activate_import(imp);
721                 } else {
722                         spin_unlock(&imp->imp_lock);
723                 }
724
725                 GOTO(finish, rc = 0);
726         } else {
727                 spin_unlock(&imp->imp_lock);
728         }
729
730         /* Determine what recovery state to move the import to. */
731         if (MSG_CONNECT_RECONNECT & msg_flags) {
732                 memset(&old_hdl, 0, sizeof(old_hdl));
733                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
734                             sizeof (old_hdl))) {
735                         CERROR("%s@%s didn't like our handle "LPX64
736                                ", failed\n", obd2cli_tgt(imp->imp_obd),
737                                imp->imp_connection->c_remote_uuid.uuid,
738                                imp->imp_dlm_handle.cookie);
739                         GOTO(out, rc = -ENOTCONN);
740                 }
741
742                 if (memcmp(&imp->imp_remote_handle,
743                            lustre_msg_get_handle(request->rq_repmsg),
744                            sizeof(imp->imp_remote_handle))) {
745                         int level = msg_flags & MSG_CONNECT_RECOVERING ? D_HA :
746                                                                          D_WARNING;
747
748                         /* Bug 16611/14775: if server handle have changed,
749                          * that means some sort of disconnection happened.
750                          * If the server is not in recovery, that also means it
751                          * already erased all of our state because of previous
752                          * eviction. If it is in recovery - we are safe to
753                          * participate since we can reestablish all of our state
754                          * with server again */
755                         CDEBUG(level,"%s@%s changed server handle from "
756                                      LPX64" to "LPX64"%s \n" "but is still in recovery \n",
757                                      obd2cli_tgt(imp->imp_obd),
758                                      imp->imp_connection->c_remote_uuid.uuid,
759                                      imp->imp_remote_handle.cookie,
760                                      lustre_msg_get_handle(request->rq_repmsg)->
761                                                                         cookie,
762                                      (MSG_CONNECT_RECOVERING & msg_flags) ?
763                                          "but is still in recovery" : "");
764
765                         imp->imp_remote_handle =
766                                      *lustre_msg_get_handle(request->rq_repmsg);
767
768                         if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
769                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
770                                 GOTO(finish, rc = 0);
771                         }
772
773                 } else {
774                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
775                                obd2cli_tgt(imp->imp_obd),
776                                imp->imp_connection->c_remote_uuid.uuid);
777                 }
778
779                 if (imp->imp_invalid) {
780                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
781                                "marking evicted\n", imp->imp_obd->obd_name);
782                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
783                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
784                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
785                                imp->imp_obd->obd_name,
786                                obd2cli_tgt(imp->imp_obd));
787
788                         spin_lock(&imp->imp_lock);
789                         imp->imp_resend_replay = 1;
790                         spin_unlock(&imp->imp_lock);
791
792                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
793                 } else {
794                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
795                 }
796         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
797                 LASSERT(imp->imp_replayable);
798                 imp->imp_remote_handle =
799                                 *lustre_msg_get_handle(request->rq_repmsg);
800                 imp->imp_last_replay_transno = 0;
801                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
802         } else {
803                 DEBUG_REQ(D_HA, request, "%s: evicting (reconnect/recover flags"
804                           " not set: %x)", imp->imp_obd->obd_name, msg_flags);
805                 imp->imp_remote_handle =
806                                 *lustre_msg_get_handle(request->rq_repmsg);
807                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
808         }
809
810         /* Sanity checks for a reconnected import. */
811         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
812                 CERROR("imp_replayable flag does not match server "
813                        "after reconnect. We should LBUG right here.\n");
814         }
815
816         if (lustre_msg_get_last_committed(request->rq_repmsg) <
817             aa->pcaa_peer_committed) {
818                 CERROR("%s went back in time (transno "LPD64
819                        " was previously committed, server now claims "LPD64
820                        ")!  See https://bugzilla.lustre.org/show_bug.cgi?"
821                        "id=9646\n",
822                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
823                        lustre_msg_get_last_committed(request->rq_repmsg));
824         }
825
826 finish:
827         rc = ptlrpc_import_recovery_state_machine(imp);
828         if (rc != 0) {
829                 if (rc == -ENOTCONN) {
830                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
831                                "invalidating and reconnecting\n",
832                                obd2cli_tgt(imp->imp_obd),
833                                imp->imp_connection->c_remote_uuid.uuid);
834                         ptlrpc_connect_import(imp, NULL);
835                         RETURN(0);
836                 }
837         } else {
838                 struct obd_connect_data *ocd;
839                 struct obd_export *exp;
840                 int ret;
841                 ret = req_capsule_get_size(&request->rq_pill, &RMF_CONNECT_DATA,
842                                            RCL_SERVER);
843                 /* server replied obd_connect_data is always bigger */
844                 ocd = req_capsule_server_sized_get(&request->rq_pill,
845                                                    &RMF_CONNECT_DATA, ret);
846
847                 spin_lock(&imp->imp_lock);
848                 list_del(&imp->imp_conn_current->oic_item);
849                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
850                 imp->imp_last_success_conn =
851                         imp->imp_conn_current->oic_last_attempt;
852
853                 if (ocd == NULL) {
854                         spin_unlock(&imp->imp_lock);
855                         CERROR("Wrong connect data from server\n");
856                         rc = -EPROTO;
857                         GOTO(out, rc);
858                 }
859
860                 imp->imp_connect_data = *ocd;
861
862                 exp = class_conn2export(&imp->imp_dlm_handle);
863                 spin_unlock(&imp->imp_lock);
864
865                 /* check that server granted subset of flags we asked for. */
866                 LASSERTF((ocd->ocd_connect_flags &
867                           imp->imp_connect_flags_orig) ==
868                          ocd->ocd_connect_flags, LPX64" != "LPX64,
869                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
870
871                 if (!exp) {
872                         /* This could happen if export is cleaned during the
873                            connect attempt */
874                         CERROR("Missing export for %s\n",
875                                imp->imp_obd->obd_name);
876                         GOTO(out, rc = -ENODEV);
877                 }
878                 exp->exp_connect_flags = ocd->ocd_connect_flags;
879                 imp->imp_obd->obd_self_export->exp_connect_flags =
880                                                         ocd->ocd_connect_flags;
881                 class_export_put(exp);
882
883                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
884
885                 if (!ocd->ocd_ibits_known &&
886                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
887                         CERROR("Inodebits aware server returned zero compatible"
888                                " bits?\n");
889
890                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
891                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
892                                         LUSTRE_VERSION_OFFSET_WARN ||
893                      ocd->ocd_version < LUSTRE_VERSION_CODE -
894                                         LUSTRE_VERSION_OFFSET_WARN)) {
895                         /* Sigh, some compilers do not like #ifdef in the middle
896                            of macro arguments */
897 #ifdef __KERNEL__
898                         const char *older =
899                                 "older. Consider upgrading this client";
900 #else
901                         const char *older =
902                                 "older. Consider recompiling this application";
903 #endif
904                         const char *newer = "newer than client version";
905
906                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
907                                       "is much %s (%s)\n",
908                                       obd2cli_tgt(imp->imp_obd),
909                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
910                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
911                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
912                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
913                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
914                                       newer : older, LUSTRE_VERSION_STRING);
915                 }
916
917                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
918                         /* We sent to the server ocd_cksum_types with bits set
919                          * for algorithms we understand. The server masked off
920                          * the checksum types it doesn't support */
921                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
922                                 LCONSOLE_WARN("The negotiation of the checksum "
923                                               "alogrithm to use with server %s "
924                                               "failed (%x/%x), disabling "
925                                               "checksums\n",
926                                               obd2cli_tgt(imp->imp_obd),
927                                               ocd->ocd_cksum_types,
928                                               OBD_CKSUM_ALL);
929                                 cli->cl_checksum = 0;
930                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
931                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
932                         } else {
933                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
934
935                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
936                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
937                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
938                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
939                                 else
940                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
941                         }
942                 } else {
943                         /* The server does not support OBD_CONNECT_CKSUM.
944                          * Enforce CRC32 for backward compatibility*/
945                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
946                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
947                 }
948
949                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
950                         cli->cl_max_pages_per_rpc =
951                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
952                 }
953
954                 imp->imp_obd->obd_namespace->ns_connect_flags =
955                                                         ocd->ocd_connect_flags;
956                 imp->imp_obd->obd_namespace->ns_orig_connect_flags =
957                                                         ocd->ocd_connect_flags;
958
959                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
960                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
961                         /* We need a per-message support flag, because
962                            a. we don't know if the incoming connect reply
963                               supports AT or not (in reply_in_callback)
964                               until we unpack it.
965                            b. failovered server means export and flags are gone
966                               (in ptlrpc_send_reply).
967                            Can only be set when we know AT is supported at
968                            both ends */
969                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
970                 else
971                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
972
973                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
974                         (cli->cl_max_pages_per_rpc > 0));
975         }
976
977 out:
978         if (rc != 0) {
979                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
980                 spin_lock(&imp->imp_lock);
981                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
982                     (request->rq_import_generation == imp->imp_generation))
983                         ptlrpc_deactivate_and_unlock_import(imp);
984                 else
985                         spin_unlock(&imp->imp_lock);
986
987                 if ((imp->imp_recon_bk && imp->imp_last_recon) ||
988                     (rc == -EACCES)) {
989                         /*
990                          * Give up trying to reconnect
991                          * EACCES means client has no permission for connection
992                          */
993                         imp->imp_obd->obd_no_recov = 1;
994                         ptlrpc_deactivate_import(imp);
995                 }
996
997                 if (rc == -EPROTO) {
998                         struct obd_connect_data *ocd;
999
1000                         /* reply message might not be ready */
1001                         if (request->rq_repmsg == NULL)
1002                                 RETURN(-EPROTO);
1003
1004                         ocd = req_capsule_server_get(&request->rq_pill,
1005                                                      &RMF_CONNECT_DATA);
1006                         if (ocd &&
1007                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1008                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1009                            /* Actually servers are only supposed to refuse
1010                               connection from liblustre clients, so we should
1011                               never see this from VFS context */
1012                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1013                                         "(%d.%d.%d.%d)"
1014                                         " refused connection from this client "
1015                                         "with an incompatible version (%s).  "
1016                                         "Client must be recompiled\n",
1017                                         obd2cli_tgt(imp->imp_obd),
1018                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1019                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1020                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1021                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
1022                                         LUSTRE_VERSION_STRING);
1023                                 ptlrpc_deactivate_import(imp);
1024                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1025                         }
1026                         RETURN(-EPROTO);
1027                 }
1028
1029                 ptlrpc_maybe_ping_import_soon(imp);
1030
1031                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1032                        obd2cli_tgt(imp->imp_obd),
1033                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1034         }
1035
1036         spin_lock(&imp->imp_lock);
1037         imp->imp_last_recon = 0;
1038         spin_unlock(&imp->imp_lock);
1039
1040         cfs_waitq_signal(&imp->imp_recovery_waitq);
1041         RETURN(rc);
1042 }
1043
1044 static int completed_replay_interpret(const struct lu_env *env,
1045                                       struct ptlrpc_request *req,
1046                                     void * data, int rc)
1047 {
1048         ENTRY;
1049         atomic_dec(&req->rq_import->imp_replay_inflight);
1050         if (req->rq_status == 0) {
1051                 ptlrpc_import_recovery_state_machine(req->rq_import);
1052         } else {
1053                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1054                        "reconnecting\n",
1055                        req->rq_import->imp_obd->obd_name, req->rq_status);
1056                 ptlrpc_connect_import(req->rq_import, NULL);
1057         }
1058
1059         RETURN(0);
1060 }
1061
1062 static int signal_completed_replay(struct obd_import *imp)
1063 {
1064         struct ptlrpc_request *req;
1065         ENTRY;
1066
1067         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
1068         atomic_inc(&imp->imp_replay_inflight);
1069
1070         req = ptlrpc_request_alloc_pack(imp, &RQF_OBD_PING, LUSTRE_OBD_VERSION,
1071                                         OBD_PING);
1072         if (req == NULL) {
1073                 atomic_dec(&imp->imp_replay_inflight);
1074                 RETURN(-ENOMEM);
1075         }
1076
1077         ptlrpc_request_set_replen(req);
1078         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1079         lustre_msg_add_flags(req->rq_reqmsg,
1080                              MSG_LOCK_REPLAY_DONE | MSG_REQ_REPLAY_DONE);
1081         req->rq_timeout *= 3;
1082         req->rq_interpret_reply = completed_replay_interpret;
1083
1084         ptlrpcd_add_req(req);
1085         RETURN(0);
1086 }
1087
1088 #ifdef __KERNEL__
1089 static int ptlrpc_invalidate_import_thread(void *data)
1090 {
1091         struct obd_import *imp = data;
1092
1093         ENTRY;
1094
1095         ptlrpc_daemonize("ll_imp_inval");
1096
1097         CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
1098                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1099                imp->imp_connection->c_remote_uuid.uuid);
1100
1101         ptlrpc_invalidate_import(imp);
1102
1103         if (obd_dump_on_eviction) {
1104                 CERROR("dump the log upon eviction\n");
1105                 libcfs_debug_dumplog();
1106         }
1107
1108         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1109         ptlrpc_import_recovery_state_machine(imp);
1110
1111         RETURN(0);
1112 }
1113 #endif
1114
1115 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1116 {
1117         int rc = 0;
1118         int inflight;
1119         char *target_start;
1120         int target_len;
1121
1122         ENTRY;
1123         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1124                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1125                           &target_start, &target_len);
1126                 /* Don't care about MGC eviction */
1127                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1128                            LUSTRE_MGC_NAME) != 0) {
1129                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1130                                            "%.*s; in progress operations using "
1131                                            "this service will fail.\n",
1132                                            target_len, target_start);
1133                 }
1134                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1135                        obd2cli_tgt(imp->imp_obd),
1136                        imp->imp_connection->c_remote_uuid.uuid);
1137
1138 #ifdef __KERNEL__
1139                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1140                                        CLONE_VM | CLONE_FILES);
1141                 if (rc < 0)
1142                         CERROR("error starting invalidate thread: %d\n", rc);
1143                 else
1144                         rc = 0;
1145                 RETURN(rc);
1146 #else
1147                 ptlrpc_invalidate_import(imp);
1148
1149                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1150 #endif
1151         }
1152
1153         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1154                 CDEBUG(D_HA, "replay requested by %s\n",
1155                        obd2cli_tgt(imp->imp_obd));
1156                 rc = ptlrpc_replay_next(imp, &inflight);
1157                 if (inflight == 0 &&
1158                     atomic_read(&imp->imp_replay_inflight) == 0) {
1159                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1160                         rc = ldlm_replay_locks(imp);
1161                         if (rc)
1162                                 GOTO(out, rc);
1163                 }
1164                 rc = 0;
1165         }
1166
1167         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1168                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1169                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1170                         rc = signal_completed_replay(imp);
1171                         if (rc)
1172                                 GOTO(out, rc);
1173                 }
1174
1175         }
1176
1177         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1178                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1179                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1180                 }
1181         }
1182
1183         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1184                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1185                        obd2cli_tgt(imp->imp_obd),
1186                        imp->imp_connection->c_remote_uuid.uuid);
1187
1188                 rc = ptlrpc_resend(imp);
1189                 if (rc)
1190                         GOTO(out, rc);
1191                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1192                 ptlrpc_activate_import(imp);
1193
1194                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1195                           &target_start, &target_len);
1196                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1197                               "using nid %s.\n", imp->imp_obd->obd_name,
1198                               target_len, target_start,
1199                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1200         }
1201
1202         if (imp->imp_state == LUSTRE_IMP_FULL) {
1203                 cfs_waitq_signal(&imp->imp_recovery_waitq);
1204                 ptlrpc_wake_delayed(imp);
1205         }
1206
1207 out:
1208         RETURN(rc);
1209 }
1210
/*
 * Timeout callback handed to LWI_TIMEOUT_INTR() by ptlrpc_disconnect_import().
 * Ignores its argument and always returns 0.
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
1215
1216 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1217 {
1218         struct ptlrpc_request *req;
1219         int rq_opc, rc = 0;
1220         int nowait = imp->imp_obd->obd_force;
1221         ENTRY;
1222
1223         if (nowait)
1224                 GOTO(set_state, rc);
1225
1226         switch (imp->imp_connect_op) {
1227         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1228         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1229         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1230         default:
1231                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1232                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1233                 RETURN(-EINVAL);
1234         }
1235
1236         if (ptlrpc_import_in_recovery(imp)) {
1237                 struct l_wait_info lwi;
1238                 cfs_duration_t timeout;
1239
1240
1241                 if (AT_OFF) {
1242                         if (imp->imp_server_timeout)
1243                                 timeout = cfs_time_seconds(obd_timeout / 2);
1244                         else
1245                                 timeout = cfs_time_seconds(obd_timeout);
1246                 } else {
1247                         int idx = import_at_get_index(imp,
1248                                 imp->imp_client->cli_request_portal);
1249                         timeout = cfs_time_seconds(
1250                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1251                 }
1252
1253                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1254                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1255                 rc = l_wait_event(imp->imp_recovery_waitq,
1256                                   !ptlrpc_import_in_recovery(imp), &lwi);
1257
1258         }
1259
1260         spin_lock(&imp->imp_lock);
1261         if (imp->imp_state != LUSTRE_IMP_FULL)
1262                 GOTO(out, 0);
1263
1264         spin_unlock(&imp->imp_lock);
1265
1266         req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_DISCONNECT,
1267                                         LUSTRE_OBD_VERSION, rq_opc);
1268         if (req) {
1269                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1270                  * it fails.  We can get through the above with a down server
1271                  * if the client doesn't know the server is gone yet. */
1272                 req->rq_no_resend = 1;
1273
1274 #ifndef CRAY_XT3
1275                 /* We want client umounts to happen quickly, no matter the
1276                    server state... */
1277                 req->rq_timeout = min_t(int, req->rq_timeout,
1278                                         INITIAL_CONNECT_TIMEOUT);
1279 #else
1280                 /* ... but we always want liblustre clients to nicely
1281                    disconnect, so only use the adaptive value. */
1282                 if (AT_OFF)
1283                         req->rq_timeout = obd_timeout / 3;
1284 #endif
1285
1286                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1287                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1288                 ptlrpc_request_set_replen(req);
1289                 rc = ptlrpc_queue_wait(req);
1290                 ptlrpc_req_finished(req);
1291         }
1292
1293 set_state:
1294         spin_lock(&imp->imp_lock);
1295 out:
1296         if (noclose)
1297                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1298         else
1299                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1300         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1301         imp->imp_conn_cnt = 0;
1302         /* Try all connections in the future - bz 12758 */
1303         imp->imp_last_recon = 0;
1304         spin_unlock(&imp->imp_lock);
1305
1306         RETURN(rc);
1307 }
1308
1309
1310 /* Adaptive Timeout utils */
1311 extern unsigned int at_min, at_max, at_history;
1312
1313 /* Bin into timeslices using AT_BINS bins.
1314    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1315    but still smoothing out a return to normalcy from a slow response.
1316    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1317 int at_add(struct adaptive_timeout *at, unsigned int val)
1318 {
1319         unsigned int old = at->at_current;
1320         time_t now = cfs_time_current_sec();
1321         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1322
1323         LASSERT(at);
1324 #if 0
1325         CDEBUG(D_INFO, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1326                val, at, now - at->at_binstart, at->at_current,
1327                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1328 #endif
1329         if (val == 0)
1330                 /* 0's don't count, because we never want our timeout to
1331                    drop to 0, and because 0 could mean an error */
1332                 return 0;
1333
1334         spin_lock(&at->at_lock);
1335
1336         if (unlikely(at->at_binstart == 0)) {
1337                 /* Special case to remove default from history */
1338                 at->at_current = val;
1339                 at->at_worst_ever = val;
1340                 at->at_worst_time = now;
1341                 at->at_hist[0] = val;
1342                 at->at_binstart = now;
1343         } else if (now - at->at_binstart < binlimit ) {
1344                 /* in bin 0 */
1345                 at->at_hist[0] = max(val, at->at_hist[0]);
1346                 at->at_current = max(val, at->at_current);
1347         } else {
1348                 int i, shift;
1349                 unsigned int maxv = val;
1350                 /* move bins over */
1351                 shift = (now - at->at_binstart) / binlimit;
1352                 LASSERT(shift > 0);
1353                 for(i = AT_BINS - 1; i >= 0; i--) {
1354                         if (i >= shift) {
1355                                 at->at_hist[i] = at->at_hist[i - shift];
1356                                 maxv = max(maxv, at->at_hist[i]);
1357                         } else {
1358                                 at->at_hist[i] = 0;
1359                         }
1360                 }
1361                 at->at_hist[0] = val;
1362                 at->at_current = maxv;
1363                 at->at_binstart += shift * binlimit;
1364         }
1365
1366         if (at->at_current > at->at_worst_ever) {
1367                 at->at_worst_ever = at->at_current;
1368                 at->at_worst_time = now;
1369         }
1370
1371         if (at->at_flags & AT_FLG_NOHIST)
1372                 /* Only keep last reported val; keeping the rest of the history
1373                    for proc only */
1374                 at->at_current = val;
1375
1376         if (at_max > 0)
1377                 at->at_current =  min(at->at_current, at_max);
1378         at->at_current =  max(at->at_current, at_min);
1379
1380 #if 0
1381         if (at->at_current != old)
1382                 CDEBUG(D_ADAPTTO, "AT %p change: old=%u new=%u delta=%d "
1383                        "(val=%u) hist %u %u %u %u\n", at,
1384                        old, at->at_current, at->at_current - old, val,
1385                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1386                        at->at_hist[3]);
1387 #endif
1388
1389         /* if we changed, report the old value */
1390         old = (at->at_current != old) ? old : 0;
1391
1392         spin_unlock(&at->at_lock);
1393         return old;
1394 }
1395
1396 /* Find the imp_at index for a given portal; assign if space available */
1397 int import_at_get_index(struct obd_import *imp, int portal)
1398 {
1399         struct imp_at *at = &imp->imp_at;
1400         int i;
1401
1402         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1403                 if (at->iat_portal[i] == portal)
1404                         return i;
1405                 if (at->iat_portal[i] == 0)
1406                         /* unused */
1407                         break;
1408         }
1409
1410         /* Not found in list, add it under a lock */
1411         spin_lock(&imp->imp_lock);
1412
1413         /* Check unused under lock */
1414         for (; i < IMP_AT_MAX_PORTALS; i++) {
1415                 if (at->iat_portal[i] == portal)
1416                         goto out;
1417                 if (at->iat_portal[i] == 0)
1418                         /* unused */
1419                         break;
1420         }
1421
1422         /* Not enough portals? */
1423         LASSERT(i < IMP_AT_MAX_PORTALS);
1424
1425         at->iat_portal[i] = portal;
1426 out:
1427         spin_unlock(&imp->imp_lock);
1428         return i;
1429 }