Whamcloud - gitweb
c4d4222a1fd7063e7a24d25e33c9797ec535f657
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/import.c
37  *
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #ifndef __KERNEL__
43 # include <liblustre.h>
44 #endif
45
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
51 #include <obd.h>
52 #include <obd_class.h>
53
54 #include "ptlrpc_internal.h"
55
56 struct ptlrpc_connect_async_args {
57          __u64 pcaa_peer_committed;
58         int pcaa_initial_connect;
59 };
60
61 static void __import_set_state(struct obd_import *imp,
62                                enum lustre_imp_state state)
63 {
64         imp->imp_state = state;
65         imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state;
66         imp->imp_state_hist[imp->imp_state_hist_idx].ish_time =
67                 cfs_time_current_sec();
68         imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) %
69                 IMP_STATE_HIST_LEN;
70 }
71
/*
 * Move @imp to @state unless the import is already CLOSED: a CLOSED import
 * must remain so.  The caller is responsible for any locking.
 *
 * Both parameters are parenthesised on every use so that an argument which
 * is an expression (e.g. "base + 1") expands correctly.  Note that each
 * argument is still evaluated more than once, so it must not have side
 * effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               __import_set_state((imp), (state));                             \
        }                                                                      \
} while(0)
83
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp_lock around the
 * state change.  @imp is parenthesised on every use (and when forwarded) so
 * that an expression argument expands correctly; it is evaluated more than
 * once and therefore must not have side effects.
 */
#define IMPORT_SET_STATE(imp, state)                    \
do {                                                    \
        spin_lock(&(imp)->imp_lock);                    \
        IMPORT_SET_STATE_NOLOCK((imp), (state));        \
        spin_unlock(&(imp)->imp_lock);                  \
} while(0)
90
91
/* Forward declarations for routines referenced before their definition. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void *data, int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
95
96 /* Only this function is allowed to change the import state when it is
97  * CLOSED. I would rather refcount the import and free it after
98  * disconnection like we do with exports. To do that, the client_obd
99  * will need to save the peer info somewhere other than in the import,
100  * though. */
101 int ptlrpc_init_import(struct obd_import *imp)
102 {
103         spin_lock(&imp->imp_lock);
104
105         imp->imp_generation++;
106         imp->imp_state =  LUSTRE_IMP_NEW;
107
108         spin_unlock(&imp->imp_lock);
109
110         return 0;
111 }
112 EXPORT_SYMBOL(ptlrpc_init_import);
113
#define UUID_STR "_UUID"
/*
 * Locate the human-readable part of a UUID string without modifying it.
 *
 * @uuid       - NUL-terminated string to examine
 * @prefix     - optional leading string to skip; may be NULL, and is ignored
 *               when @uuid does not start with it
 * @uuid_start - out: points into @uuid, just past @prefix when it matched,
 *               otherwise at @uuid itself
 * @uuid_len   - out: number of characters at *@uuid_start, not counting a
 *               trailing "_UUID" if one is present
 *
 * The result is meant to be printed with "%.*s".
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const size_t suffix_len = strlen(UUID_STR);
        char *start = uuid;
        size_t len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start = uuid + prefix_len;
        }

        len = strlen(start);

        /* Drop the suffix only when the remainder is long enough to hold it */
        if (len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = len;
}
130
131 /* Returns true if import was FULL, false if import was already not
132  * connected.
133  * @imp - import to be disconnected
134  * @conn_cnt - connection count (epoch) of the request that timed out
135  *             and caused the disconnection.  In some cases, multiple
136  *             inflight requests can fail to a single target (e.g. OST
137  *             bulk requests) and if one has already caused a reconnection
138  *             (increasing the import->conn_cnt) the older failure should
139  *             not also cause a reconnection.  If zero it forces a reconnect.
140  */
141 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
142 {
143         int rc = 0;
144
145         spin_lock(&imp->imp_lock);
146
147         if (imp->imp_state == LUSTRE_IMP_FULL &&
148             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
149                 char *target_start;
150                 int   target_len;
151
152                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
153                           &target_start, &target_len);
154                 if (imp->imp_replayable) {
155                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
156                                "%s was lost; in progress operations using this "
157                                "service will wait for recovery to complete.\n",
158                                imp->imp_obd->obd_name, target_len, target_start,
159                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
160                 } else {
161                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
162                                "%.*s via nid %s was lost; in progress "
163                                "operations using this service will fail.\n",
164                                imp->imp_obd->obd_name, target_len, target_start,
165                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
166                 }
167                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
168                 spin_unlock(&imp->imp_lock);
169
170                 if (obd_dump_on_timeout)
171                         libcfs_debug_dumplog();
172
173                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
174                 rc = 1;
175         } else {
176                 spin_unlock(&imp->imp_lock);
177                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
178                        imp->imp_client->cli_name, imp,
179                        (imp->imp_state == LUSTRE_IMP_FULL &&
180                         imp->imp_conn_cnt > conn_cnt) ?
181                        "reconnected" : "not connected", imp->imp_conn_cnt,
182                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
183         }
184
185         return rc;
186 }
187
188 /* Must be called with imp_lock held! */
189 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
190 {
191         ENTRY;
192         LASSERT_SPIN_LOCKED(&imp->imp_lock);
193
194         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
195         imp->imp_invalid = 1;
196         imp->imp_generation++;
197         spin_unlock(&imp->imp_lock);
198
199         ptlrpc_abort_inflight(imp);
200         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
201 }
202
203 /*
204  * This acts as a barrier; all existing requests are rejected, and
205  * no new requests will be accepted until the import is valid again.
206  */
207 void ptlrpc_deactivate_import(struct obd_import *imp)
208 {
209         spin_lock(&imp->imp_lock);
210         ptlrpc_deactivate_and_unlock_import(imp);
211 }
212
213 static unsigned int
214 ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
215 {
216         long dl;
217
218         if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
219               (req->rq_phase == RQ_PHASE_BULK) ||
220               (req->rq_phase == RQ_PHASE_NEW)))
221                 return 0;
222
223         if (req->rq_timedout)
224                 return 0;
225
226         if (req->rq_phase == RQ_PHASE_NEW)
227                 dl = req->rq_sent;
228         else
229                 dl = req->rq_deadline;
230
231         if (dl <= now)
232                 return 0;
233
234         return dl - now;
235 }
236
237 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
238 {
239         time_t now = cfs_time_current_sec();
240         struct list_head *tmp, *n;
241         struct ptlrpc_request *req;
242         unsigned int timeout = 0;
243
244         spin_lock(&imp->imp_lock);
245         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
246                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
247                 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
248         }
249         spin_unlock(&imp->imp_lock);
250         return timeout;
251 }
252
253 /*
254  * This function will invalidate the import, if necessary, then block
255  * for all the RPC completions, and finally notify the obd to
256  * invalidate its state (ie cancel locks, clear pending requests,
257  * etc).
258  */
259 void ptlrpc_invalidate_import(struct obd_import *imp)
260 {
261         struct list_head *tmp, *n;
262         struct ptlrpc_request *req;
263         struct l_wait_info lwi;
264         unsigned int timeout;
265         int rc;
266
267         atomic_inc(&imp->imp_inval_count);
268
269         /*
270          * If this is an invalid MGC connection, then don't bother
271          * waiting for imp_inflight to drop to 0.
272          */
273         if (imp->imp_invalid && imp->imp_recon_bk &&!imp->imp_obd->obd_no_recov)
274                 goto out;
275
276         if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
277                 ptlrpc_deactivate_import(imp);
278
279         LASSERT(imp->imp_invalid);
280
281         /* Wait forever until inflight == 0. We really can't do it another
282          * way because in some cases we need to wait for very long reply
283          * unlink. We can't do anything before that because there is really
284          * no guarantee that some rdma transfer is not in progress right now. */
285         do {
286                 /* Calculate max timeout for waiting on rpcs to error
287                  * out. Use obd_timeout if calculated value is smaller
288                  * than it. */
289                 timeout = ptlrpc_inflight_timeout(imp);
290                 timeout += timeout / 3;
291
292                 if (timeout == 0)
293                         timeout = obd_timeout;
294
295                 CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",
296                        timeout);
297
298                 /* Wait for all requests to error out and call completion
299                  * callbacks. Cap it at obd_timeout -- these should all
300                  * have been locally cancelled by ptlrpc_abort_inflight. */
301                 lwi = LWI_TIMEOUT_INTERVAL(
302                         cfs_timeout_cap(cfs_time_seconds(timeout)),
303                         cfs_time_seconds(1), NULL, NULL);
304                 rc = l_wait_event(imp->imp_recovery_waitq,
305                                 (atomic_read(&imp->imp_inflight) == 0), &lwi);
306                 if (rc) {
307                         const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
308
309                         CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
310                                cli_tgt, rc, atomic_read(&imp->imp_inflight));
311
312                         spin_lock(&imp->imp_lock);
313                         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
314                                 req = list_entry(tmp, struct ptlrpc_request,
315                                         rq_list);
316                                 DEBUG_REQ(D_ERROR, req,"still on sending list");
317                         }
318                         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
319                                 req = list_entry(tmp, struct ptlrpc_request,
320                                         rq_list);
321                                 DEBUG_REQ(D_ERROR, req,"still on delayed list");
322                         }
323
324                         if (atomic_read(&imp->imp_unregistering) == 0) {
325                                 /* We know that only "unregistering" rpcs may
326                                  * still survive in sending or delaying lists
327                                  * (They are waiting for long reply unlink in
328                                  * sluggish nets). Let's check this. If there
329                                  * is no unregistering and inflight != 0 this
330                                  * is bug. */
331                                 LASSERT(atomic_read(&imp->imp_inflight) == 0);
332
333                                 /* Let's save one loop as soon as inflight have
334                                  * dropped to zero. No new inflights possible at
335                                  * this point. */
336                                 rc = 0;
337                         } else {
338                                 CERROR("%s: RPCs in \"%s\" phase found (%d). "
339                                        "Network is sluggish? Waiting them "
340                                        "to error out.\n", cli_tgt,
341                                        ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
342                                        atomic_read(&imp->imp_unregistering));
343                         }
344                         spin_unlock(&imp->imp_lock);
345                 }
346         } while (rc != 0);
347
348         /* Let's additionally check that no new rpcs added to import in
349          * "invalidate" state. */
350         LASSERT(atomic_read(&imp->imp_inflight) == 0);
351
352 out:
353         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
354
355         atomic_dec(&imp->imp_inval_count);
356         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
357 }
358
359 /* unset imp_invalid */
360 void ptlrpc_activate_import(struct obd_import *imp)
361 {
362         struct obd_device *obd = imp->imp_obd;
363
364         spin_lock(&imp->imp_lock);
365         imp->imp_invalid = 0;
366         spin_unlock(&imp->imp_lock);
367
368         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
369 }
370
371 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
372 {
373         ENTRY;
374
375         LASSERT(!imp->imp_dlm_fake);
376
377         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
378                 if (!imp->imp_replayable) {
379                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
380                                "auto-deactivating\n",
381                                obd2cli_tgt(imp->imp_obd),
382                                imp->imp_connection->c_remote_uuid.uuid,
383                                imp->imp_obd->obd_name);
384                         ptlrpc_deactivate_import(imp);
385                 }
386
387                 CDEBUG(D_HA, "%s: waking up pinger\n",
388                        obd2cli_tgt(imp->imp_obd));
389
390                 spin_lock(&imp->imp_lock);
391                 imp->imp_force_verify = 1;
392                 spin_unlock(&imp->imp_lock);
393
394                 ptlrpc_pinger_wake_up();
395         }
396         EXIT;
397 }
398
399 int ptlrpc_reconnect_import(struct obd_import *imp)
400 {
401
402         ptlrpc_set_import_discon(imp, 0);
403         /* Force a new connect attempt */
404         ptlrpc_invalidate_import(imp);
405         /* Do a fresh connect next time by zeroing the handle */
406         ptlrpc_disconnect_import(imp, 1);
407         /* Wait for all invalidate calls to finish */
408         if (atomic_read(&imp->imp_inval_count) > 0) {
409                 int rc;
410                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
411                 rc = l_wait_event(imp->imp_recovery_waitq,
412                                   (atomic_read(&imp->imp_inval_count) == 0),
413                                   &lwi);
414                 if (rc)
415                         CERROR("Interrupted, inval=%d\n",
416                                atomic_read(&imp->imp_inval_count));
417         }
418
419         /*
420          * Allow reconnect attempts. Note: Currently, the function is
421          * only called by MGC. So assume this is a recoverable import,
422          * and force import to be recoverable. fix this if you need to
423          */
424
425         imp->imp_obd->obd_no_recov = 0;
426         /* Remove 'invalid' flag */
427         ptlrpc_activate_import(imp);
428         /* Attempt a new connect */
429         ptlrpc_recover_import(imp, NULL);
430         return 0;
431 }
432
433 EXPORT_SYMBOL(ptlrpc_reconnect_import);
434
435 static int import_select_connection(struct obd_import *imp)
436 {
437         struct obd_import_conn *imp_conn = NULL, *conn;
438         struct obd_export *dlmexp;
439         int tried_all = 1;
440         ENTRY;
441
442         spin_lock(&imp->imp_lock);
443
444         if (list_empty(&imp->imp_conn_list)) {
445                 CERROR("%s: no connections available\n",
446                         imp->imp_obd->obd_name);
447                 spin_unlock(&imp->imp_lock);
448                 RETURN(-EINVAL);
449         }
450
451         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
452                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
453                        imp->imp_obd->obd_name,
454                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
455                        conn->oic_last_attempt);
456
457                 /* Don't thrash connections */
458                 if (cfs_time_before_64(cfs_time_current_64(),
459                                      conn->oic_last_attempt +
460                                      cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
461                         continue;
462                 }
463
464                 /* If we have not tried this connection since the
465                    the last successful attempt, go with this one */
466                 if ((conn->oic_last_attempt == 0) ||
467                     cfs_time_beforeq_64(conn->oic_last_attempt,
468                                        imp->imp_last_success_conn)) {
469                         imp_conn = conn;
470                         tried_all = 0;
471                         break;
472                 }
473
474                 /* If all of the connections have already been tried
475                    since the last successful connection; just choose the
476                    least recently used */
477                 if (!imp_conn)
478                         imp_conn = conn;
479                 else if (cfs_time_before_64(conn->oic_last_attempt,
480                                             imp_conn->oic_last_attempt))
481                         imp_conn = conn;
482         }
483
484         /* if not found, simply choose the current one */
485         if (!imp_conn || imp->imp_force_reconnect) {
486                 LASSERT(imp->imp_conn_current);
487                 imp_conn = imp->imp_conn_current;
488                 tried_all = 0;
489         }
490         LASSERT(imp_conn->oic_conn);
491
492         /* If we've tried everything, and we're back to the beginning of the
493            list, increase our timeout and try again. It will be reset when
494            we do finally connect. (FIXME: really we should wait for all network
495            state associated with the last connection attempt to drain before
496            trying to reconnect on it.) */
497         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
498             !imp->imp_recon_bk /* not retrying */) {
499                 if (at_get(&imp->imp_at.iat_net_latency) <
500                     CONNECTION_SWITCH_MAX) {
501                         at_add(&imp->imp_at.iat_net_latency,
502                                MIN(at_get(&imp->imp_at.iat_net_latency) +
503                                CONNECTION_SWITCH_INC, CONNECTION_SWITCH_MAX));
504                 }
505                 LASSERT(imp_conn->oic_last_attempt);
506                 CWARN("%s: tried all connections, increasing latency to %ds\n",
507                       imp->imp_obd->obd_name,
508                       at_get(&imp->imp_at.iat_net_latency));
509         }
510
511         imp_conn->oic_last_attempt = cfs_time_current_64();
512
513         /* switch connection, don't mind if it's same as the current one */
514         if (imp->imp_connection)
515                 ptlrpc_connection_put(imp->imp_connection);
516         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
517
518         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
519         LASSERT(dlmexp != NULL);
520         if (dlmexp->exp_connection)
521                 ptlrpc_connection_put(dlmexp->exp_connection);
522         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
523         class_export_put(dlmexp);
524
525         if (imp->imp_conn_current != imp_conn) {
526                 if (imp->imp_conn_current)
527                         CDEBUG(D_HA, "Changing connection for %s to %s/%s\n",
528                                imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid,
529                                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
530                 imp->imp_conn_current = imp_conn;
531         }
532
533         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
534                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
535                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
536
537         spin_unlock(&imp->imp_lock);
538
539         RETURN(0);
540 }
541
542 /**
543  * must be called under imp lock
544  */
545 static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
546 {
547         struct ptlrpc_request *req;
548         struct list_head *tmp;
549
550         if (list_empty(&imp->imp_replay_list))
551                 return 0;
552         tmp = imp->imp_replay_list.next;
553         req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
554         *transno = req->rq_transno;
555         if (req->rq_transno == 0) {
556                 DEBUG_REQ(D_ERROR, req, "zero transno in replay");
557                 LBUG();
558         }
559
560         return 1;
561 }
562
563 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
564 {
565         struct obd_device *obd = imp->imp_obd;
566         int set_transno = 0;
567         int initial_connect = 0;
568         int rc;
569         __u64 committed_before_reconnect = 0;
570         struct ptlrpc_request *request;
571         __u32 size[] = { sizeof(struct ptlrpc_body),
572                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
573                        sizeof(obd->obd_uuid),
574                        sizeof(imp->imp_dlm_handle),
575                        sizeof(imp->imp_connect_data) };
576         char *tmp[] = { NULL,
577                         obd2cli_tgt(imp->imp_obd),
578                         obd->obd_uuid.uuid,
579                         (char *)&imp->imp_dlm_handle,
580                         (char *)&imp->imp_connect_data };
581         struct ptlrpc_connect_async_args *aa;
582
583         ENTRY;
584         spin_lock(&imp->imp_lock);
585         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
586                 spin_unlock(&imp->imp_lock);
587                 CERROR("can't connect to a closed import\n");
588                 RETURN(-EINVAL);
589         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
590                 spin_unlock(&imp->imp_lock);
591                 CERROR("already connected\n");
592                 RETURN(0);
593         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
594                 spin_unlock(&imp->imp_lock);
595                 CERROR("already connecting\n");
596                 RETURN(-EALREADY);
597         }
598
599         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
600
601         imp->imp_conn_cnt++;
602         imp->imp_resend_replay = 0;
603
604         if (!lustre_handle_is_used(&imp->imp_remote_handle))
605                 initial_connect = 1;
606         else
607                 committed_before_reconnect = imp->imp_peer_committed_transno;
608
609         set_transno = ptlrpc_first_transno(imp,
610                                            &imp->imp_connect_data.ocd_transno);
611
612         spin_unlock(&imp->imp_lock);
613
614         if (new_uuid) {
615                 struct obd_uuid uuid;
616
617                 obd_str2uuid(&uuid, new_uuid);
618                 rc = import_set_conn_priority(imp, &uuid);
619                 if (rc)
620                         GOTO(out, rc);
621         }
622
623         rc = import_select_connection(imp);
624         if (rc)
625                 GOTO(out, rc);
626
627         /* last in connection list */
628         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
629                 if (imp->imp_initial_recov_bk && initial_connect) {
630                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
631                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
632                         /* Don't retry if connect fails */
633                         rc = 0;
634                         obd_set_info_async(obd->obd_self_export,
635                                            sizeof(KEY_INIT_RECOV),
636                                            KEY_INIT_RECOV,
637                                            sizeof(rc), &rc, NULL);
638                 }
639                 if (imp->imp_recon_bk) {
640                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
641                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
642                         spin_lock(&imp->imp_lock);
643                         imp->imp_last_recon = 1;
644                         spin_unlock(&imp->imp_lock);
645                 }
646         }
647
648         /* Reset connect flags to the originally requested flags, in case
649          * the server is updated on-the-fly we will get the new features. */
650         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
651         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
652
653         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
654                            &obd->obd_uuid, &imp->imp_connect_data, NULL);
655         if (rc)
656                 GOTO(out, rc);
657
658         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
659                                   5, size, tmp);
660         if (!request)
661                 GOTO(out, rc = -ENOMEM);
662
663         /* Report the rpc service time to the server so that it knows how long
664          * to wait for clients to join recovery */
665         lustre_msg_set_service_time(request->rq_reqmsg,
666                                     at_timeout2est(request->rq_timeout));
667
668         /* The amount of time we give the server to process the connect req.
669          * import_select_connection will increase the net latency on
670          * repeated reconnect attempts to cover slow networks.
671          * We override/ignore the server rpc completion estimate here,
672          * which may be large if this is a reconnect attempt */
673         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
674         lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);
675
676 #ifndef __KERNEL__
677         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
678 #endif
679         if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
680                 lustre_msg_add_op_flags(request->rq_reqmsg,
681                                         MSG_CONNECT_NEXT_VER);
682
683         request->rq_no_resend = request->rq_no_delay = 1;
684         request->rq_send_state = LUSTRE_IMP_CONNECTING;
685         /* Allow a slightly larger reply for future growth compatibility */
686         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
687                               16 * sizeof(__u64);
688         ptlrpc_req_set_repsize(request, 2, size);
689         request->rq_interpret_reply = ptlrpc_connect_interpret;
690
691         CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
692         aa = ptlrpc_req_async_args(request);
693         memset(aa, 0, sizeof *aa);
694
695         aa->pcaa_peer_committed = committed_before_reconnect;
696         aa->pcaa_initial_connect = initial_connect;
697         if (aa->pcaa_initial_connect) {
698                 spin_lock(&imp->imp_lock);
699                 imp->imp_replayable = 1;
700                 spin_unlock(&imp->imp_lock);
701                 lustre_msg_add_op_flags(request->rq_reqmsg,
702                                         MSG_CONNECT_INITIAL);
703         }
704
705         if (set_transno)
706                 lustre_msg_add_op_flags(request->rq_reqmsg,
707                                         MSG_CONNECT_TRANSNO);
708
709         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
710                   aa->pcaa_initial_connect ? "initial " : "re",
711                   imp->imp_conn_cnt);
712         ptlrpcd_add_req(request);
713         rc = 0;
714 out:
715         if (rc != 0) {
716                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
717         }
718
719         RETURN(rc);
720 }
721 EXPORT_SYMBOL(ptlrpc_connect_import);
722
723 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
724 {
725 #ifdef __KERNEL__
726         struct obd_import_conn *imp_conn;
727 #endif
728         int wake_pinger = 0;
729
730         ENTRY;
731
732         spin_lock(&imp->imp_lock);
733         if (list_empty(&imp->imp_conn_list))
734                 GOTO(unlock, 0);
735
736 #ifdef __KERNEL__
737         imp_conn = list_entry(imp->imp_conn_list.prev,
738                               struct obd_import_conn,
739                               oic_item);
740
741         /* XXX: When the failover node is the primary node, it is possible
742          * to have two identical connections in imp_conn_list. We must
743          * compare not conn's pointers but NIDs, otherwise we can defeat
744          * connection throttling. (See bug 14774.) */
745         if (imp->imp_conn_current->oic_conn->c_peer.nid !=
746                                 imp_conn->oic_conn->c_peer.nid) {
747                 ptlrpc_ping_import_soon(imp);
748                 wake_pinger = 1;
749         }
750
751 #else
752         /* liblustre has no pinger thead, so we wakup pinger anyway */
753         wake_pinger = 1;
754 #endif
755  unlock:
756         spin_unlock(&imp->imp_lock);
757
758         if (wake_pinger)
759                 ptlrpc_pinger_wake_up();
760
761         EXIT;
762 }
763
/*
 * Classify a connect error code.
 *
 * Returns 1 when @rc is -EBUSY or -EAGAIN, and 0 for every other value.
 */
static int ptlrpc_busy_reconnect(int rc)
{
        switch (rc) {
        case -EBUSY:
        case -EAGAIN:
                return 1;
        default:
                return 0;
        }
}
768
769 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
770                                     void * data, int rc)
771 {
772         struct ptlrpc_connect_async_args *aa = data;
773         struct obd_import *imp = request->rq_import;
774         struct client_obd *cli = &imp->imp_obd->u.cli;
775         struct lustre_handle old_hdl;
776         __u64 old_connect_flags;
777         int msg_flags;
778         ENTRY;
779
780         spin_lock(&imp->imp_lock);
781         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
782                 spin_unlock(&imp->imp_lock);
783                 RETURN(0);
784         }
785
786         if (rc) {
787                 /* if this reconnect to busy export - not need select new target
788                  * for connecting*/
789                 if (ptlrpc_busy_reconnect(rc))
790                         imp->imp_force_reconnect = 1;
791                 spin_unlock(&imp->imp_lock);
792                 GOTO(out, rc);
793         }
794
795         LASSERT(imp->imp_conn_current);
796
797         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
798
799         /* All imports are pingable */
800         imp->imp_pingable = 1;
801         imp->imp_force_reconnect = 0;
802
803         if (aa->pcaa_initial_connect) {
804                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
805                         imp->imp_replayable = 1;
806                         spin_unlock(&imp->imp_lock);
807                         CDEBUG(D_HA, "connected to replayable target: %s\n",
808                                obd2cli_tgt(imp->imp_obd));
809                 } else {
810                         imp->imp_replayable = 0;
811                         spin_unlock(&imp->imp_lock);
812                 }
813
814                 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
815                      msg_flags & MSG_CONNECT_NEXT_VER) ||
816                     request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
817                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
818                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
819                                obd2cli_tgt(imp->imp_obd));
820                 } else {
821                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
822                                obd2cli_tgt(imp->imp_obd));
823                 }
824
825                 imp->imp_remote_handle =
826                                 *lustre_msg_get_handle(request->rq_repmsg);
827
828                 /* Initial connects are allowed for clients with non-random
829                  * uuids when servers are in recovery.  Simply signal the
830                  * servers replay is complete and wait in REPLAY_WAIT. */
831                 if (msg_flags & MSG_CONNECT_RECOVERING) {
832                         CDEBUG(D_HA, "connect to %s during recovery\n",
833                                obd2cli_tgt(imp->imp_obd));
834                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
835                 } else {
836                         IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
837                         ptlrpc_activate_import(imp);
838                 }
839                 GOTO(finish, rc = 0);
840         } else {
841                 spin_unlock(&imp->imp_lock);
842         }
843
844         /* Determine what recovery state to move the import to. */
845         if (MSG_CONNECT_RECONNECT & msg_flags) {
846                 memset(&old_hdl, 0, sizeof(old_hdl));
847                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
848                             sizeof (old_hdl))) {
849                         CERROR("%s@%s didn't like our handle "LPX64
850                                ", failed\n", obd2cli_tgt(imp->imp_obd),
851                                imp->imp_connection->c_remote_uuid.uuid,
852                                imp->imp_dlm_handle.cookie);
853                         GOTO(out, rc = -ENOTCONN);
854                 }
855
856                 if (memcmp(&imp->imp_remote_handle,
857                            lustre_msg_get_handle(request->rq_repmsg),
858                            sizeof(imp->imp_remote_handle))) {
859                         int level = msg_flags & MSG_CONNECT_RECOVERING ?
860                                 D_HA : D_WARNING;
861
862                         /* Bug 16611/14775: if server handle have changed,
863                          * that means some sort of disconnection happened.
864                          * If the server is not in recovery, that also means it
865                          * already erased all of our state because of previous
866                          * eviction. If it is in recovery - we are safe to
867                          * participate since we can reestablish all of our state
868                          * with server again */
869                         CDEBUG(level,"%s@%s changed server handle from "
870                                      LPX64" to "LPX64"%s\n",
871                                      obd2cli_tgt(imp->imp_obd),
872                                      imp->imp_connection->c_remote_uuid.uuid,
873                                      imp->imp_remote_handle.cookie,
874                                      lustre_msg_get_handle(request->rq_repmsg)->
875                                                                         cookie,
876                                      (MSG_CONNECT_RECOVERING & msg_flags) ?
877                                          " but is still in recovery" : "");
878
879                         imp->imp_remote_handle =
880                                      *lustre_msg_get_handle(request->rq_repmsg);
881
882                         if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
883                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
884                                 GOTO(finish, rc = 0);
885                         }
886
887                 } else {
888                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
889                                obd2cli_tgt(imp->imp_obd),
890                                imp->imp_connection->c_remote_uuid.uuid);
891                 }
892
893                 if (imp->imp_invalid) {
894                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
895                                "marking evicted\n", imp->imp_obd->obd_name);
896                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
897                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
898                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
899                                imp->imp_obd->obd_name,
900                                obd2cli_tgt(imp->imp_obd));
901
902                         spin_lock(&imp->imp_lock);
903                         imp->imp_resend_replay = 1;
904                         /* VBR: delayed connection */
905                         if (MSG_CONNECT_DELAYED & msg_flags) {
906                                 imp->imp_delayed_recovery = 1;
907                                 imp->imp_no_lock_replay = 1;
908                         }
909                         spin_unlock(&imp->imp_lock);
910
911                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
912                 } else {
913                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
914                 }
915         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
916                 LASSERT(imp->imp_replayable);
917                 imp->imp_remote_handle =
918                                 *lustre_msg_get_handle(request->rq_repmsg);
919                 imp->imp_last_replay_transno = 0;
920                 /* VBR: delayed connection */
921                 if (MSG_CONNECT_DELAYED & msg_flags) {
922                         spin_lock(&imp->imp_lock);
923                         imp->imp_delayed_recovery = 1;
924                         imp->imp_no_lock_replay = 1;
925                         spin_unlock(&imp->imp_lock);
926                 }
927                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
928         } else {
929                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
930                           "flags reconnect/recovering not set: %x)",msg_flags);
931                 imp->imp_remote_handle =
932                                 *lustre_msg_get_handle(request->rq_repmsg);
933                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
934         }
935
936         /* Sanity checks for a reconnected import. */
937         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
938                 CERROR("imp_replayable flag does not match server "
939                        "after reconnect. We should LBUG right here.\n");
940         }
941
942         if (lustre_msg_get_last_committed(request->rq_repmsg) <
943             aa->pcaa_peer_committed) {
944                 CERROR("%s went back in time (transno "LPD64
945                        " was previously committed, server now claims "LPD64
946                        ")!  See https://bugzilla.lustre.org/show_bug.cgi?"
947                        "id=9646\n",
948                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
949                        lustre_msg_get_last_committed(request->rq_repmsg));
950         }
951
952 finish:
953         rc = ptlrpc_import_recovery_state_machine(imp);
954         if (rc != 0) {
955                 if (rc == -ENOTCONN) {
956                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
957                                "invalidating and reconnecting\n",
958                                obd2cli_tgt(imp->imp_obd),
959                                imp->imp_connection->c_remote_uuid.uuid);
960                         ptlrpc_connect_import(imp, NULL);
961                         RETURN(0);
962                 }
963         } else {
964                 struct obd_connect_data *ocd;
965                 struct obd_export *exp;
966
967                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
968                                          lustre_swab_connect);
969                 spin_lock(&imp->imp_lock);
970                 list_del(&imp->imp_conn_current->oic_item);
971                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
972                 imp->imp_last_success_conn =
973                         imp->imp_conn_current->oic_last_attempt;
974
975                 if (ocd == NULL) {
976                         spin_unlock(&imp->imp_lock);
977                         CERROR("Wrong connect data from server\n");
978                         rc = -EPROTO;
979                         GOTO(out, rc);
980                 }
981
982                 imp->imp_connect_data = *ocd;
983
984                 exp = class_conn2export(&imp->imp_dlm_handle);
985                 spin_unlock(&imp->imp_lock);
986
987                 /* check that server granted subset of flags we asked for. */
988                 LASSERTF((ocd->ocd_connect_flags &
989                           imp->imp_connect_flags_orig) ==
990                          ocd->ocd_connect_flags, LPX64" != "LPX64,
991                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
992
993                 if (!exp) {
994                         /* This could happen if export is cleaned during the
995                            connect attempt */
996                         CERROR("Missing export for %s\n",
997                                imp->imp_obd->obd_name);
998                         GOTO(out, rc = -ENODEV);
999                 }
1000                 old_connect_flags = exp->exp_connect_flags;
1001                 exp->exp_connect_flags = ocd->ocd_connect_flags;
1002                 imp->imp_obd->obd_self_export->exp_connect_flags =
1003                         ocd->ocd_connect_flags;
1004                 class_export_put(exp);
1005
1006                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
1007
1008                 if (!ocd->ocd_ibits_known &&
1009                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
1010                         CERROR("Inodebits aware server returned zero compatible"
1011                                " bits?\n");
1012
1013                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1014                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
1015                                         LUSTRE_VERSION_OFFSET_WARN ||
1016                      ocd->ocd_version < LUSTRE_VERSION_CODE -
1017                                         LUSTRE_VERSION_OFFSET_WARN)) {
1018                         /* Sigh, some compilers do not like #ifdef in the middle
1019                            of macro arguments */
1020 #ifdef __KERNEL__
1021                         const char *older =
1022                                 "older.  Consider upgrading this client";
1023 #else
1024                         const char *older =
1025                                 "older.  Consider recompiling this application";
1026 #endif
1027                         const char *newer = "newer than client version";
1028
1029                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
1030                                       "is much %s (%s)\n",
1031                                       obd2cli_tgt(imp->imp_obd),
1032                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1033                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1034                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1035                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
1036                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
1037                                       newer : older, LUSTRE_VERSION_STRING);
1038                 }
1039
1040                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
1041                         /* We sent to the server ocd_cksum_types with bits set
1042                          * for algorithms we understand. The server masked off
1043                          * the checksum types it doesn't support */
1044                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
1045                                 LCONSOLE_WARN("The negotiation of the checksum "
1046                                               "alogrithm to use with server %s "
1047                                               "failed (%x/%x), disabling "
1048                                               "checksums\n",
1049                                               obd2cli_tgt(imp->imp_obd),
1050                                               ocd->ocd_cksum_types,
1051                                               OBD_CKSUM_ALL);
1052                                 cli->cl_checksum = 0;
1053                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1054                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1055                         } else {
1056                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
1057
1058                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
1059                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
1060                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
1061                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
1062                                 else
1063                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1064                         }
1065                 } else {
1066                         /* The server does not support OBD_CONNECT_CKSUM.
1067                          * Enforce CRC32 for backward compatibility*/
1068                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1069                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1070                 }
1071
1072                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
1073                         cli->cl_max_pages_per_rpc =
1074                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
1075                 }
1076
1077                 /* Reset ns_connect_flags only for initial connect. It might be
1078                  * changed in while using FS and if we reset it in reconnect
1079                  * this leads to lossing user settings done before such as
1080                  * disable lru_resize, etc. */
1081                 if (old_connect_flags != exp->exp_connect_flags ||
1082                     aa->pcaa_initial_connect) {
1083                         CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server "
1084                                "flags: "LPX64"\n", imp->imp_obd->obd_name,
1085                                ocd->ocd_connect_flags);
1086                         imp->imp_obd->obd_namespace->ns_connect_flags =
1087                                 ocd->ocd_connect_flags;
1088                         imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1089                                 ocd->ocd_connect_flags;
1090                 }
1091
1092                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1093                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1094                         /* We need a per-message support flag, because
1095                            a. we don't know if the incoming connect reply
1096                               supports AT or not (in reply_in_callback)
1097                               until we unpack it.
1098                            b. failovered server means export and flags are gone
1099                               (in ptlrpc_send_reply).
1100                            Can only be set when we know AT is supported at
1101                            both ends */
1102                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1103                 else
1104                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1105
1106                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1107                         (cli->cl_max_pages_per_rpc > 0));
1108         }
1109
1110  out:
1111         if (rc != 0) {
1112                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1113                 spin_lock(&imp->imp_lock);
1114                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
1115                     (request->rq_import_generation == imp->imp_generation))
1116                         ptlrpc_deactivate_and_unlock_import(imp);
1117                 else
1118                         spin_unlock(&imp->imp_lock);
1119
1120                 if (imp->imp_recon_bk && imp->imp_last_recon) {
1121                         /* Give up trying to reconnect */
1122                         imp->imp_obd->obd_no_recov = 1;
1123                         ptlrpc_deactivate_import(imp);
1124                 }
1125
1126                 if (rc == -EPROTO) {
1127                         struct obd_connect_data *ocd;
1128                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
1129                                                  sizeof *ocd,
1130                                                  lustre_swab_connect);
1131                         if (ocd &&
1132                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1133                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1134                            /* Actually servers are only supposed to refuse
1135                               connection from liblustre clients, so we should
1136                               never see this from VFS context */
1137                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1138                                         "(%d.%d.%d.%d)"
1139                                         " refused connection from this client "
1140                                         "with an incompatible version (%s).  "
1141                                         "Client must be recompiled\n",
1142                                         obd2cli_tgt(imp->imp_obd),
1143                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1144                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1145                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1146                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
1147                                         LUSTRE_VERSION_STRING);
1148                                 ptlrpc_deactivate_import(imp);
1149                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1150                         }
1151                         RETURN(-EPROTO);
1152                 }
1153
1154                 ptlrpc_maybe_ping_import_soon(imp);
1155
1156                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1157                        obd2cli_tgt(imp->imp_obd),
1158                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1159         }
1160
1161         spin_lock(&imp->imp_lock);
1162         imp->imp_last_recon = 0;
1163         spin_unlock(&imp->imp_lock);
1164
1165         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1166         RETURN(rc);
1167 }
1168
1169 static int completed_replay_interpret(struct ptlrpc_request *req,
1170                                       void * data, int rc)
1171 {
1172         ENTRY;
1173         atomic_dec(&req->rq_import->imp_replay_inflight);
1174         if (req->rq_status == 0 &&
1175             !req->rq_import->imp_vbr_failed) {
1176                 ptlrpc_import_recovery_state_machine(req->rq_import);
1177         } else {
1178                 if (req->rq_import->imp_vbr_failed) {
1179                         CDEBUG(D_WARNING,
1180                                "%s: version recovery fails, reconnecting\n",
1181                                req->rq_import->imp_obd->obd_name);
1182                         spin_lock(&req->rq_import->imp_lock);
1183                         req->rq_import->imp_vbr_failed = 0;
1184                         spin_unlock(&req->rq_import->imp_lock);
1185                 } else {
1186                         CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1187                                      "reconnecting\n",
1188                                req->rq_import->imp_obd->obd_name,
1189                                req->rq_status);
1190                 }
1191                 ptlrpc_connect_import(req->rq_import, NULL);
1192         }
1193         RETURN(0);
1194 }
1195
1196 static int signal_completed_replay(struct obd_import *imp)
1197 {
1198         struct ptlrpc_request *req;
1199         ENTRY;
1200
1201         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
1202         atomic_inc(&imp->imp_replay_inflight);
1203
1204         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
1205         if (!req) {
1206                 atomic_dec(&imp->imp_replay_inflight);
1207                 RETURN(-ENOMEM);
1208         }
1209
1210         ptlrpc_req_set_repsize(req, 1, NULL);
1211         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1212         lustre_msg_add_flags(req->rq_reqmsg,
1213                              MSG_LOCK_REPLAY_DONE |
1214                              MSG_REQ_REPLAY_DONE |
1215                              MSG_LAST_REPLAY);
1216
1217         if (imp->imp_delayed_recovery)
1218                 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
1219         req->rq_timeout *= 3;
1220         req->rq_interpret_reply = completed_replay_interpret;
1221
1222         ptlrpcd_add_req(req);
1223         RETURN(0);
1224 }
1225
#ifdef __KERNEL__
/*
 * Body of the "ll_imp_inval" kernel thread.
 *
 * @data is the struct obd_import to invalidate.  The thread invalidates the
 * import, optionally dumps the debug log (when obd_dump_on_eviction is set),
 * moves the import to LUSTRE_IMP_RECOVER and runs the recovery state
 * machine.  The import reference owned by the thread is dropped before it
 * exits.  Always returns 0.
 */
static int ptlrpc_invalidate_import_thread(void *data)
{
        struct obd_import *import = data;

        ENTRY;

        cfs_daemonize_ctxt("ll_imp_inval");

        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               import->imp_obd->obd_name, obd2cli_tgt(import->imp_obd),
               import->imp_connection->c_remote_uuid.uuid);

        ptlrpc_invalidate_import(import);

        if (obd_dump_on_eviction != 0) {
                CERROR("dump the log upon eviction\n");
                libcfs_debug_dumplog();
        }

        /* Invalidation is done; resume recovery from the RECOVER state. */
        IMPORT_SET_STATE(import, LUSTRE_IMP_RECOVER);
        ptlrpc_import_recovery_state_machine(import);

        /* Release the reference this thread was handed. */
        class_import_put(import);
        RETURN(0);
}
#endif
1253
1254 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1255 {
1256         int rc = 0;
1257         int inflight;
1258         char *target_start;
1259         int target_len;
1260
1261         ENTRY;
1262         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1263                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1264                           &target_start, &target_len);
1265                 /* Don't care about MGC eviction */
1266                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1267                            LUSTRE_MGC_NAME) != 0) {
1268                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1269                                            "%.*s; in progress operations using "
1270                                            "this service will fail.\n",
1271                                            target_len, target_start);
1272                 }
1273                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1274                        obd2cli_tgt(imp->imp_obd),
1275                        imp->imp_connection->c_remote_uuid.uuid);
1276
1277 #ifdef __KERNEL__
1278                 /* bug 17802:  XXX client_disconnect_export vs connect request
1279                  * race. if client will evicted at this time, we start
1280                  * invalidate thread without referece to import and import can
1281                  * be freed at same time. */
1282                 class_import_get(imp);
1283                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1284                                    CLONE_VM | CLONE_FILES);
1285                 if (rc < 0) {
1286                         class_import_put(imp);
1287                         CERROR("error starting invalidate thread: %d\n", rc);
1288                 } else {
1289                         rc = 0;
1290                 }
1291                 RETURN(rc);
1292 #else
1293                 ptlrpc_invalidate_import(imp);
1294
1295                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1296 #endif
1297         }
1298
1299         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1300                 CDEBUG(D_HA, "replay requested by %s\n",
1301                        obd2cli_tgt(imp->imp_obd));
1302                 rc = ptlrpc_replay_next(imp, &inflight);
1303                 if (inflight == 0 &&
1304                     atomic_read(&imp->imp_replay_inflight) == 0) {
1305                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1306                         rc = ldlm_replay_locks(imp);
1307                         if (rc)
1308                                 GOTO(out, rc);
1309                 }
1310                 rc = 0;
1311         }
1312
1313         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1314                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1315                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1316                         rc = signal_completed_replay(imp);
1317                         if (rc)
1318                                 GOTO(out, rc);
1319                 }
1320
1321         }
1322
1323         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1324                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1325                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1326                 }
1327         }
1328
1329         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1330                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1331                        obd2cli_tgt(imp->imp_obd),
1332                        imp->imp_connection->c_remote_uuid.uuid);
1333
1334                 rc = ptlrpc_resend(imp);
1335                 if (rc)
1336                         GOTO(out, rc);
1337                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1338                 ptlrpc_activate_import(imp);
1339
1340                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1341                           &target_start, &target_len);
1342                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1343                               "using nid %s.\n", imp->imp_obd->obd_name,
1344                               target_len, target_start,
1345                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1346         }
1347
1348         if (imp->imp_state == LUSTRE_IMP_FULL) {
1349                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1350                 ptlrpc_wake_delayed(imp);
1351         }
1352
1353  out:
1354         RETURN(rc);
1355 }
1356
/*
 * Callback handed to LWI_TIMEOUT_INTR() by ptlrpc_disconnect_import().
 * It ignores its argument and always returns 0.
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
1361
1362 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1363 {
1364         struct ptlrpc_request *req;
1365         int rq_opc, rc = 0;
1366         int nowait = imp->imp_obd->obd_force;
1367         ENTRY;
1368
1369         if (nowait)
1370                 GOTO(set_state, rc);
1371
1372         switch (imp->imp_connect_op) {
1373         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1374         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1375         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1376         default:
1377                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1378                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1379                 RETURN(-EINVAL);
1380         }
1381
1382         if (ptlrpc_import_in_recovery(imp)) {
1383                 struct l_wait_info lwi;
1384                 cfs_duration_t timeout;
1385
1386                 if (AT_OFF) {
1387                         timeout = cfs_time_seconds(obd_timeout);
1388                 } else {
1389                         int idx = import_at_get_index(imp,
1390                                 imp->imp_client->cli_request_portal);
1391                         timeout = cfs_time_seconds(
1392                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1393                 }
1394                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1395                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1396                 rc = l_wait_event(imp->imp_recovery_waitq,
1397                                   !ptlrpc_import_in_recovery(imp), &lwi);
1398         }
1399
1400         spin_lock(&imp->imp_lock);
1401         if (imp->imp_state != LUSTRE_IMP_FULL)
1402                 GOTO(out, 0);
1403
1404         spin_unlock(&imp->imp_lock);
1405
1406         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1407         if (req) {
1408                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1409                  * it fails.  We can get through the above with a down server
1410                  * if the client doesn't know the server is gone yet. */
1411                 req->rq_no_resend = 1;
1412
1413 #ifndef CRAY_XT3
1414                 /* We want client umounts to happen quickly, no matter the
1415                    server state... */
1416                 req->rq_timeout = min_t(int, req->rq_timeout,
1417                                         INITIAL_CONNECT_TIMEOUT);
1418 #else
1419                 /* ... but we always want liblustre clients to nicely
1420                    disconnect, so only use the adaptive value. */
1421                 if (AT_OFF)
1422                         req->rq_timeout = obd_timeout / 3;
1423 #endif
1424
1425                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1426                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1427                 ptlrpc_req_set_repsize(req, 1, NULL);
1428                 rc = ptlrpc_queue_wait(req);
1429                 ptlrpc_req_finished(req);
1430         }
1431
1432 set_state:
1433         spin_lock(&imp->imp_lock);
1434 out:
1435         if (noclose)
1436                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1437         else
1438                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1439         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1440         /* Try all connections in the future - bz 12758 */
1441         imp->imp_last_recon = 0;
1442         spin_unlock(&imp->imp_lock);
1443
1444         RETURN(rc);
1445 }
1446
1447 /* Sets maximal number of RPCs possible originating from other side of this
1448    import (server) to us and number of async RPC replies that we are not waiting
1449    for arriving */
1450 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1451 {
1452         LNetSetAsync(imp->imp_connection->c_peer, count);
1453 }
1454
1455 void ptlrpc_cleanup_imp(struct obd_import *imp)
1456 {
1457         ENTRY;
1458
1459         spin_lock(&imp->imp_lock);
1460         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1461         imp->imp_generation++;
1462         spin_unlock(&imp->imp_lock);
1463         ptlrpc_abort_inflight(imp);
1464
1465         EXIT;
1466 }
1467
1468 /* Adaptive Timeout utils */
1469 extern unsigned int at_min, at_max, at_history;
1470
1471 /* Bin into timeslices using AT_BINS bins.
1472    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1473    but still smoothing out a return to normalcy from a slow response.
1474    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1475 int at_add(struct adaptive_timeout *at, unsigned int val)
1476 {
1477         unsigned int old = at->at_current;
1478         time_t now = cfs_time_current_sec();
1479         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1480
1481         LASSERT(at);
1482         CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1483                val, at, now - at->at_binstart, at->at_current,
1484                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1485
1486         if (val == 0)
1487                 /* 0's don't count, because we never want our timeout to
1488                    drop to 0, and because 0 could mean an error */
1489                 return 0;
1490
1491         spin_lock(&at->at_lock);
1492
1493         if (unlikely(at->at_binstart == 0)) {
1494                 /* Special case to remove default from history */
1495                 at->at_current = val;
1496                 at->at_worst_ever = val;
1497                 at->at_worst_time = now;
1498                 at->at_hist[0] = val;
1499                 at->at_binstart = now;
1500         } else if (now - at->at_binstart < binlimit ) {
1501                 /* in bin 0 */
1502                 at->at_hist[0] = max(val, at->at_hist[0]);
1503                 at->at_current = max(val, at->at_current);
1504         } else {
1505                 int i, shift;
1506                 unsigned int maxv = val;
1507                 /* move bins over */
1508                 shift = (now - at->at_binstart) / binlimit;
1509                 LASSERT(shift > 0);
1510                 for(i = AT_BINS - 1; i >= 0; i--) {
1511                         if (i >= shift) {
1512                                 at->at_hist[i] = at->at_hist[i - shift];
1513                                 maxv = max(maxv, at->at_hist[i]);
1514                         } else {
1515                                 at->at_hist[i] = 0;
1516                         }
1517                 }
1518                 at->at_hist[0] = val;
1519                 at->at_current = maxv;
1520                 at->at_binstart += shift * binlimit;
1521         }
1522
1523         if (at->at_current > at->at_worst_ever) {
1524                 at->at_worst_ever = at->at_current;
1525                 at->at_worst_time = now;
1526         }
1527
1528         if (at->at_flags & AT_FLG_NOHIST)
1529                 /* Only keep last reported val; keeping the rest of the history
1530                    for proc only */
1531                 at->at_current = val;
1532
1533         if (at_max > 0)
1534                 at->at_current =  min(at->at_current, at_max);
1535         at->at_current =  max(at->at_current, at_min);
1536
1537         if (at->at_current != old)
1538                 CDEBUG(D_OTHER, "AT %p change: old=%u new=%u delta=%d "
1539                        "(val=%u) hist %u %u %u %u\n", at,
1540                        old, at->at_current, at->at_current - old, val,
1541                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1542                        at->at_hist[3]);
1543
1544         /* if we changed, report the old value */
1545         old = (at->at_current != old) ? old : 0;
1546
1547         spin_unlock(&at->at_lock);
1548         return old;
1549 }
1550
1551 /* Find the imp_at index for a given portal; assign if space available */
1552 int import_at_get_index(struct obd_import *imp, int portal)
1553 {
1554         struct imp_at *at = &imp->imp_at;
1555         int i;
1556
1557         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1558                 if (at->iat_portal[i] == portal)
1559                         return i;
1560                 if (at->iat_portal[i] == 0)
1561                         /* unused */
1562                         break;
1563         }
1564
1565         /* Not found in list, add it under a lock */
1566         spin_lock(&imp->imp_lock);
1567
1568         /* Check unused under lock */
1569         for (; i < IMP_AT_MAX_PORTALS; i++) {
1570                 if (at->iat_portal[i] == portal)
1571                         goto out;
1572                 if (at->iat_portal[i] == 0)
1573                         /* unused */
1574                         break;
1575         }
1576
1577         /* Not enough portals? */
1578         LASSERT(i < IMP_AT_MAX_PORTALS);
1579
1580         at->iat_portal[i] = portal;
1581 out:
1582         spin_unlock(&imp->imp_lock);
1583         return i;
1584 }