Whamcloud - gitweb
b=21574 more pinger fixes
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/import.c
37  *
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #ifndef __KERNEL__
43 # include <liblustre.h>
44 #endif
45
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
51 #include <obd.h>
52 #include <obd_class.h>
53
54 #include "ptlrpc_internal.h"
55
56 struct ptlrpc_connect_async_args {
57          __u64 pcaa_peer_committed;
58         int pcaa_initial_connect;
59 };
60
61 static void __import_set_state(struct obd_import *imp,
62                                enum lustre_imp_state state)
63 {
64         imp->imp_state = state;
65         imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state;
66         imp->imp_state_hist[imp->imp_state_hist_idx].ish_time =
67                 cfs_time_current_sec();
68         imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) %
69                 IMP_STATE_HIST_LEN;
70 }
71
72 /* A CLOSED import should remain so. */
73 #define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
74 do {                                                                           \
75         if (imp->imp_state != LUSTRE_IMP_CLOSED) {                             \
76                CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
77                       imp, obd2cli_tgt(imp->imp_obd),                          \
78                       ptlrpc_import_state_name(imp->imp_state),                \
79                       ptlrpc_import_state_name(state));                        \
80                __import_set_state(imp, state);                                 \
81         }                                                                      \
82 } while(0)
83
84 #define IMPORT_SET_STATE(imp, state)            \
85 do {                                            \
86         spin_lock(&imp->imp_lock);              \
87         IMPORT_SET_STATE_NOLOCK(imp, state);    \
88         spin_unlock(&imp->imp_lock);            \
89 } while(0)
90
91
92 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
93                                     void * data, int rc);
94 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
95
96 /* Only this function is allowed to change the import state when it is
97  * CLOSED. I would rather refcount the import and free it after
98  * disconnection like we do with exports. To do that, the client_obd
99  * will need to save the peer info somewhere other than in the import,
100  * though. */
101 int ptlrpc_init_import(struct obd_import *imp)
102 {
103         spin_lock(&imp->imp_lock);
104
105         imp->imp_generation++;
106         imp->imp_state =  LUSTRE_IMP_NEW;
107
108         spin_unlock(&imp->imp_lock);
109
110         return 0;
111 }
112 EXPORT_SYMBOL(ptlrpc_init_import);
113
114 #define UUID_STR "_UUID"
115 static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
116                       int *uuid_len)
117 {
118         *uuid_start = !prefix || strncmp(uuid, prefix, strlen(prefix))
119                 ? uuid : uuid + strlen(prefix);
120
121         *uuid_len = strlen(*uuid_start);
122
123         if (*uuid_len < strlen(UUID_STR))
124                 return;
125
126         if (!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR),
127                     UUID_STR, strlen(UUID_STR)))
128                 *uuid_len -= strlen(UUID_STR);
129 }
130
131 /* Returns true if import was FULL, false if import was already not
132  * connected.
133  * @imp - import to be disconnected
134  * @conn_cnt - connection count (epoch) of the request that timed out
135  *             and caused the disconnection.  In some cases, multiple
136  *             inflight requests can fail to a single target (e.g. OST
137  *             bulk requests) and if one has already caused a reconnection
138  *             (increasing the import->conn_cnt) the older failure should
139  *             not also cause a reconnection.  If zero it forces a reconnect.
140  */
141 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
142 {
143         int rc = 0;
144
145         spin_lock(&imp->imp_lock);
146
147         if (imp->imp_state == LUSTRE_IMP_FULL &&
148             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
149                 char *target_start;
150                 int   target_len;
151
152                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
153                           &target_start, &target_len);
154                 if (imp->imp_replayable) {
155                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
156                                "%s was lost; in progress operations using this "
157                                "service will wait for recovery to complete.\n",
158                                imp->imp_obd->obd_name, target_len, target_start,
159                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
160                 } else {
161                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
162                                "%.*s via nid %s was lost; in progress "
163                                "operations using this service will fail.\n",
164                                imp->imp_obd->obd_name, target_len, target_start,
165                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
166                 }
167                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
168                 spin_unlock(&imp->imp_lock);
169
170                 if (obd_dump_on_timeout)
171                         libcfs_debug_dumplog();
172
173                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
174                 rc = 1;
175         } else {
176                 spin_unlock(&imp->imp_lock);
177                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
178                        imp->imp_client->cli_name, imp,
179                        (imp->imp_state == LUSTRE_IMP_FULL &&
180                         imp->imp_conn_cnt > conn_cnt) ?
181                        "reconnected" : "not connected", imp->imp_conn_cnt,
182                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
183         }
184
185         return rc;
186 }
187
188 /* Must be called with imp_lock held! */
189 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
190 {
191         ENTRY;
192         LASSERT_SPIN_LOCKED(&imp->imp_lock);
193
194         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
195         imp->imp_invalid = 1;
196         imp->imp_generation++;
197         spin_unlock(&imp->imp_lock);
198
199         ptlrpc_abort_inflight(imp);
200         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
201 }
202
203 /*
204  * This acts as a barrier; all existing requests are rejected, and
205  * no new requests will be accepted until the import is valid again.
206  */
207 void ptlrpc_deactivate_import(struct obd_import *imp)
208 {
209         spin_lock(&imp->imp_lock);
210         ptlrpc_deactivate_and_unlock_import(imp);
211 }
212
213 static unsigned int
214 ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
215 {
216         long dl;
217
218         if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
219               (req->rq_phase == RQ_PHASE_BULK) ||
220               (req->rq_phase == RQ_PHASE_NEW)))
221                 return 0;
222
223         if (req->rq_timedout)
224                 return 0;
225
226         if (req->rq_phase == RQ_PHASE_NEW)
227                 dl = req->rq_sent;
228         else
229                 dl = req->rq_deadline;
230
231         if (dl <= now)
232                 return 0;
233
234         return dl - now;
235 }
236
237 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
238 {
239         time_t now = cfs_time_current_sec();
240         struct list_head *tmp, *n;
241         struct ptlrpc_request *req;
242         unsigned int timeout = 0;
243
244         spin_lock(&imp->imp_lock);
245         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
246                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
247                 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
248         }
249         spin_unlock(&imp->imp_lock);
250         return timeout;
251 }
252
253 /*
254  * This function will invalidate the import, if necessary, then block
255  * for all the RPC completions, and finally notify the obd to
256  * invalidate its state (ie cancel locks, clear pending requests,
257  * etc).
258  */
259 void ptlrpc_invalidate_import(struct obd_import *imp)
260 {
261         struct list_head *tmp, *n;
262         struct ptlrpc_request *req;
263         struct l_wait_info lwi;
264         unsigned int timeout;
265         int rc;
266
267         atomic_inc(&imp->imp_inval_count);
268
269         /*
270          * If this is an invalid MGC connection, then don't bother
271          * waiting for imp_inflight to drop to 0.
272          */
273         if (imp->imp_invalid && imp->imp_recon_bk &&!imp->imp_obd->obd_no_recov)
274                 goto out;
275
276         if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
277                 ptlrpc_deactivate_import(imp);
278
279         LASSERT(imp->imp_invalid);
280
281         /* Wait forever until inflight == 0. We really can't do it another
282          * way because in some cases we need to wait for very long reply
283          * unlink. We can't do anything before that because there is really
284          * no guarantee that some rdma transfer is not in progress right now. */
285         do {
286                 /* Calculate max timeout for waiting on rpcs to error
287                  * out. Use obd_timeout if calculated value is smaller
288                  * than it. */
289                 if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
290                         timeout = ptlrpc_inflight_timeout(imp);
291                         timeout += timeout / 3;
292
293                         if (timeout == 0)
294                                 timeout = obd_timeout;
295                 } else {
296                         /* decrease the interval to increase race condition */
297                         timeout = 1;
298                 }
299
300                 CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",
301                        timeout);
302
303                 /* Wait for all requests to error out and call completion
304                  * callbacks. Cap it at obd_timeout -- these should all
305                  * have been locally cancelled by ptlrpc_abort_inflight. */
306                 lwi = LWI_TIMEOUT_INTERVAL(
307                         cfs_timeout_cap(cfs_time_seconds(timeout)),
308                         (timeout > 1)?cfs_time_seconds(1):cfs_time_seconds(1)/2,
309                         NULL, NULL);
310                 rc = l_wait_event(imp->imp_recovery_waitq,
311                                 (atomic_read(&imp->imp_inflight) == 0), &lwi);
312                 if (rc) {
313                         const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
314
315                         CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
316                                cli_tgt, rc, atomic_read(&imp->imp_inflight));
317
318                         spin_lock(&imp->imp_lock);
319                         if (atomic_read(&imp->imp_inflight) == 0) {
320                                 int count = atomic_read(&imp->imp_unregistering);
321
322                                 /* We know that "unregistering" rpcs only can
323                                  * survive in sending or delaying lists (they
324                                  * maybe waiting for long reply unlink in
325                                  * sluggish nets). Let's check this. If there
326                                  * is no inflight and unregistering != 0, this
327                                  * is bug. */
328                                 LASSERTF(count == 0, "Some RPCs are still "
329                                          "unregistering: %d\n", count);
330
331                                 /* Let's save one loop as soon as inflight have
332                                  * dropped to zero. No new inflights possible at
333                                  * this point. */
334                                 rc = 0;
335                         } else {
336                                 list_for_each_safe(tmp, n,
337                                                    &imp->imp_sending_list) {
338                                         req = list_entry(tmp,
339                                                          struct ptlrpc_request,
340                                                          rq_list);
341                                         DEBUG_REQ(D_ERROR, req,
342                                                   "still on sending list");
343                                 }
344                                 list_for_each_safe(tmp, n,
345                                                    &imp->imp_delayed_list) {
346                                         req = list_entry(tmp,
347                                                          struct ptlrpc_request,
348                                                          rq_list);
349                                         DEBUG_REQ(D_ERROR, req,
350                                                   "still on delayed list");
351                                 }
352
353                                 CERROR("%s: RPCs in \"%s\" phase found (%d). "
354                                        "Network is sluggish? Waiting them "
355                                        "to error out.\n", cli_tgt,
356                                        ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
357                                        atomic_read(&imp->imp_unregistering));
358                         }
359                         spin_unlock(&imp->imp_lock);
360                 }
361         } while (rc != 0);
362
363         /* Let's additionally check that no new rpcs added to import in
364          * "invalidate" state. */
365         LASSERT(atomic_read(&imp->imp_inflight) == 0);
366
367 out:
368         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
369
370         atomic_dec(&imp->imp_inval_count);
371         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
372 }
373
374 /* unset imp_invalid */
375 void ptlrpc_activate_import(struct obd_import *imp)
376 {
377         struct obd_device *obd = imp->imp_obd;
378
379         spin_lock(&imp->imp_lock);
380         imp->imp_invalid = 0;
381         spin_unlock(&imp->imp_lock);
382
383         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
384 }
385
386 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
387 {
388         ENTRY;
389
390         LASSERT(!imp->imp_dlm_fake);
391
392         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
393                 if (!imp->imp_replayable) {
394                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
395                                "auto-deactivating\n",
396                                obd2cli_tgt(imp->imp_obd),
397                                imp->imp_connection->c_remote_uuid.uuid,
398                                imp->imp_obd->obd_name);
399                         ptlrpc_deactivate_import(imp);
400                 }
401
402                 CDEBUG(D_HA, "%s: waking up pinger\n",
403                        obd2cli_tgt(imp->imp_obd));
404
405                 spin_lock(&imp->imp_lock);
406                 imp->imp_force_verify = 1;
407                 spin_unlock(&imp->imp_lock);
408
409                 ptlrpc_pinger_wake_up();
410         }
411         EXIT;
412 }
413
414 int ptlrpc_reconnect_import(struct obd_import *imp)
415 {
416
417         ptlrpc_set_import_discon(imp, 0);
418         /* Force a new connect attempt */
419         ptlrpc_invalidate_import(imp);
420         /* Do a fresh connect next time by zeroing the handle */
421         ptlrpc_disconnect_import(imp, 1);
422         /* Wait for all invalidate calls to finish */
423         if (atomic_read(&imp->imp_inval_count) > 0) {
424                 int rc;
425                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
426                 rc = l_wait_event(imp->imp_recovery_waitq,
427                                   (atomic_read(&imp->imp_inval_count) == 0),
428                                   &lwi);
429                 if (rc)
430                         CERROR("Interrupted, inval=%d\n",
431                                atomic_read(&imp->imp_inval_count));
432         }
433
434         /*
435          * Allow reconnect attempts. Note: Currently, the function is
436          * only called by MGC. So assume this is a recoverable import,
437          * and force import to be recoverable. fix this if you need to
438          */
439
440         imp->imp_obd->obd_no_recov = 0;
441         /* Remove 'invalid' flag */
442         ptlrpc_activate_import(imp);
443         /* Attempt a new connect */
444         ptlrpc_recover_import(imp, NULL);
445         return 0;
446 }
447
448 EXPORT_SYMBOL(ptlrpc_reconnect_import);
449
450 static int import_select_connection(struct obd_import *imp)
451 {
452         struct obd_import_conn *imp_conn = NULL, *conn;
453         struct obd_export *dlmexp;
454         int tried_all = 1;
455         ENTRY;
456
457         spin_lock(&imp->imp_lock);
458
459         if (list_empty(&imp->imp_conn_list)) {
460                 CERROR("%s: no connections available\n",
461                         imp->imp_obd->obd_name);
462                 spin_unlock(&imp->imp_lock);
463                 RETURN(-EINVAL);
464         }
465
466         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
467                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
468                        imp->imp_obd->obd_name,
469                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
470                        conn->oic_last_attempt);
471
472                 /* Don't thrash connections */
473                 if (cfs_time_before_64(cfs_time_current_64(),
474                                      conn->oic_last_attempt +
475                                      cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
476                         continue;
477                 }
478
479                 /* If we have not tried this connection since the
480                    the last successful attempt, go with this one */
481                 if ((conn->oic_last_attempt == 0) ||
482                     cfs_time_beforeq_64(conn->oic_last_attempt,
483                                        imp->imp_last_success_conn)) {
484                         imp_conn = conn;
485                         tried_all = 0;
486                         break;
487                 }
488
489                 /* If all of the connections have already been tried
490                    since the last successful connection; just choose the
491                    least recently used */
492                 if (!imp_conn)
493                         imp_conn = conn;
494                 else if (cfs_time_before_64(conn->oic_last_attempt,
495                                             imp_conn->oic_last_attempt))
496                         imp_conn = conn;
497         }
498
499         /* if not found, simply choose the current one */
500         if (!imp_conn || imp->imp_force_reconnect) {
501                 LASSERT(imp->imp_conn_current);
502                 imp_conn = imp->imp_conn_current;
503                 tried_all = 0;
504         }
505         LASSERT(imp_conn->oic_conn);
506
507         /* If we've tried everything, and we're back to the beginning of the
508            list, increase our timeout and try again. It will be reset when
509            we do finally connect. (FIXME: really we should wait for all network
510            state associated with the last connection attempt to drain before
511            trying to reconnect on it.) */
512         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
513             !imp->imp_recon_bk /* not retrying */) {
514                 if (at_get(&imp->imp_at.iat_net_latency) <
515                     CONNECTION_SWITCH_MAX) {
516                         at_measured(&imp->imp_at.iat_net_latency,
517                                     MIN(at_get(&imp->imp_at.iat_net_latency) +
518                                     CONNECTION_SWITCH_INC,
519                                     CONNECTION_SWITCH_MAX));
520                 }
521                 LASSERT(imp_conn->oic_last_attempt);
522                 CWARN("%s: tried all connections, increasing latency to %ds\n",
523                       imp->imp_obd->obd_name,
524                       at_get(&imp->imp_at.iat_net_latency));
525         }
526
527         imp_conn->oic_last_attempt = cfs_time_current_64();
528
529         /* switch connection, don't mind if it's same as the current one */
530         if (imp->imp_connection)
531                 ptlrpc_connection_put(imp->imp_connection);
532         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
533
534         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
535         LASSERT(dlmexp != NULL);
536         if (dlmexp->exp_connection)
537                 ptlrpc_connection_put(dlmexp->exp_connection);
538         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
539         class_export_put(dlmexp);
540
541         if (imp->imp_conn_current != imp_conn) {
542                 if (imp->imp_conn_current)
543                         CDEBUG(D_HA, "Changing connection for %s to %s/%s\n",
544                                imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid,
545                                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
546                 imp->imp_conn_current = imp_conn;
547         }
548
549         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
550                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
551                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
552
553         spin_unlock(&imp->imp_lock);
554
555         RETURN(0);
556 }
557
558 /**
559  * must be called under imp lock
560  */
561 static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
562 {
563         struct ptlrpc_request *req;
564         struct list_head *tmp;
565
566         if (list_empty(&imp->imp_replay_list))
567                 return 0;
568         tmp = imp->imp_replay_list.next;
569         req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
570         *transno = req->rq_transno;
571         if (req->rq_transno == 0) {
572                 DEBUG_REQ(D_ERROR, req, "zero transno in replay");
573                 LBUG();
574         }
575
576         return 1;
577 }
578
579 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
580 {
581         struct obd_device *obd = imp->imp_obd;
582         int set_transno = 0;
583         int initial_connect = 0;
584         int rc;
585         __u64 committed_before_reconnect = 0;
586         struct ptlrpc_request *request;
587         __u32 size[] = { sizeof(struct ptlrpc_body),
588                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
589                        sizeof(obd->obd_uuid),
590                        sizeof(imp->imp_dlm_handle),
591                        sizeof(imp->imp_connect_data) };
592         char *tmp[] = { NULL,
593                         obd2cli_tgt(imp->imp_obd),
594                         obd->obd_uuid.uuid,
595                         (char *)&imp->imp_dlm_handle,
596                         (char *)&imp->imp_connect_data };
597         struct ptlrpc_connect_async_args *aa;
598
599         ENTRY;
600         spin_lock(&imp->imp_lock);
601         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
602                 spin_unlock(&imp->imp_lock);
603                 CERROR("can't connect to a closed import\n");
604                 RETURN(-EINVAL);
605         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
606                 spin_unlock(&imp->imp_lock);
607                 CERROR("already connected\n");
608                 RETURN(0);
609         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
610                 spin_unlock(&imp->imp_lock);
611                 CERROR("already connecting\n");
612                 RETURN(-EALREADY);
613         }
614
615         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
616
617         imp->imp_conn_cnt++;
618         imp->imp_resend_replay = 0;
619
620         if (!lustre_handle_is_used(&imp->imp_remote_handle))
621                 initial_connect = 1;
622         else
623                 committed_before_reconnect = imp->imp_peer_committed_transno;
624
625         set_transno = ptlrpc_first_transno(imp,
626                                            &imp->imp_connect_data.ocd_transno);
627
628         spin_unlock(&imp->imp_lock);
629
630         if (new_uuid) {
631                 struct obd_uuid uuid;
632
633                 obd_str2uuid(&uuid, new_uuid);
634                 rc = import_set_conn_priority(imp, &uuid);
635                 if (rc)
636                         GOTO(out, rc);
637         }
638
639         rc = import_select_connection(imp);
640         if (rc)
641                 GOTO(out, rc);
642
643         /* last in connection list */
644         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
645                 if (imp->imp_initial_recov_bk && initial_connect) {
646                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
647                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
648                         /* Don't retry if connect fails */
649                         rc = 0;
650                         obd_set_info_async(obd->obd_self_export,
651                                            sizeof(KEY_INIT_RECOV),
652                                            KEY_INIT_RECOV,
653                                            sizeof(rc), &rc, NULL);
654                 }
655                 if (imp->imp_recon_bk) {
656                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
657                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
658                         spin_lock(&imp->imp_lock);
659                         imp->imp_last_recon = 1;
660                         spin_unlock(&imp->imp_lock);
661                 }
662         }
663
664         /* Reset connect flags to the originally requested flags, in case
665          * the server is updated on-the-fly we will get the new features. */
666         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
667         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
668
669         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
670                            &obd->obd_uuid, &imp->imp_connect_data, NULL);
671         if (rc)
672                 GOTO(out, rc);
673
674         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
675                                   5, size, tmp);
676         if (!request)
677                 GOTO(out, rc = -ENOMEM);
678
679         /* Report the rpc service time to the server so that it knows how long
680          * to wait for clients to join recovery */
681         lustre_msg_set_service_time(request->rq_reqmsg,
682                                     at_timeout2est(request->rq_timeout));
683
684         /* The amount of time we give the server to process the connect req.
685          * import_select_connection will increase the net latency on
686          * repeated reconnect attempts to cover slow networks.
687          * We override/ignore the server rpc completion estimate here,
688          * which may be large if this is a reconnect attempt */
689         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
690         lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);
691
692 #ifndef __KERNEL__
693         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
694 #endif
695         if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
696                 lustre_msg_add_op_flags(request->rq_reqmsg,
697                                         MSG_CONNECT_NEXT_VER);
698
699         request->rq_no_resend = request->rq_no_delay = 1;
700         request->rq_send_state = LUSTRE_IMP_CONNECTING;
701         /* Allow a slightly larger reply for future growth compatibility */
702         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
703                               16 * sizeof(__u64);
704         ptlrpc_req_set_repsize(request, 2, size);
705         request->rq_interpret_reply = ptlrpc_connect_interpret;
706
707         CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
708         aa = ptlrpc_req_async_args(request);
709         memset(aa, 0, sizeof *aa);
710
711         aa->pcaa_peer_committed = committed_before_reconnect;
712         aa->pcaa_initial_connect = initial_connect;
713         if (aa->pcaa_initial_connect) {
714                 spin_lock(&imp->imp_lock);
715                 imp->imp_replayable = 1;
716                 spin_unlock(&imp->imp_lock);
717                 lustre_msg_add_op_flags(request->rq_reqmsg,
718                                         MSG_CONNECT_INITIAL);
719         }
720
721         if (set_transno)
722                 lustre_msg_add_op_flags(request->rq_reqmsg,
723                                         MSG_CONNECT_TRANSNO);
724
725         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
726                   aa->pcaa_initial_connect ? "initial " : "re",
727                   imp->imp_conn_cnt);
728         ptlrpcd_add_req(request);
729         rc = 0;
730 out:
731         if (rc != 0) {
732                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
733         }
734
735         RETURN(rc);
736 }
737 EXPORT_SYMBOL(ptlrpc_connect_import);
738
739 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
740 {
741 #ifdef __KERNEL__
742         struct obd_import_conn *imp_conn;
743 #endif
744         int wake_pinger = 0;
745
746         ENTRY;
747
748         spin_lock(&imp->imp_lock);
749         if (list_empty(&imp->imp_conn_list))
750                 GOTO(unlock, 0);
751
752 #ifdef __KERNEL__
753         imp_conn = list_entry(imp->imp_conn_list.prev,
754                               struct obd_import_conn,
755                               oic_item);
756
757         /* XXX: When the failover node is the primary node, it is possible
758          * to have two identical connections in imp_conn_list. We must
759          * compare not conn's pointers but NIDs, otherwise we can defeat
760          * connection throttling. (See bug 14774.) */
761         if (imp->imp_conn_current->oic_conn->c_peer.nid !=
762                                 imp_conn->oic_conn->c_peer.nid) {
763                 ptlrpc_ping_import_soon(imp);
764                 wake_pinger = 1;
765         }
766
767 #else
768         /* liblustre has no pinger thead, so we wakup pinger anyway */
769         wake_pinger = 1;
770 #endif
771  unlock:
772         spin_unlock(&imp->imp_lock);
773
774         if (wake_pinger)
775                 ptlrpc_pinger_wake_up();
776
777         EXIT;
778 }
779
780 static int ptlrpc_busy_reconnect(int rc)
781 {
782         return (rc == -EBUSY) || (rc == -EAGAIN);
783 }
784
785 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
786                                     void * data, int rc)
787 {
788         struct ptlrpc_connect_async_args *aa = data;
789         struct obd_import *imp = request->rq_import;
790         struct client_obd *cli = &imp->imp_obd->u.cli;
791         struct lustre_handle old_hdl;
792         __u64 old_connect_flags;
793         int msg_flags;
794         ENTRY;
795
796         spin_lock(&imp->imp_lock);
797         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
798                 spin_unlock(&imp->imp_lock);
799                 RETURN(0);
800         }
801
802         if (rc) {
803                 /* if this reconnect to busy export - not need select new target
804                  * for connecting*/
805                 imp->imp_force_reconnect = ptlrpc_busy_reconnect(rc);
806                 spin_unlock(&imp->imp_lock);
807                 GOTO(out, rc);
808         }
809
810         LASSERT(imp->imp_conn_current);
811
812         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
813
814         /* All imports are pingable */
815         imp->imp_pingable = 1;
816         imp->imp_force_reconnect = 0;
817
818         if (aa->pcaa_initial_connect) {
819                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
820                         imp->imp_replayable = 1;
821                         spin_unlock(&imp->imp_lock);
822                         CDEBUG(D_HA, "connected to replayable target: %s\n",
823                                obd2cli_tgt(imp->imp_obd));
824                 } else {
825                         imp->imp_replayable = 0;
826                         spin_unlock(&imp->imp_lock);
827                 }
828
829                 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
830                      msg_flags & MSG_CONNECT_NEXT_VER) ||
831                     request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
832                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
833                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
834                                obd2cli_tgt(imp->imp_obd));
835                 } else {
836                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
837                                obd2cli_tgt(imp->imp_obd));
838                 }
839
840                 imp->imp_remote_handle =
841                                 *lustre_msg_get_handle(request->rq_repmsg);
842
843                 /* Initial connects are allowed for clients with non-random
844                  * uuids when servers are in recovery.  Simply signal the
845                  * servers replay is complete and wait in REPLAY_WAIT. */
846                 if (msg_flags & MSG_CONNECT_RECOVERING) {
847                         CDEBUG(D_HA, "connect to %s during recovery\n",
848                                obd2cli_tgt(imp->imp_obd));
849                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
850                 } else {
851                         IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
852                         ptlrpc_activate_import(imp);
853                 }
854                 GOTO(finish, rc = 0);
855         } else {
856                 spin_unlock(&imp->imp_lock);
857         }
858
859         /* Determine what recovery state to move the import to. */
860         if (MSG_CONNECT_RECONNECT & msg_flags) {
861                 memset(&old_hdl, 0, sizeof(old_hdl));
862                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
863                             sizeof (old_hdl))) {
864                         CERROR("%s@%s didn't like our handle "LPX64
865                                ", failed\n", obd2cli_tgt(imp->imp_obd),
866                                imp->imp_connection->c_remote_uuid.uuid,
867                                imp->imp_dlm_handle.cookie);
868                         GOTO(out, rc = -ENOTCONN);
869                 }
870
871                 if (memcmp(&imp->imp_remote_handle,
872                            lustre_msg_get_handle(request->rq_repmsg),
873                            sizeof(imp->imp_remote_handle))) {
874                         int level = msg_flags & MSG_CONNECT_RECOVERING ?
875                                 D_HA : D_WARNING;
876
877                         /* Bug 16611/14775: if server handle have changed,
878                          * that means some sort of disconnection happened.
879                          * If the server is not in recovery, that also means it
880                          * already erased all of our state because of previous
881                          * eviction. If it is in recovery - we are safe to
882                          * participate since we can reestablish all of our state
883                          * with server again */
884                         CDEBUG(level,"%s@%s changed server handle from "
885                                      LPX64" to "LPX64"%s\n",
886                                      obd2cli_tgt(imp->imp_obd),
887                                      imp->imp_connection->c_remote_uuid.uuid,
888                                      imp->imp_remote_handle.cookie,
889                                      lustre_msg_get_handle(request->rq_repmsg)->
890                                                                         cookie,
891                                      (MSG_CONNECT_RECOVERING & msg_flags) ?
892                                          " but is still in recovery" : "");
893
894                         imp->imp_remote_handle =
895                                      *lustre_msg_get_handle(request->rq_repmsg);
896
897                         if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
898                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
899                                 GOTO(finish, rc = 0);
900                         }
901
902                 } else {
903                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
904                                obd2cli_tgt(imp->imp_obd),
905                                imp->imp_connection->c_remote_uuid.uuid);
906                 }
907
908                 if (imp->imp_invalid) {
909                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
910                                "marking evicted\n", imp->imp_obd->obd_name);
911                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
912                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
913                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
914                                imp->imp_obd->obd_name,
915                                obd2cli_tgt(imp->imp_obd));
916
917                         spin_lock(&imp->imp_lock);
918                         imp->imp_resend_replay = 1;
919                         /* VBR: delayed connection */
920                         if (MSG_CONNECT_DELAYED & msg_flags) {
921                                 imp->imp_delayed_recovery = 1;
922                                 imp->imp_no_lock_replay = 1;
923                         }
924                         spin_unlock(&imp->imp_lock);
925
926                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
927                 } else {
928                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
929                 }
930         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
931                 LASSERT(imp->imp_replayable);
932                 imp->imp_remote_handle =
933                                 *lustre_msg_get_handle(request->rq_repmsg);
934                 imp->imp_last_replay_transno = 0;
935                 /* VBR: delayed connection */
936                 if (MSG_CONNECT_DELAYED & msg_flags) {
937                         spin_lock(&imp->imp_lock);
938                         imp->imp_delayed_recovery = 1;
939                         imp->imp_no_lock_replay = 1;
940                         spin_unlock(&imp->imp_lock);
941                 }
942                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
943         } else {
944                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
945                           "flags reconnect/recovering not set: %x)",msg_flags);
946                 imp->imp_remote_handle =
947                                 *lustre_msg_get_handle(request->rq_repmsg);
948                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
949         }
950
951         /* Sanity checks for a reconnected import. */
952         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
953                 CERROR("imp_replayable flag does not match server "
954                        "after reconnect. We should LBUG right here.\n");
955         }
956
957         if (lustre_msg_get_last_committed(request->rq_repmsg) > 0 &&
958             lustre_msg_get_last_committed(request->rq_repmsg) <
959             aa->pcaa_peer_committed) {
960                 CERROR("%s went back in time (transno "LPD64
961                        " was previously committed, server now claims "LPD64
962                        ")!  See https://bugzilla.lustre.org/show_bug.cgi?"
963                        "id=9646\n",
964                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
965                        lustre_msg_get_last_committed(request->rq_repmsg));
966         }
967
968 finish:
969         rc = ptlrpc_import_recovery_state_machine(imp);
970         if (rc != 0) {
971                 if (rc == -ENOTCONN) {
972                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
973                                "invalidating and reconnecting\n",
974                                obd2cli_tgt(imp->imp_obd),
975                                imp->imp_connection->c_remote_uuid.uuid);
976                         ptlrpc_connect_import(imp, NULL);
977                         RETURN(0);
978                 }
979         } else {
980                 struct obd_connect_data *ocd;
981                 struct obd_export *exp;
982
983                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
984                                          lustre_swab_connect);
985                 spin_lock(&imp->imp_lock);
986                 list_del(&imp->imp_conn_current->oic_item);
987                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
988                 imp->imp_last_success_conn =
989                         imp->imp_conn_current->oic_last_attempt;
990
991                 if (ocd == NULL) {
992                         spin_unlock(&imp->imp_lock);
993                         CERROR("Wrong connect data from server\n");
994                         rc = -EPROTO;
995                         GOTO(out, rc);
996                 }
997
998                 imp->imp_connect_data = *ocd;
999
1000                 exp = class_conn2export(&imp->imp_dlm_handle);
1001                 spin_unlock(&imp->imp_lock);
1002
1003                 /* check that server granted subset of flags we asked for. */
1004                 LASSERTF((ocd->ocd_connect_flags &
1005                           imp->imp_connect_flags_orig) ==
1006                          ocd->ocd_connect_flags, LPX64" != "LPX64,
1007                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
1008
1009                 if (!exp) {
1010                         /* This could happen if export is cleaned during the
1011                            connect attempt */
1012                         CERROR("Missing export for %s\n",
1013                                imp->imp_obd->obd_name);
1014                         GOTO(out, rc = -ENODEV);
1015                 }
1016                 old_connect_flags = exp->exp_connect_flags;
1017                 exp->exp_connect_flags = ocd->ocd_connect_flags;
1018                 imp->imp_obd->obd_self_export->exp_connect_flags =
1019                         ocd->ocd_connect_flags;
1020                 class_export_put(exp);
1021
1022                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
1023
1024                 if (!ocd->ocd_ibits_known &&
1025                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
1026                         CERROR("Inodebits aware server returned zero compatible"
1027                                " bits?\n");
1028
1029                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1030                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
1031                                         LUSTRE_VERSION_OFFSET_WARN ||
1032                      ocd->ocd_version < LUSTRE_VERSION_CODE -
1033                                         LUSTRE_VERSION_OFFSET_WARN)) {
1034                         /* Sigh, some compilers do not like #ifdef in the middle
1035                            of macro arguments */
1036 #ifdef __KERNEL__
1037                         const char *older =
1038                                 "older.  Consider upgrading this client";
1039 #else
1040                         const char *older =
1041                                 "older.  Consider recompiling this application";
1042 #endif
1043                         const char *newer = "newer than client version";
1044
1045                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
1046                                       "is much %s (%s)\n",
1047                                       obd2cli_tgt(imp->imp_obd),
1048                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1049                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1050                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1051                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
1052                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
1053                                       newer : older, LUSTRE_VERSION_STRING);
1054                 }
1055
1056                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
1057                         /* We sent to the server ocd_cksum_types with bits set
1058                          * for algorithms we understand. The server masked off
1059                          * the checksum types it doesn't support */
1060                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
1061                                 LCONSOLE_WARN("The negotiation of the checksum "
1062                                               "alogrithm to use with server %s "
1063                                               "failed (%x/%x), disabling "
1064                                               "checksums\n",
1065                                               obd2cli_tgt(imp->imp_obd),
1066                                               ocd->ocd_cksum_types,
1067                                               OBD_CKSUM_ALL);
1068                                 cli->cl_checksum = 0;
1069                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1070                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1071                         } else {
1072                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
1073
1074                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
1075                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
1076                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
1077                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
1078                                 else
1079                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1080                         }
1081                 } else {
1082                         /* The server does not support OBD_CONNECT_CKSUM.
1083                          * Enforce CRC32 for backward compatibility*/
1084                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1085                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1086                 }
1087
1088                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
1089                         cli->cl_max_pages_per_rpc =
1090                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
1091                 }
1092
1093                 /* Reset ns_connect_flags only for initial connect. It might be
1094                  * changed in while using FS and if we reset it in reconnect
1095                  * this leads to lossing user settings done before such as
1096                  * disable lru_resize, etc. */
1097                 if (old_connect_flags != exp->exp_connect_flags ||
1098                     aa->pcaa_initial_connect) {
1099                         CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server "
1100                                "flags: "LPX64"\n", imp->imp_obd->obd_name,
1101                                ocd->ocd_connect_flags);
1102                         imp->imp_obd->obd_namespace->ns_connect_flags =
1103                                 ocd->ocd_connect_flags;
1104                         imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1105                                 ocd->ocd_connect_flags;
1106                 }
1107
1108                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1109                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1110                         /* We need a per-message support flag, because
1111                            a. we don't know if the incoming connect reply
1112                               supports AT or not (in reply_in_callback)
1113                               until we unpack it.
1114                            b. failovered server means export and flags are gone
1115                               (in ptlrpc_send_reply).
1116                            Can only be set when we know AT is supported at
1117                            both ends */
1118                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1119                 else
1120                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1121
1122                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1123                         (cli->cl_max_pages_per_rpc > 0));
1124         }
1125
1126  out:
1127         if (rc != 0) {
1128                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1129                 spin_lock(&imp->imp_lock);
1130                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
1131                     (request->rq_import_generation == imp->imp_generation))
1132                         ptlrpc_deactivate_and_unlock_import(imp);
1133                 else
1134                         spin_unlock(&imp->imp_lock);
1135
1136                 if (imp->imp_recon_bk && imp->imp_last_recon) {
1137                         /* Give up trying to reconnect */
1138                         imp->imp_obd->obd_no_recov = 1;
1139                         ptlrpc_deactivate_import(imp);
1140                 }
1141
1142                 if (rc == -EPROTO) {
1143                         struct obd_connect_data *ocd;
1144                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
1145                                                  sizeof *ocd,
1146                                                  lustre_swab_connect);
1147                         if (ocd &&
1148                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1149                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1150                            /* Actually servers are only supposed to refuse
1151                               connection from liblustre clients, so we should
1152                               never see this from VFS context */
1153                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1154                                         "(%d.%d.%d.%d)"
1155                                         " refused connection from this client "
1156                                         "with an incompatible version (%s).  "
1157                                         "Client must be recompiled\n",
1158                                         obd2cli_tgt(imp->imp_obd),
1159                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1160                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1161                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1162                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
1163                                         LUSTRE_VERSION_STRING);
1164                                 ptlrpc_deactivate_import(imp);
1165                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1166                         }
1167                         RETURN(-EPROTO);
1168                 }
1169
1170                 ptlrpc_maybe_ping_import_soon(imp);
1171
1172                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1173                        obd2cli_tgt(imp->imp_obd),
1174                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1175         }
1176
1177         spin_lock(&imp->imp_lock);
1178         imp->imp_last_recon = 0;
1179         spin_unlock(&imp->imp_lock);
1180
1181         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1182         RETURN(rc);
1183 }
1184
1185 static int completed_replay_interpret(struct ptlrpc_request *req,
1186                                       void * data, int rc)
1187 {
1188         ENTRY;
1189         atomic_dec(&req->rq_import->imp_replay_inflight);
1190         if (req->rq_status == 0 &&
1191             !req->rq_import->imp_vbr_failed) {
1192                 ptlrpc_import_recovery_state_machine(req->rq_import);
1193         } else {
1194                 if (req->rq_import->imp_vbr_failed) {
1195                         CDEBUG(D_WARNING,
1196                                "%s: version recovery fails, reconnecting\n",
1197                                req->rq_import->imp_obd->obd_name);
1198                         spin_lock(&req->rq_import->imp_lock);
1199                         req->rq_import->imp_vbr_failed = 0;
1200                         spin_unlock(&req->rq_import->imp_lock);
1201                 } else {
1202                         CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1203                                      "reconnecting\n",
1204                                req->rq_import->imp_obd->obd_name,
1205                                req->rq_status);
1206                 }
1207                 ptlrpc_connect_import(req->rq_import, NULL);
1208         }
1209         RETURN(0);
1210 }
1211
1212 static int signal_completed_replay(struct obd_import *imp)
1213 {
1214         struct ptlrpc_request *req;
1215         ENTRY;
1216
1217         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
1218         atomic_inc(&imp->imp_replay_inflight);
1219
1220         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
1221         if (!req) {
1222                 atomic_dec(&imp->imp_replay_inflight);
1223                 RETURN(-ENOMEM);
1224         }
1225
1226         ptlrpc_req_set_repsize(req, 1, NULL);
1227         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1228         lustre_msg_add_flags(req->rq_reqmsg,
1229                              MSG_LOCK_REPLAY_DONE |
1230                              MSG_REQ_REPLAY_DONE |
1231                              MSG_LAST_REPLAY);
1232
1233         if (imp->imp_delayed_recovery)
1234                 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
1235         req->rq_interpret_reply = completed_replay_interpret;
1236
1237         if (AT_OFF)
1238                 req->rq_timeout *= 3;
1239
1240         ptlrpcd_add_req(req);
1241         RETURN(0);
1242 }
1243
1244 #ifdef __KERNEL__
1245 static int ptlrpc_invalidate_import_thread(void *data)
1246 {
1247         struct obd_import *imp = data;
1248
1249         ENTRY;
1250
1251         cfs_daemonize_ctxt("ll_imp_inval");
1252
1253         CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
1254                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1255                imp->imp_connection->c_remote_uuid.uuid);
1256
1257         ptlrpc_invalidate_import(imp);
1258
1259         if (obd_dump_on_eviction) {
1260                 CERROR("dump the log upon eviction\n");
1261                 libcfs_debug_dumplog();
1262         }
1263
1264         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1265         ptlrpc_import_recovery_state_machine(imp);
1266
1267         class_import_put(imp);
1268         RETURN(0);
1269 }
1270 #endif
1271
1272 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1273 {
1274         int rc = 0;
1275         int inflight;
1276         char *target_start;
1277         int target_len;
1278
1279         ENTRY;
1280         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1281                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1282                           &target_start, &target_len);
1283                 /* Don't care about MGC eviction */
1284                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1285                            LUSTRE_MGC_NAME) != 0) {
1286                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1287                                            "%.*s; in progress operations using "
1288                                            "this service will fail.\n",
1289                                            target_len, target_start);
1290                 }
1291                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1292                        obd2cli_tgt(imp->imp_obd),
1293                        imp->imp_connection->c_remote_uuid.uuid);
1294
1295 #ifdef __KERNEL__
1296                 /* bug 17802:  XXX client_disconnect_export vs connect request
1297                  * race. if client will evicted at this time, we start
1298                  * invalidate thread without referece to import and import can
1299                  * be freed at same time. */
1300                 class_import_get(imp);
1301                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1302                                    CLONE_VM | CLONE_FILES);
1303                 if (rc < 0) {
1304                         class_import_put(imp);
1305                         CERROR("error starting invalidate thread: %d\n", rc);
1306                 } else {
1307                         rc = 0;
1308                 }
1309                 RETURN(rc);
1310 #else
1311                 ptlrpc_invalidate_import(imp);
1312
1313                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1314 #endif
1315         }
1316
1317         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1318                 CDEBUG(D_HA, "replay requested by %s\n",
1319                        obd2cli_tgt(imp->imp_obd));
1320                 rc = ptlrpc_replay_next(imp, &inflight);
1321                 if (inflight == 0 &&
1322                     atomic_read(&imp->imp_replay_inflight) == 0) {
1323                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1324                         rc = ldlm_replay_locks(imp);
1325                         if (rc)
1326                                 GOTO(out, rc);
1327                 }
1328                 rc = 0;
1329         }
1330
1331         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1332                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1333                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1334                         rc = signal_completed_replay(imp);
1335                         if (rc)
1336                                 GOTO(out, rc);
1337                 }
1338
1339         }
1340
1341         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1342                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1343                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1344                 }
1345         }
1346
1347         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1348                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1349                        obd2cli_tgt(imp->imp_obd),
1350                        imp->imp_connection->c_remote_uuid.uuid);
1351
1352                 rc = ptlrpc_resend(imp);
1353                 if (rc)
1354                         GOTO(out, rc);
1355                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1356                 ptlrpc_activate_import(imp);
1357
1358                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1359                           &target_start, &target_len);
1360                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1361                               "using nid %s.\n", imp->imp_obd->obd_name,
1362                               target_len, target_start,
1363                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1364         }
1365
1366         if (imp->imp_state == LUSTRE_IMP_FULL) {
1367                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1368                 ptlrpc_wake_delayed(imp);
1369         }
1370
1371  out:
1372         RETURN(rc);
1373 }
1374
1375 static int back_to_sleep(void *unused)
1376 {
1377         return 0;
1378 }
1379
1380 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1381 {
1382         struct ptlrpc_request *req;
1383         int rq_opc, rc = 0;
1384         int nowait = imp->imp_obd->obd_force;
1385         ENTRY;
1386
1387         if (nowait)
1388                 GOTO(set_state, rc);
1389
1390         switch (imp->imp_connect_op) {
1391         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1392         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1393         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1394         default:
1395                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1396                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1397                 RETURN(-EINVAL);
1398         }
1399
1400         if (ptlrpc_import_in_recovery(imp)) {
1401                 struct l_wait_info lwi;
1402                 cfs_duration_t timeout;
1403
1404                 if (AT_OFF) {
1405                         timeout = cfs_time_seconds(obd_timeout);
1406                 } else {
1407                         int idx = import_at_get_index(imp,
1408                                 imp->imp_client->cli_request_portal);
1409                         timeout = cfs_time_seconds(
1410                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1411                 }
1412                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1413                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1414                 rc = l_wait_event(imp->imp_recovery_waitq,
1415                                   !ptlrpc_import_in_recovery(imp), &lwi);
1416         }
1417
1418         spin_lock(&imp->imp_lock);
1419         if (imp->imp_state != LUSTRE_IMP_FULL)
1420                 GOTO(out, 0);
1421
1422         spin_unlock(&imp->imp_lock);
1423
1424         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1425         if (req) {
1426                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1427                  * it fails.  We can get through the above with a down server
1428                  * if the client doesn't know the server is gone yet. */
1429                 req->rq_no_resend = 1;
1430
1431 #ifndef CRAY_XT3
1432                 /* We want client umounts to happen quickly, no matter the
1433                    server state... */
1434                 req->rq_timeout = min_t(int, req->rq_timeout,
1435                                         INITIAL_CONNECT_TIMEOUT);
1436 #else
1437                 /* ... but we always want liblustre clients to nicely
1438                    disconnect, so only use the adaptive value. */
1439                 if (AT_OFF)
1440                         req->rq_timeout = obd_timeout / 3;
1441 #endif
1442
1443                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1444                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1445                 ptlrpc_req_set_repsize(req, 1, NULL);
1446                 rc = ptlrpc_queue_wait(req);
1447                 ptlrpc_req_finished(req);
1448         }
1449
1450 set_state:
1451         spin_lock(&imp->imp_lock);
1452 out:
1453         if (noclose)
1454                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1455         else
1456                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1457         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1458         /* Try all connections in the future - bz 12758 */
1459         imp->imp_last_recon = 0;
1460         spin_unlock(&imp->imp_lock);
1461
1462         RETURN(rc);
1463 }
1464
1465 /* Sets maximal number of RPCs possible originating from other side of this
1466    import (server) to us and number of async RPC replies that we are not waiting
1467    for arriving */
1468 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1469 {
1470         LNetSetAsync(imp->imp_connection->c_peer, count);
1471 }
1472
1473 void ptlrpc_cleanup_imp(struct obd_import *imp)
1474 {
1475         ENTRY;
1476
1477         spin_lock(&imp->imp_lock);
1478         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1479         imp->imp_generation++;
1480         spin_unlock(&imp->imp_lock);
1481         ptlrpc_abort_inflight(imp);
1482
1483         EXIT;
1484 }
1485
1486 /* Adaptive Timeout utils */
1487 extern unsigned int at_min, at_max, at_history;
1488
1489 /* Bin into timeslices using AT_BINS bins.
1490    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1491    but still smoothing out a return to normalcy from a slow response.
1492    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1493 int at_measured(struct adaptive_timeout *at, unsigned int val)
1494 {
1495         unsigned int old = at->at_current;
1496         time_t now = cfs_time_current_sec();
1497         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1498
1499         LASSERT(at);
1500         CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1501                val, at, now - at->at_binstart, at->at_current,
1502                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1503
1504         if (val == 0)
1505                 /* 0's don't count, because we never want our timeout to
1506                    drop to 0, and because 0 could mean an error */
1507                 return 0;
1508
1509         spin_lock(&at->at_lock);
1510
1511         if (unlikely(at->at_binstart == 0)) {
1512                 /* Special case to remove default from history */
1513                 at->at_current = val;
1514                 at->at_worst_ever = val;
1515                 at->at_worst_time = now;
1516                 at->at_hist[0] = val;
1517                 at->at_binstart = now;
1518         } else if (now - at->at_binstart < binlimit ) {
1519                 /* in bin 0 */
1520                 at->at_hist[0] = max(val, at->at_hist[0]);
1521                 at->at_current = max(val, at->at_current);
1522         } else {
1523                 int i, shift;
1524                 unsigned int maxv = val;
1525                 /* move bins over */
1526                 shift = (now - at->at_binstart) / binlimit;
1527                 LASSERT(shift > 0);
1528                 for(i = AT_BINS - 1; i >= 0; i--) {
1529                         if (i >= shift) {
1530                                 at->at_hist[i] = at->at_hist[i - shift];
1531                                 maxv = max(maxv, at->at_hist[i]);
1532                         } else {
1533                                 at->at_hist[i] = 0;
1534                         }
1535                 }
1536                 at->at_hist[0] = val;
1537                 at->at_current = maxv;
1538                 at->at_binstart += shift * binlimit;
1539         }
1540
1541         if (at->at_current > at->at_worst_ever) {
1542                 at->at_worst_ever = at->at_current;
1543                 at->at_worst_time = now;
1544         }
1545
1546         if (at->at_flags & AT_FLG_NOHIST)
1547                 /* Only keep last reported val; keeping the rest of the history
1548                    for proc only */
1549                 at->at_current = val;
1550
1551         if (at_max > 0)
1552                 at->at_current =  min(at->at_current, at_max);
1553         at->at_current =  max(at->at_current, at_min);
1554
1555         if (at->at_current != old)
1556                 CDEBUG(D_OTHER, "AT %p change: old=%u new=%u delta=%d "
1557                        "(val=%u) hist %u %u %u %u\n", at,
1558                        old, at->at_current, at->at_current - old, val,
1559                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1560                        at->at_hist[3]);
1561
1562         /* if we changed, report the old value */
1563         old = (at->at_current != old) ? old : 0;
1564
1565         spin_unlock(&at->at_lock);
1566         return old;
1567 }
1568
1569 /* Find the imp_at index for a given portal; assign if space available */
1570 int import_at_get_index(struct obd_import *imp, int portal)
1571 {
1572         struct imp_at *at = &imp->imp_at;
1573         int i;
1574
1575         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1576                 if (at->iat_portal[i] == portal)
1577                         return i;
1578                 if (at->iat_portal[i] == 0)
1579                         /* unused */
1580                         break;
1581         }
1582
1583         /* Not found in list, add it under a lock */
1584         spin_lock(&imp->imp_lock);
1585
1586         /* Check unused under lock */
1587         for (; i < IMP_AT_MAX_PORTALS; i++) {
1588                 if (at->iat_portal[i] == portal)
1589                         goto out;
1590                 if (at->iat_portal[i] == 0)
1591                         /* unused */
1592                         break;
1593         }
1594
1595         /* Not enough portals? */
1596         LASSERT(i < IMP_AT_MAX_PORTALS);
1597
1598         at->iat_portal[i] = portal;
1599 out:
1600         spin_unlock(&imp->imp_lock);
1601         return i;
1602 }