Whamcloud - gitweb
7a272fbf056d30cad8d7718c59cabd19fbb9fb4e
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/import.c
37  *
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #ifndef __KERNEL__
43 # include <liblustre.h>
44 #endif
45
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
51 #include <obd.h>
52 #include <obd_class.h>
53
54 #include "ptlrpc_internal.h"
55
/* Per-request state stashed in the connect request's async args by
 * ptlrpc_connect_import() and handed to ptlrpc_connect_interpret(). */
struct ptlrpc_connect_async_args {
        /* imp_peer_committed_transno sampled before a reconnect attempt;
         * left at 0 for an initial connect */
         __u64 pcaa_peer_committed;
        /* non-zero when the import had no remote handle in use, i.e. this
         * is the first connect rather than a reconnect */
        int pcaa_initial_connect;
};
60
/*
 * Import state transition helpers.
 *
 * IMPORT_SET_STATE_NOLOCK(imp, state): log the transition at D_HA and store
 * the new state, unless the import is already CLOSED -- a CLOSED import
 * should remain so.  The macro takes no lock itself.
 *
 * IMPORT_SET_STATE(imp, state): same, wrapped in imp_lock.
 *
 * Both arguments are parenthesized at every use so that non-trivial
 * expressions (e.g. "p + 1" or "cond ? a : b") expand correctly.  Note that
 * each argument is still evaluated more than once, so do not pass
 * expressions with side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while(0)

#define IMPORT_SET_STATE(imp, state)            \
do {                                            \
        spin_lock(&(imp)->imp_lock);            \
        IMPORT_SET_STATE_NOLOCK(imp, state);    \
        spin_unlock(&(imp)->imp_lock);          \
} while(0)
79
80
/* Forward declarations: the connect reply handler is installed as
 * rq_interpret_reply by ptlrpc_connect_import() before its definition. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void * data, int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
84
85 /* Only this function is allowed to change the import state when it is
86  * CLOSED. I would rather refcount the import and free it after
87  * disconnection like we do with exports. To do that, the client_obd
88  * will need to save the peer info somewhere other than in the import,
89  * though. */
90 int ptlrpc_init_import(struct obd_import *imp)
91 {
92         spin_lock(&imp->imp_lock);
93
94         imp->imp_generation++;
95         imp->imp_state =  LUSTRE_IMP_NEW;
96
97         spin_unlock(&imp->imp_lock);
98
99         return 0;
100 }
101 EXPORT_SYMBOL(ptlrpc_init_import);
102
#define UUID_STR "_UUID"
/*
 * Locate the human-readable part of a uuid string without modifying it.
 *
 * @uuid       - NUL-terminated uuid string
 * @prefix     - optional leading string to skip; ignored when NULL or when
 *               @uuid does not begin with it
 * @uuid_start - out: points into @uuid just past the skipped prefix
 * @uuid_len   - out: length from *uuid_start, not counting a trailing
 *               UUID_STR suffix if one is present
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const size_t suffix_len = strlen(UUID_STR);
        char *name = uuid;
        size_t name_len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        name += prefix_len;
        }

        name_len = strlen(name);

        /* Trim the suffix only when the remaining text really ends in it. */
        if (name_len >= suffix_len &&
            strncmp(name + name_len - suffix_len, UUID_STR, suffix_len) == 0)
                name_len -= suffix_len;

        *uuid_start = name;
        *uuid_len = (int)name_len;
}
119
120 /* Returns true if import was FULL, false if import was already not
121  * connected.
122  * @imp - import to be disconnected
123  * @conn_cnt - connection count (epoch) of the request that timed out
124  *             and caused the disconnection.  In some cases, multiple
125  *             inflight requests can fail to a single target (e.g. OST
126  *             bulk requests) and if one has already caused a reconnection
127  *             (increasing the import->conn_cnt) the older failure should
128  *             not also cause a reconnection.  If zero it forces a reconnect.
129  */
130 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
131 {
132         int rc = 0;
133
134         spin_lock(&imp->imp_lock);
135
136         if (imp->imp_state == LUSTRE_IMP_FULL &&
137             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
138                 char *target_start;
139                 int   target_len;
140
141                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
142                           &target_start, &target_len);
143                 if (imp->imp_replayable) {
144                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
145                                "%s was lost; in progress operations using this "
146                                "service will wait for recovery to complete.\n",
147                                imp->imp_obd->obd_name, target_len, target_start,
148                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
149                 } else {
150                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
151                                "%.*s via nid %s was lost; in progress "
152                                "operations using this service will fail.\n",
153                                imp->imp_obd->obd_name, target_len, target_start,
154                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
155                 }
156                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
157                 spin_unlock(&imp->imp_lock);
158
159                 if (obd_dump_on_timeout)
160                         libcfs_debug_dumplog();
161
162                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
163                 rc = 1;
164         } else {
165                 spin_unlock(&imp->imp_lock);
166                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
167                        imp->imp_client->cli_name, imp,
168                        (imp->imp_state == LUSTRE_IMP_FULL &&
169                         imp->imp_conn_cnt > conn_cnt) ?
170                        "reconnected" : "not connected", imp->imp_conn_cnt,
171                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
172         }
173
174         return rc;
175 }
176
177 /* Must be called with imp_lock held! */
178 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
179 {
180         ENTRY;
181         LASSERT_SPIN_LOCKED(&imp->imp_lock);
182
183         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
184         imp->imp_invalid = 1;
185         imp->imp_generation++;
186         spin_unlock(&imp->imp_lock);
187
188         ptlrpc_abort_inflight(imp);
189         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
190 }
191
/*
 * This acts as a barrier; all existing requests are rejected, and
 * no new requests will be accepted until the import is valid again.
 *
 * Locked wrapper: takes imp_lock and hands off to
 * ptlrpc_deactivate_and_unlock_import(), which releases it.
 */
void ptlrpc_deactivate_import(struct obd_import *imp)
{
        spin_lock(&imp->imp_lock);
        /* drops imp_lock before returning */
        ptlrpc_deactivate_and_unlock_import(imp);
}
201
202 static unsigned int
203 ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
204 {
205         long dl;
206
207         if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
208               (req->rq_phase == RQ_PHASE_BULK) ||
209               (req->rq_phase == RQ_PHASE_NEW)))
210                 return 0;
211
212         if (req->rq_timedout)
213                 return 0;
214
215         if (req->rq_phase == RQ_PHASE_NEW)
216                 dl = req->rq_sent;
217         else
218                 dl = req->rq_deadline;
219
220         if (dl <= now)
221                 return 0;
222
223         return dl - now;
224 }
225
226 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
227 {
228         time_t now = cfs_time_current_sec();
229         struct list_head *tmp, *n;
230         struct ptlrpc_request *req;
231         unsigned int timeout = 0;
232
233         spin_lock(&imp->imp_lock);
234         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
235                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
236                 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
237         }
238         spin_unlock(&imp->imp_lock);
239         return timeout;
240 }
241
242 /*
243  * This function will invalidate the import, if necessary, then block
244  * for all the RPC completions, and finally notify the obd to
245  * invalidate its state (ie cancel locks, clear pending requests,
246  * etc).
247  */
248 void ptlrpc_invalidate_import(struct obd_import *imp)
249 {
250         struct list_head *tmp, *n;
251         struct ptlrpc_request *req;
252         struct l_wait_info lwi;
253         unsigned int timeout;
254         int rc;
255
256         atomic_inc(&imp->imp_inval_count);
257
258         /*
259          * If this is an invalid MGC connection, then don't bother
260          * waiting for imp_inflight to drop to 0.
261          */
262         if (imp->imp_invalid && imp->imp_recon_bk &&!imp->imp_obd->obd_no_recov)
263                 goto out;
264
265         if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
266                 ptlrpc_deactivate_import(imp);
267
268         LASSERT(imp->imp_invalid);
269
270         /* Wait forever until inflight == 0. We really can't do it another
271          * way because in some cases we need to wait for very long reply
272          * unlink. We can't do anything before that because there is really
273          * no guarantee that some rdma transfer is not in progress right now. */
274         do {
275                 /* Calculate max timeout for waiting on rpcs to error
276                  * out. Use obd_timeout if calculated value is smaller
277                  * than it. */
278                 timeout = ptlrpc_inflight_timeout(imp);
279                 timeout += timeout / 3;
280
281                 if (timeout == 0)
282                         timeout = obd_timeout;
283
284                 CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",
285                        timeout);
286
287                 /* Wait for all requests to error out and call completion
288                  * callbacks. Cap it at obd_timeout -- these should all
289                  * have been locally cancelled by ptlrpc_abort_inflight. */
290                 lwi = LWI_TIMEOUT_INTERVAL(
291                         cfs_timeout_cap(cfs_time_seconds(timeout)),
292                         cfs_time_seconds(1), NULL, NULL);
293                 rc = l_wait_event(imp->imp_recovery_waitq,
294                                 (atomic_read(&imp->imp_inflight) == 0), &lwi);
295                 if (rc) {
296                         const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
297
298                         CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
299                                cli_tgt, rc, atomic_read(&imp->imp_inflight));
300
301                         spin_lock(&imp->imp_lock);
302                         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
303                                 req = list_entry(tmp, struct ptlrpc_request,
304                                         rq_list);
305                                 DEBUG_REQ(D_ERROR, req,"still on sending list");
306                         }
307                         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
308                                 req = list_entry(tmp, struct ptlrpc_request,
309                                         rq_list);
310                                 DEBUG_REQ(D_ERROR, req,"still on delayed list");
311                         }
312
313                         if (atomic_read(&imp->imp_unregistering) == 0) {
314                                 /* We know that only "unregistering" rpcs may
315                                  * still survive in sending or delaying lists
316                                  * (They are waiting for long reply unlink in
317                                  * sluggish nets). Let's check this. If there
318                                  * is no unregistering and inflight != 0 this
319                                  * is bug. */
320                                 LASSERT(atomic_read(&imp->imp_inflight) == 0);
321
322                                 /* Let's save one loop as soon as inflight have
323                                  * dropped to zero. No new inflights possible at
324                                  * this point. */
325                                 rc = 0;
326                         } else {
327                                 CERROR("%s: RPCs in \"%s\" phase found (%d). "
328                                        "Network is sluggish? Waiting them "
329                                        "to error out.\n", cli_tgt,
330                                        ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
331                                        atomic_read(&imp->imp_unregistering));
332                         }
333                         spin_unlock(&imp->imp_lock);
334                 }
335         } while (rc != 0);
336
337         /* Let's additionally check that no new rpcs added to import in
338          * "invalidate" state. */
339         LASSERT(atomic_read(&imp->imp_inflight) == 0);
340
341 out:
342         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
343
344         atomic_dec(&imp->imp_inval_count);
345         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
346 }
347
348 /* unset imp_invalid */
349 void ptlrpc_activate_import(struct obd_import *imp)
350 {
351         struct obd_device *obd = imp->imp_obd;
352
353         spin_lock(&imp->imp_lock);
354         imp->imp_invalid = 0;
355         spin_unlock(&imp->imp_lock);
356
357         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
358 }
359
360 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
361 {
362         ENTRY;
363
364         LASSERT(!imp->imp_dlm_fake);
365
366         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
367                 if (!imp->imp_replayable) {
368                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
369                                "auto-deactivating\n",
370                                obd2cli_tgt(imp->imp_obd),
371                                imp->imp_connection->c_remote_uuid.uuid,
372                                imp->imp_obd->obd_name);
373                         ptlrpc_deactivate_import(imp);
374                 }
375
376                 CDEBUG(D_HA, "%s: waking up pinger\n",
377                        obd2cli_tgt(imp->imp_obd));
378
379                 spin_lock(&imp->imp_lock);
380                 imp->imp_force_verify = 1;
381                 spin_unlock(&imp->imp_lock);
382
383                 ptlrpc_pinger_wake_up();
384         }
385         EXIT;
386 }
387
388 int ptlrpc_reconnect_import(struct obd_import *imp)
389 {
390
391         ptlrpc_set_import_discon(imp, 0);
392         /* Force a new connect attempt */
393         ptlrpc_invalidate_import(imp);
394         /* Do a fresh connect next time by zeroing the handle */
395         ptlrpc_disconnect_import(imp, 1);
396         /* Wait for all invalidate calls to finish */
397         if (atomic_read(&imp->imp_inval_count) > 0) {
398                 int rc;
399                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
400                 rc = l_wait_event(imp->imp_recovery_waitq,
401                                   (atomic_read(&imp->imp_inval_count) == 0),
402                                   &lwi);
403                 if (rc)
404                         CERROR("Interrupted, inval=%d\n",
405                                atomic_read(&imp->imp_inval_count));
406         }
407
408         /*
409          * Allow reconnect attempts. Note: Currently, the function is
410          * only called by MGC. So assume this is a recoverable import,
411          * and force import to be recoverable. fix this if you need to
412          */
413
414         imp->imp_obd->obd_no_recov = 0;
415         /* Remove 'invalid' flag */
416         ptlrpc_activate_import(imp);
417         /* Attempt a new connect */
418         ptlrpc_recover_import(imp, NULL);
419         return 0;
420 }
421
422 EXPORT_SYMBOL(ptlrpc_reconnect_import);
423
/*
 * Choose the connection for the next connect attempt and install it on
 * both the import and the export found through imp_dlm_handle.
 *
 * Selection, done under imp_lock over imp_conn_list:
 *  - entries attempted within the last CONNECTION_SWITCH_MIN seconds are
 *    skipped;
 *  - the first entry that was never attempted, or not attempted since
 *    imp_last_success_conn, wins immediately;
 *  - otherwise the entry with the oldest oic_last_attempt is used;
 *  - if every entry was skipped, imp_conn_current is reused.
 *
 * Returns 0 on success, -EINVAL if the import has no connections at all.
 */
static int import_select_connection(struct obd_import *imp)
{
        struct obd_import_conn *imp_conn = NULL, *conn;
        struct obd_export *dlmexp;
        /* stays 1 only if the scan never finds a fresh (untried) entry */
        int tried_all = 1;
        ENTRY;

        spin_lock(&imp->imp_lock);

        if (list_empty(&imp->imp_conn_list)) {
                CERROR("%s: no connections available\n",
                        imp->imp_obd->obd_name);
                spin_unlock(&imp->imp_lock);
                RETURN(-EINVAL);
        }

        list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
                CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
                       imp->imp_obd->obd_name,
                       libcfs_nid2str(conn->oic_conn->c_peer.nid),
                       conn->oic_last_attempt);

                /* Don't thrash connections */
                if (cfs_time_before_64(cfs_time_current_64(),
                                     conn->oic_last_attempt +
                                     cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
                        continue;
                }

                /* If we have not tried this connection since the
                   last successful attempt, go with this one */
                if ((conn->oic_last_attempt == 0) ||
                    cfs_time_beforeq_64(conn->oic_last_attempt,
                                       imp->imp_last_success_conn)) {
                        imp_conn = conn;
                        tried_all = 0;
                        break;
                }

                /* If all of the connections have already been tried
                   since the last successful connection; just choose the
                   least recently used */
                if (!imp_conn)
                        imp_conn = conn;
                else if (cfs_time_before_64(conn->oic_last_attempt,
                                            imp_conn->oic_last_attempt))
                        imp_conn = conn;
        }

        /* if not found, simply choose the current one */
        if (!imp_conn) {
                LASSERT(imp->imp_conn_current);
                imp_conn = imp->imp_conn_current;
                tried_all = 0;
        }
        LASSERT(imp_conn->oic_conn);

        /* If we've tried everything, and we're back to the beginning of the
           list, increase our timeout and try again. It will be reset when
           we do finally connect. (FIXME: really we should wait for all network
           state associated with the last connection attempt to drain before
           trying to reconnect on it.) */
        if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
            !imp->imp_recon_bk /* not retrying */) {
                /* grow the net latency estimate by CONNECTION_SWITCH_INC
                 * while it is still below CONNECTION_SWITCH_MAX */
                if (at_get(&imp->imp_at.iat_net_latency) <
                    CONNECTION_SWITCH_MAX) {
                        at_add(&imp->imp_at.iat_net_latency,
                               at_get(&imp->imp_at.iat_net_latency) +
                               CONNECTION_SWITCH_INC);
                }
                LASSERT(imp_conn->oic_last_attempt);
                CWARN("%s: tried all connections, increasing latency to %ds\n",
                      imp->imp_obd->obd_name,
                      at_get(&imp->imp_at.iat_net_latency));
        }

        /* stamp the chosen entry so the next scan sees it as just tried */
        imp_conn->oic_last_attempt = cfs_time_current_64();

        /* switch connection, don't mind if it's same as the current one */
        if (imp->imp_connection)
                ptlrpc_connection_put(imp->imp_connection);
        imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);

        /* point the export behind imp_dlm_handle at the same connection;
         * the export reference taken by the lookup is dropped right after */
        dlmexp =  class_conn2export(&imp->imp_dlm_handle);
        LASSERT(dlmexp != NULL);
        if (dlmexp->exp_connection)
                ptlrpc_connection_put(dlmexp->exp_connection);
        dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
        class_export_put(dlmexp);

        if (imp->imp_conn_current != imp_conn) {
                /* only announce a change, not the very first selection */
                if (imp->imp_conn_current)
                        LCONSOLE_INFO("Changing connection for %s to %s/%s\n",
                                      imp->imp_obd->obd_name,
                                      imp_conn->oic_uuid.uuid,
                                      libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
                imp->imp_conn_current = imp_conn;
        }

        CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
               imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
               libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));

        spin_unlock(&imp->imp_lock);

        RETURN(0);
}
531
/*
 * Start an asynchronous connect (or reconnect) on an import.
 *
 * @imp      - import to connect
 * @new_uuid - optional; when non-NULL it is converted to an obd_uuid and
 *             passed to import_set_conn_priority() before a connection
 *             is selected
 *
 * The import is moved to CONNECTING, a connection is selected, and a
 * connect request is built and handed to ptlrpcd_add_req(); the reply is
 * processed later by ptlrpc_connect_interpret().
 *
 * Returns -EINVAL for a CLOSED import, 0 if the import is already FULL,
 * -EALREADY if a connect is already in progress, -ENOMEM if the request
 * cannot be allocated, the error from any failing helper, or 0 once the
 * request has been queued.  On any failure after the state change the
 * import is set back to DISCON.
 */
int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
{
        struct obd_device *obd = imp->imp_obd;
        int initial_connect = 0;
        int rc;
        __u64 committed_before_reconnect = 0;
        struct ptlrpc_request *request;
        /* request buffer sizes and contents, index for index */
        __u32 size[] = { sizeof(struct ptlrpc_body),
                       sizeof(imp->imp_obd->u.cli.cl_target_uuid),
                       sizeof(obd->obd_uuid),
                       sizeof(imp->imp_dlm_handle),
                       sizeof(imp->imp_connect_data) };
        char *tmp[] = { NULL,
                        obd2cli_tgt(imp->imp_obd),
                        obd->obd_uuid.uuid,
                        (char *)&imp->imp_dlm_handle,
                        (char *)&imp->imp_connect_data };
        struct ptlrpc_connect_async_args *aa;

        ENTRY;
        /* refuse to start from CLOSED, FULL or CONNECTING */
        spin_lock(&imp->imp_lock);
        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                spin_unlock(&imp->imp_lock);
                CERROR("can't connect to a closed import\n");
                RETURN(-EINVAL);
        } else if (imp->imp_state == LUSTRE_IMP_FULL) {
                spin_unlock(&imp->imp_lock);
                CERROR("already connected\n");
                RETURN(0);
        } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
                spin_unlock(&imp->imp_lock);
                CERROR("already connecting\n");
                RETURN(-EALREADY);
        }

        IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);

        /* new connection epoch; see ptlrpc_set_import_discon() */
        imp->imp_conn_cnt++;
        imp->imp_resend_replay = 0;

        /* no remote handle in use means this is the first connect;
         * otherwise remember the peer's committed transno for the
         * reply handler */
        if (!lustre_handle_is_used(&imp->imp_remote_handle))
                initial_connect = 1;
        else
                committed_before_reconnect = imp->imp_peer_committed_transno;

        spin_unlock(&imp->imp_lock);

        if (new_uuid) {
                struct obd_uuid uuid;

                obd_str2uuid(&uuid, new_uuid);
                rc = import_set_conn_priority(imp, &uuid);
                if (rc)
                        GOTO(out, rc);
        }

        rc = import_select_connection(imp);
        if (rc)
                GOTO(out, rc);

        /* last in connection list */
        if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
                if (imp->imp_initial_recov_bk && initial_connect) {
                        CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
                               imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
                        /* Don't retry if connect fails */
                        /* rc is borrowed here as the 0 value passed for
                         * KEY_INIT_RECOV; the call's result is ignored */
                        rc = 0;
                        obd_set_info_async(obd->obd_self_export,
                                           sizeof(KEY_INIT_RECOV),
                                           KEY_INIT_RECOV,
                                           sizeof(rc), &rc, NULL);
                }
                if (imp->imp_recon_bk) {
                        CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
                               imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
                        spin_lock(&imp->imp_lock);
                        imp->imp_last_recon = 1;
                        spin_unlock(&imp->imp_lock);
                }
        }

        /* Reset connect flags to the originally requested flags, in case
         * the server is updated on-the-fly we will get the new features. */
        imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
        imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;

        rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
                           &obd->obd_uuid, &imp->imp_connect_data, NULL);
        if (rc)
                GOTO(out, rc);

        request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
                                  5, size, tmp);
        if (!request)
                GOTO(out, rc = -ENOMEM);

        /* Report the rpc service time to the server so that it knows how long
         * to wait for clients to join recovery */
        lustre_msg_set_service_time(request->rq_reqmsg,
                                    at_timeout2est(request->rq_timeout));

        /* The amount of time we give the server to process the connect req.
         * import_select_connection will increase the net latency on
         * repeated reconnect attempts to cover slow networks.
         * We override/ignore the server rpc completion estimate here,
         * which may be large if this is a reconnect attempt */
        request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
        lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);

#ifndef __KERNEL__
        lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
#endif
        if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
                lustre_msg_add_op_flags(request->rq_reqmsg,
                                        MSG_CONNECT_NEXT_VER);

        request->rq_no_resend = request->rq_no_delay = 1;
        request->rq_send_state = LUSTRE_IMP_CONNECTING;
        /* Allow a slightly larger reply for future growth compatibility */
        /* size[] is reused from here on to describe the reply buffers */
        size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
                              16 * sizeof(__u64);
        ptlrpc_req_set_repsize(request, 2, size);
        request->rq_interpret_reply = ptlrpc_connect_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
        aa = ptlrpc_req_async_args(request);
        memset(aa, 0, sizeof *aa);

        aa->pcaa_peer_committed = committed_before_reconnect;
        aa->pcaa_initial_connect = initial_connect;
        if (aa->pcaa_initial_connect) {
                /* set replayable for an initial connect; the reply handler
                 * rewrites it from the server's MSG_CONNECT_REPLAYABLE flag */
                spin_lock(&imp->imp_lock);
                imp->imp_replayable = 1;
                spin_unlock(&imp->imp_lock);
                lustre_msg_add_op_flags(request->rq_reqmsg,
                                        MSG_CONNECT_INITIAL);
        }

        DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
                  aa->pcaa_initial_connect ? "initial " : "re",
                  imp->imp_conn_cnt);
        ptlrpcd_add_req(request);
        rc = 0;
out:
        /* any failure past the state change puts the import back in DISCON */
        if (rc != 0) {
                IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
        }

        RETURN(rc);
}
EXPORT_SYMBOL(ptlrpc_connect_import);
683
684 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
685 {
686 #ifdef __KERNEL__
687         struct obd_import_conn *imp_conn;
688 #endif
689         int wake_pinger = 0;
690
691         ENTRY;
692
693         spin_lock(&imp->imp_lock);
694         if (list_empty(&imp->imp_conn_list))
695                 GOTO(unlock, 0);
696
697 #ifdef __KERNEL__
698         imp_conn = list_entry(imp->imp_conn_list.prev,
699                               struct obd_import_conn,
700                               oic_item);
701
702         /* XXX: When the failover node is the primary node, it is possible
703          * to have two identical connections in imp_conn_list. We must
704          * compare not conn's pointers but NIDs, otherwise we can defeat
705          * connection throttling. (See bug 14774.) */
706         if (imp->imp_conn_current->oic_conn->c_peer.nid !=
707                                 imp_conn->oic_conn->c_peer.nid) {
708                 ptlrpc_ping_import_soon(imp);
709                 wake_pinger = 1;
710         }
711
712 #else
713         /* liblustre has no pinger thead, so we wakup pinger anyway */
714         wake_pinger = 1;
715 #endif
716  unlock:
717         spin_unlock(&imp->imp_lock);
718
719         if (wake_pinger)
720                 ptlrpc_pinger_wake_up();
721
722         EXIT;
723 }
724
725 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
726                                     void * data, int rc)
727 {
728         struct ptlrpc_connect_async_args *aa = data;
729         struct obd_import *imp = request->rq_import;
730         struct client_obd *cli = &imp->imp_obd->u.cli;
731         struct lustre_handle old_hdl;
732         __u64 old_connect_flags;
733         int msg_flags;
734         ENTRY;
735
736         spin_lock(&imp->imp_lock);
737         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
738                 spin_unlock(&imp->imp_lock);
739                 RETURN(0);
740         }
741         spin_unlock(&imp->imp_lock);
742
743         if (rc)
744                 GOTO(out, rc);
745
746         LASSERT(imp->imp_conn_current);
747
748         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
749
750         /* All imports are pingable */
751         spin_lock(&imp->imp_lock);
752         imp->imp_pingable = 1;
753
754         if (aa->pcaa_initial_connect) {
755                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
756                         imp->imp_replayable = 1;
757                         spin_unlock(&imp->imp_lock);
758                         CDEBUG(D_HA, "connected to replayable target: %s\n",
759                                obd2cli_tgt(imp->imp_obd));
760                 } else {
761                         imp->imp_replayable = 0;
762                         spin_unlock(&imp->imp_lock);
763                 }
764
765                 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
766                      msg_flags & MSG_CONNECT_NEXT_VER) ||
767                     request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
768                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
769                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
770                                obd2cli_tgt(imp->imp_obd));
771                 } else {
772                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
773                                obd2cli_tgt(imp->imp_obd));
774                 }
775
776                 imp->imp_remote_handle =
777                                 *lustre_msg_get_handle(request->rq_repmsg);
778
779                 /* Initial connects are allowed for clients with non-random
780                  * uuids when servers are in recovery.  Simply signal the
781                  * servers replay is complete and wait in REPLAY_WAIT. */
782                 if (msg_flags & MSG_CONNECT_RECOVERING) {
783                         CDEBUG(D_HA, "connect to %s during recovery\n",
784                                obd2cli_tgt(imp->imp_obd));
785                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
786                 } else {
787                         IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
788                         ptlrpc_activate_import(imp);
789                 }
790                 GOTO(finish, rc = 0);
791         } else {
792                 spin_unlock(&imp->imp_lock);
793         }
794
795         /* Determine what recovery state to move the import to. */
796         if (MSG_CONNECT_RECONNECT & msg_flags) {
797                 memset(&old_hdl, 0, sizeof(old_hdl));
798                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
799                             sizeof (old_hdl))) {
800                         CERROR("%s@%s didn't like our handle "LPX64
801                                ", failed\n", obd2cli_tgt(imp->imp_obd),
802                                imp->imp_connection->c_remote_uuid.uuid,
803                                imp->imp_dlm_handle.cookie);
804                         GOTO(out, rc = -ENOTCONN);
805                 }
806
807                 if (memcmp(&imp->imp_remote_handle,
808                            lustre_msg_get_handle(request->rq_repmsg),
809                            sizeof(imp->imp_remote_handle))) {
810                         int level = msg_flags & MSG_CONNECT_RECOVERING ?
811                                 D_HA : D_WARNING;
812
813                         /* Bug 16611/14775: if server handle have changed,
814                          * that means some sort of disconnection happened.
815                          * If the server is not in recovery, that also means it
816                          * already erased all of our state because of previous
817                          * eviction. If it is in recovery - we are safe to
818                          * participate since we can reestablish all of our state
819                          * with server again */
820                         CDEBUG(level,"%s@%s changed server handle from "
821                                      LPX64" to "LPX64"%s\n",
822                                      obd2cli_tgt(imp->imp_obd),
823                                      imp->imp_connection->c_remote_uuid.uuid,
824                                      imp->imp_remote_handle.cookie,
825                                      lustre_msg_get_handle(request->rq_repmsg)->
826                                                                         cookie,
827                                      (MSG_CONNECT_RECOVERING & msg_flags) ?
828                                          " but is still in recovery" : "");
829
830                         imp->imp_remote_handle =
831                                      *lustre_msg_get_handle(request->rq_repmsg);
832
833                         if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
834                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
835                                 GOTO(finish, rc = 0);
836                         }
837
838                 } else {
839                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
840                                obd2cli_tgt(imp->imp_obd),
841                                imp->imp_connection->c_remote_uuid.uuid);
842                 }
843
844                 if (imp->imp_invalid) {
845                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
846                                "marking evicted\n", imp->imp_obd->obd_name);
847                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
848                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
849                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
850                                imp->imp_obd->obd_name,
851                                obd2cli_tgt(imp->imp_obd));
852
853                         spin_lock(&imp->imp_lock);
854                         imp->imp_resend_replay = 1;
855                         /* VBR: delayed connection */
856                         if (MSG_CONNECT_DELAYED & msg_flags) {
857                                 imp->imp_delayed_recovery = 1;
858                                 imp->imp_no_lock_replay = 1;
859                         }
860                         spin_unlock(&imp->imp_lock);
861
862                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
863                 } else {
864                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
865                 }
866         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
867                 LASSERT(imp->imp_replayable);
868                 imp->imp_remote_handle =
869                                 *lustre_msg_get_handle(request->rq_repmsg);
870                 imp->imp_last_replay_transno = 0;
871                 /* VBR: delayed connection */
872                 if (MSG_CONNECT_DELAYED & msg_flags) {
873                         spin_lock(&imp->imp_lock);
874                         imp->imp_delayed_recovery = 1;
875                         imp->imp_no_lock_replay = 1;
876                         spin_unlock(&imp->imp_lock);
877                 }
878                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
879         } else {
880                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
881                           "flags reconnect/recovering not set: %x)",msg_flags);
882                 imp->imp_remote_handle =
883                                 *lustre_msg_get_handle(request->rq_repmsg);
884                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
885         }
886
887         /* Sanity checks for a reconnected import. */
888         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
889                 CERROR("imp_replayable flag does not match server "
890                        "after reconnect. We should LBUG right here.\n");
891         }
892
893         if (lustre_msg_get_last_committed(request->rq_repmsg) <
894             aa->pcaa_peer_committed) {
895                 CERROR("%s went back in time (transno "LPD64
896                        " was previously committed, server now claims "LPD64
897                        ")!  See https://bugzilla.lustre.org/show_bug.cgi?"
898                        "id=9646\n",
899                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
900                        lustre_msg_get_last_committed(request->rq_repmsg));
901         }
902
903 finish:
904         rc = ptlrpc_import_recovery_state_machine(imp);
905         if (rc != 0) {
906                 if (rc == -ENOTCONN) {
907                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
908                                "invalidating and reconnecting\n",
909                                obd2cli_tgt(imp->imp_obd),
910                                imp->imp_connection->c_remote_uuid.uuid);
911                         ptlrpc_connect_import(imp, NULL);
912                         RETURN(0);
913                 }
914         } else {
915                 struct obd_connect_data *ocd;
916                 struct obd_export *exp;
917
918                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
919                                          lustre_swab_connect);
920                 spin_lock(&imp->imp_lock);
921                 list_del(&imp->imp_conn_current->oic_item);
922                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
923                 imp->imp_last_success_conn =
924                         imp->imp_conn_current->oic_last_attempt;
925
926                 if (ocd == NULL) {
927                         spin_unlock(&imp->imp_lock);
928                         CERROR("Wrong connect data from server\n");
929                         rc = -EPROTO;
930                         GOTO(out, rc);
931                 }
932
933                 imp->imp_connect_data = *ocd;
934
935                 exp = class_conn2export(&imp->imp_dlm_handle);
936                 spin_unlock(&imp->imp_lock);
937
938                 /* check that server granted subset of flags we asked for. */
939                 LASSERTF((ocd->ocd_connect_flags &
940                           imp->imp_connect_flags_orig) ==
941                          ocd->ocd_connect_flags, LPX64" != "LPX64,
942                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
943
944                 if (!exp) {
945                         /* This could happen if export is cleaned during the
946                            connect attempt */
947                         CERROR("Missing export for %s\n",
948                                imp->imp_obd->obd_name);
949                         GOTO(out, rc = -ENODEV);
950                 }
951                 old_connect_flags = exp->exp_connect_flags;
952                 exp->exp_connect_flags = ocd->ocd_connect_flags;
953                 imp->imp_obd->obd_self_export->exp_connect_flags =
954                         ocd->ocd_connect_flags;
955                 class_export_put(exp);
956
957                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
958
959                 if (!ocd->ocd_ibits_known &&
960                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
961                         CERROR("Inodebits aware server returned zero compatible"
962                                " bits?\n");
963
964                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
965                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
966                                         LUSTRE_VERSION_OFFSET_WARN ||
967                      ocd->ocd_version < LUSTRE_VERSION_CODE -
968                                         LUSTRE_VERSION_OFFSET_WARN)) {
969                         /* Sigh, some compilers do not like #ifdef in the middle
970                            of macro arguments */
971 #ifdef __KERNEL__
972                         const char *older =
973                                 "older.  Consider upgrading this client";
974 #else
975                         const char *older =
976                                 "older.  Consider recompiling this application";
977 #endif
978                         const char *newer = "newer than client version";
979
980                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
981                                       "is much %s (%s)\n",
982                                       obd2cli_tgt(imp->imp_obd),
983                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
984                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
985                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
986                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
987                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
988                                       newer : older, LUSTRE_VERSION_STRING);
989                 }
990
991                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
992                         /* We sent to the server ocd_cksum_types with bits set
993                          * for algorithms we understand. The server masked off
994                          * the checksum types it doesn't support */
995                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
996                                 LCONSOLE_WARN("The negotiation of the checksum "
997                                               "alogrithm to use with server %s "
998                                               "failed (%x/%x), disabling "
999                                               "checksums\n",
1000                                               obd2cli_tgt(imp->imp_obd),
1001                                               ocd->ocd_cksum_types,
1002                                               OBD_CKSUM_ALL);
1003                                 cli->cl_checksum = 0;
1004                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1005                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1006                         } else {
1007                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
1008
1009                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
1010                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
1011                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
1012                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
1013                                 else
1014                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1015                         }
1016                 } else {
1017                         /* The server does not support OBD_CONNECT_CKSUM.
1018                          * Enforce CRC32 for backward compatibility*/
1019                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1020                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1021                 }
1022
1023                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
1024                         cli->cl_max_pages_per_rpc =
1025                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
1026                 }
1027
1028                 /* Reset ns_connect_flags only for initial connect. It might be
1029                  * changed in while using FS and if we reset it in reconnect
1030                  * this leads to lossing user settings done before such as
1031                  * disable lru_resize, etc. */
1032                 if (old_connect_flags != exp->exp_connect_flags ||
1033                     aa->pcaa_initial_connect) {
1034                         CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server "
1035                                "flags: "LPX64"\n", imp->imp_obd->obd_name,
1036                                ocd->ocd_connect_flags);
1037                         imp->imp_obd->obd_namespace->ns_connect_flags =
1038                                 ocd->ocd_connect_flags;
1039                         imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1040                                 ocd->ocd_connect_flags;
1041                 }
1042
1043                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1044                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1045                         /* We need a per-message support flag, because
1046                            a. we don't know if the incoming connect reply
1047                               supports AT or not (in reply_in_callback)
1048                               until we unpack it.
1049                            b. failovered server means export and flags are gone
1050                               (in ptlrpc_send_reply).
1051                            Can only be set when we know AT is supported at
1052                            both ends */
1053                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1054                 else
1055                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1056
1057                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1058                         (cli->cl_max_pages_per_rpc > 0));
1059         }
1060
1061  out:
1062         if (rc != 0) {
1063                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1064                 spin_lock(&imp->imp_lock);
1065                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
1066                     (request->rq_import_generation == imp->imp_generation))
1067                         ptlrpc_deactivate_and_unlock_import(imp);
1068                 else
1069                         spin_unlock(&imp->imp_lock);
1070
1071                 if (imp->imp_recon_bk && imp->imp_last_recon) {
1072                         /* Give up trying to reconnect */
1073                         imp->imp_obd->obd_no_recov = 1;
1074                         ptlrpc_deactivate_import(imp);
1075                 }
1076
1077                 if (rc == -EPROTO) {
1078                         struct obd_connect_data *ocd;
1079                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
1080                                                  sizeof *ocd,
1081                                                  lustre_swab_connect);
1082                         if (ocd &&
1083                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1084                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1085                            /* Actually servers are only supposed to refuse
1086                               connection from liblustre clients, so we should
1087                               never see this from VFS context */
1088                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1089                                         "(%d.%d.%d.%d)"
1090                                         " refused connection from this client "
1091                                         "with an incompatible version (%s).  "
1092                                         "Client must be recompiled\n",
1093                                         obd2cli_tgt(imp->imp_obd),
1094                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1095                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1096                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1097                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
1098                                         LUSTRE_VERSION_STRING);
1099                                 ptlrpc_deactivate_import(imp);
1100                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1101                         }
1102                         RETURN(-EPROTO);
1103                 }
1104
1105                 ptlrpc_maybe_ping_import_soon(imp);
1106
1107                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1108                        obd2cli_tgt(imp->imp_obd),
1109                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1110         }
1111
1112         spin_lock(&imp->imp_lock);
1113         imp->imp_last_recon = 0;
1114         spin_unlock(&imp->imp_lock);
1115
1116         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1117         RETURN(rc);
1118 }
1119
1120 static int completed_replay_interpret(struct ptlrpc_request *req,
1121                                       void * data, int rc)
1122 {
1123         ENTRY;
1124         atomic_dec(&req->rq_import->imp_replay_inflight);
1125         if (req->rq_status == 0 &&
1126             !req->rq_import->imp_vbr_failed) {
1127                 ptlrpc_import_recovery_state_machine(req->rq_import);
1128         } else {
1129                 if (req->rq_import->imp_vbr_failed) {
1130                         CDEBUG(D_WARNING,
1131                                "%s: version recovery fails, reconnecting\n",
1132                                req->rq_import->imp_obd->obd_name);
1133                         spin_lock(&req->rq_import->imp_lock);
1134                         req->rq_import->imp_vbr_failed = 0;
1135                         spin_unlock(&req->rq_import->imp_lock);
1136                 } else {
1137                         CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1138                                      "reconnecting\n",
1139                                req->rq_import->imp_obd->obd_name,
1140                                req->rq_status);
1141                 }
1142                 ptlrpc_connect_import(req->rq_import, NULL);
1143         }
1144         RETURN(0);
1145 }
1146
1147 static int signal_completed_replay(struct obd_import *imp)
1148 {
1149         struct ptlrpc_request *req;
1150         ENTRY;
1151
1152         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
1153         atomic_inc(&imp->imp_replay_inflight);
1154
1155         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
1156         if (!req) {
1157                 atomic_dec(&imp->imp_replay_inflight);
1158                 RETURN(-ENOMEM);
1159         }
1160
1161         ptlrpc_req_set_repsize(req, 1, NULL);
1162         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1163         lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY);
1164         if (imp->imp_delayed_recovery)
1165                 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
1166         req->rq_timeout *= 3;
1167         req->rq_interpret_reply = completed_replay_interpret;
1168
1169         ptlrpcd_add_req(req);
1170         RETURN(0);
1171 }
1172
1173 #ifdef __KERNEL__
1174 static int ptlrpc_invalidate_import_thread(void *data)
1175 {
1176         struct obd_import *imp = data;
1177         int disconnect;
1178
1179         ENTRY;
1180
1181         ptlrpc_daemonize("ll_imp_inval");
1182
1183         CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
1184                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1185                imp->imp_connection->c_remote_uuid.uuid);
1186
1187         ptlrpc_invalidate_import(imp);
1188
1189         /* is client_disconnect_export in flight ? */
1190         spin_lock(&imp->imp_lock);
1191         disconnect = imp->imp_deactive;
1192         spin_unlock(&imp->imp_lock);
1193         if (disconnect)
1194                 GOTO(out, 0 );
1195
1196         if (obd_dump_on_eviction) {
1197                 CERROR("dump the log upon eviction\n");
1198                 libcfs_debug_dumplog();
1199         }
1200
1201         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1202         ptlrpc_import_recovery_state_machine(imp);
1203
1204 out:
1205         class_import_put(imp);
1206         RETURN(0);
1207 }
1208 #endif
1209
1210 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1211 {
1212         int rc = 0;
1213         int inflight;
1214         char *target_start;
1215         int target_len;
1216
1217         ENTRY;
1218         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1219                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1220                           &target_start, &target_len);
1221                 /* Don't care about MGC eviction */
1222                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1223                            LUSTRE_MGC_NAME) != 0) {
1224                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1225                                            "%.*s; in progress operations using "
1226                                            "this service will fail.\n",
1227                                            target_len, target_start);
1228                 }
1229                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1230                        obd2cli_tgt(imp->imp_obd),
1231                        imp->imp_connection->c_remote_uuid.uuid);
1232
1233 #ifdef __KERNEL__
1234                 /* bug 17802:  XXX client_disconnect_export vs connect request
1235                  * race. if client will evicted at this time, we start
1236                  * invalidate thread without referece to import and import can
1237                  * be freed at same time. */
1238                 class_import_get(imp);
1239                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1240                                    CLONE_VM | CLONE_FILES);
1241                 if (rc < 0) {
1242                         class_import_put(imp);
1243                         CERROR("error starting invalidate thread: %d\n", rc);
1244                 } else {
1245                         rc = 0;
1246                 }
1247                 RETURN(rc);
1248 #else
1249                 ptlrpc_invalidate_import(imp);
1250
1251                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1252 #endif
1253         }
1254
1255         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1256                 CDEBUG(D_HA, "replay requested by %s\n",
1257                        obd2cli_tgt(imp->imp_obd));
1258                 rc = ptlrpc_replay_next(imp, &inflight);
1259                 if (inflight == 0 &&
1260                     atomic_read(&imp->imp_replay_inflight) == 0) {
1261                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1262                         rc = ldlm_replay_locks(imp);
1263                         if (rc)
1264                                 GOTO(out, rc);
1265                 }
1266                 rc = 0;
1267         }
1268
1269         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1270                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1271                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1272                         rc = signal_completed_replay(imp);
1273                         if (rc)
1274                                 GOTO(out, rc);
1275                 }
1276
1277         }
1278
1279         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1280                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1281                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1282                 }
1283         }
1284
1285         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1286                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1287                        obd2cli_tgt(imp->imp_obd),
1288                        imp->imp_connection->c_remote_uuid.uuid);
1289
1290                 rc = ptlrpc_resend(imp);
1291                 if (rc)
1292                         GOTO(out, rc);
1293                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1294                 ptlrpc_activate_import(imp);
1295
1296                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1297                           &target_start, &target_len);
1298                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1299                               "using nid %s.\n", imp->imp_obd->obd_name,
1300                               target_len, target_start,
1301                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1302         }
1303
1304         if (imp->imp_state == LUSTRE_IMP_FULL) {
1305                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1306                 ptlrpc_wake_delayed(imp);
1307         }
1308
1309  out:
1310         RETURN(rc);
1311 }
1312
/* No-op callback handed to LWI_TIMEOUT_INTR() in ptlrpc_disconnect_import();
 * ignores its argument and always returns 0. */
static int back_to_sleep(void *unused)
{
        (void)unused;
        return 0;
}
1317
1318 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1319 {
1320         struct ptlrpc_request *req;
1321         int rq_opc, rc = 0;
1322         int nowait = imp->imp_obd->obd_force;
1323         ENTRY;
1324
1325         if (nowait)
1326                 GOTO(set_state, rc);
1327
1328         switch (imp->imp_connect_op) {
1329         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1330         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1331         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1332         default:
1333                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1334                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1335                 RETURN(-EINVAL);
1336         }
1337
1338         if (ptlrpc_import_in_recovery(imp)) {
1339                 struct l_wait_info lwi;
1340                 cfs_duration_t timeout;
1341
1342                 if (AT_OFF) {
1343                         timeout = cfs_time_seconds(obd_timeout);
1344                 } else {
1345                         int idx = import_at_get_index(imp,
1346                                 imp->imp_client->cli_request_portal);
1347                         timeout = cfs_time_seconds(
1348                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1349                 }
1350                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1351                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1352                 rc = l_wait_event(imp->imp_recovery_waitq,
1353                                   !ptlrpc_import_in_recovery(imp), &lwi);
1354         }
1355
1356         spin_lock(&imp->imp_lock);
1357         if (imp->imp_state != LUSTRE_IMP_FULL)
1358                 GOTO(out, 0);
1359
1360         spin_unlock(&imp->imp_lock);
1361
1362         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1363         if (req) {
1364                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1365                  * it fails.  We can get through the above with a down server
1366                  * if the client doesn't know the server is gone yet. */
1367                 req->rq_no_resend = 1;
1368
1369 #ifndef CRAY_XT3
1370                 /* We want client umounts to happen quickly, no matter the
1371                    server state... */
1372                 req->rq_timeout = min_t(int, req->rq_timeout,
1373                                         INITIAL_CONNECT_TIMEOUT);
1374 #else
1375                 /* ... but we always want liblustre clients to nicely
1376                    disconnect, so only use the adaptive value. */
1377                 if (AT_OFF)
1378                         req->rq_timeout = obd_timeout / 3;
1379 #endif
1380
1381                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1382                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1383                 ptlrpc_req_set_repsize(req, 1, NULL);
1384                 rc = ptlrpc_queue_wait(req);
1385                 ptlrpc_req_finished(req);
1386         }
1387
1388 set_state:
1389         spin_lock(&imp->imp_lock);
1390 out:
1391         if (noclose)
1392                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1393         else
1394                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1395         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1396         /* Try all connections in the future - bz 12758 */
1397         imp->imp_last_recon = 0;
1398         spin_unlock(&imp->imp_lock);
1399
1400         RETURN(rc);
1401 }
1402
1403 /* Sets maximal number of RPCs possible originating from other side of this
1404    import (server) to us and number of async RPC replies that we are not waiting
1405    for arriving */
1406 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1407 {
1408         LNetSetAsync(imp->imp_connection->c_peer, count);
1409 }
1410
1411
1412 /* Adaptive Timeout utils */
1413 extern unsigned int at_min, at_max, at_history;
1414
1415 /* Bin into timeslices using AT_BINS bins.
1416    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1417    but still smoothing out a return to normalcy from a slow response.
1418    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1419 int at_add(struct adaptive_timeout *at, unsigned int val)
1420 {
1421         unsigned int old = at->at_current;
1422         time_t now = cfs_time_current_sec();
1423         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1424
1425         LASSERT(at);
1426 #if 0
1427         CDEBUG(D_INFO, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1428                val, at, now - at->at_binstart, at->at_current,
1429                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1430 #endif
1431         if (val == 0)
1432                 /* 0's don't count, because we never want our timeout to
1433                    drop to 0, and because 0 could mean an error */
1434                 return 0;
1435
1436         spin_lock(&at->at_lock);
1437
1438         if (unlikely(at->at_binstart == 0)) {
1439                 /* Special case to remove default from history */
1440                 at->at_current = val;
1441                 at->at_worst_ever = val;
1442                 at->at_worst_time = now;
1443                 at->at_hist[0] = val;
1444                 at->at_binstart = now;
1445         } else if (now - at->at_binstart < binlimit ) {
1446                 /* in bin 0 */
1447                 at->at_hist[0] = max(val, at->at_hist[0]);
1448                 at->at_current = max(val, at->at_current);
1449         } else {
1450                 int i, shift;
1451                 unsigned int maxv = val;
1452                 /* move bins over */
1453                 shift = (now - at->at_binstart) / binlimit;
1454                 LASSERT(shift > 0);
1455                 for(i = AT_BINS - 1; i >= 0; i--) {
1456                         if (i >= shift) {
1457                                 at->at_hist[i] = at->at_hist[i - shift];
1458                                 maxv = max(maxv, at->at_hist[i]);
1459                         } else {
1460                                 at->at_hist[i] = 0;
1461                         }
1462                 }
1463                 at->at_hist[0] = val;
1464                 at->at_current = maxv;
1465                 at->at_binstart += shift * binlimit;
1466         }
1467
1468         if (at->at_current > at->at_worst_ever) {
1469                 at->at_worst_ever = at->at_current;
1470                 at->at_worst_time = now;
1471         }
1472
1473         if (at->at_flags & AT_FLG_NOHIST)
1474                 /* Only keep last reported val; keeping the rest of the history
1475                    for proc only */
1476                 at->at_current = val;
1477
1478         if (at_max > 0)
1479                 at->at_current =  min(at->at_current, at_max);
1480         at->at_current =  max(at->at_current, at_min);
1481
1482 #if 0
1483         if (at->at_current != old)
1484                 CDEBUG(D_ADAPTTO, "AT %p change: old=%u new=%u delta=%d "
1485                        "(val=%u) hist %u %u %u %u\n", at,
1486                        old, at->at_current, at->at_current - old, val,
1487                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1488                        at->at_hist[3]);
1489 #endif
1490
1491         /* if we changed, report the old value */
1492         old = (at->at_current != old) ? old : 0;
1493
1494         spin_unlock(&at->at_lock);
1495         return old;
1496 }
1497
1498 /* Find the imp_at index for a given portal; assign if space available */
1499 int import_at_get_index(struct obd_import *imp, int portal)
1500 {
1501         struct imp_at *at = &imp->imp_at;
1502         int i;
1503
1504         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1505                 if (at->iat_portal[i] == portal)
1506                         return i;
1507                 if (at->iat_portal[i] == 0)
1508                         /* unused */
1509                         break;
1510         }
1511
1512         /* Not found in list, add it under a lock */
1513         spin_lock(&imp->imp_lock);
1514
1515         /* Check unused under lock */
1516         for (; i < IMP_AT_MAX_PORTALS; i++) {
1517                 if (at->iat_portal[i] == portal)
1518                         goto out;
1519                 if (at->iat_portal[i] == 0)
1520                         /* unused */
1521                         break;
1522         }
1523
1524         /* Not enough portals? */
1525         LASSERT(i < IMP_AT_MAX_PORTALS);
1526
1527         at->iat_portal[i] = portal;
1528 out:
1529         spin_unlock(&imp->imp_lock);
1530         return i;
1531 }