Whamcloud - gitweb
Land b1_8_gate onto b1_8 (20081218_1708)
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/import.c
37  *
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #ifndef __KERNEL__
43 # include <liblustre.h>
44 #endif
45
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
51 #include <obd.h>
52 #include <obd_class.h>
53
54 #include "ptlrpc_internal.h"
55
56 struct ptlrpc_connect_async_args {
57          __u64 pcaa_peer_committed;
58         int pcaa_initial_connect;
59 };
60
/* Change the import state without touching imp_lock (the callers in this
 * file invoke it with imp_lock already held).
 *
 * A CLOSED import should remain so: when the current state is
 * LUSTRE_IMP_CLOSED nothing is logged and nothing is assigned.
 *
 * @imp   - pointer expression yielding the struct obd_import to update
 * @state - new state value
 *
 * Both arguments are parenthesized in the expansion so that any expression
 * (e.g. "array + i") may be passed.  They are still evaluated more than
 * once, so do not pass expressions with side effects. */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while(0)
72
/* Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp_lock, applies the
 * state change (which is skipped for a CLOSED import) and drops the lock.
 *
 * The arguments are parenthesized before being used or forwarded, so any
 * pointer expression may be passed for @imp.  They are evaluated more than
 * once; avoid side effects. */
#define IMPORT_SET_STATE(imp, state)                    \
do {                                                    \
        spin_lock(&(imp)->imp_lock);                    \
        IMPORT_SET_STATE_NOLOCK((imp), (state));        \
        spin_unlock(&(imp)->imp_lock);                  \
} while(0)
79
80
81 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
82                                     void * data, int rc);
83 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
84
85 /* Only this function is allowed to change the import state when it is
86  * CLOSED. I would rather refcount the import and free it after
87  * disconnection like we do with exports. To do that, the client_obd
88  * will need to save the peer info somewhere other than in the import,
89  * though. */
90 int ptlrpc_init_import(struct obd_import *imp)
91 {
92         spin_lock(&imp->imp_lock);
93
94         imp->imp_generation++;
95         imp->imp_state =  LUSTRE_IMP_NEW;
96
97         spin_unlock(&imp->imp_lock);
98
99         return 0;
100 }
101 EXPORT_SYMBOL(ptlrpc_init_import);
102
#define UUID_STR "_UUID"
/* Locate the printable core of a uuid string without modifying it.
 *
 * @uuid       - NUL-terminated uuid string
 * @prefix     - optional leading string to skip; may be NULL
 * @uuid_start - out: points at @uuid, or just past @prefix when @uuid
 *               begins with it
 * @uuid_len   - out: length of the string at *uuid_start, not counting a
 *               trailing "_UUID" if one is present
 *
 * Lengths are computed as size_t and converted to int only when stored,
 * which avoids comparing a signed int against the result of strlen(). */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const size_t suffix_len = sizeof(UUID_STR) - 1;
        char *start = uuid;
        size_t len;

        if (prefix != NULL) {
                size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start += prefix_len;
        }

        len = strlen(start);

        /* Drop the suffix only when the string is long enough to hold it */
        if (len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = (int)len;
}
119
120 /* Returns true if import was FULL, false if import was already not
121  * connected.
122  * @imp - import to be disconnected
123  * @conn_cnt - connection count (epoch) of the request that timed out
124  *             and caused the disconnection.  In some cases, multiple
125  *             inflight requests can fail to a single target (e.g. OST
126  *             bulk requests) and if one has already caused a reconnection
127  *             (increasing the import->conn_cnt) the older failure should
128  *             not also cause a reconnection.  If zero it forces a reconnect.
129  */
130 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
131 {
132         int rc = 0;
133
134         spin_lock(&imp->imp_lock);
135
136         if (imp->imp_state == LUSTRE_IMP_FULL &&
137             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
138                 char *target_start;
139                 int   target_len;
140
141                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
142                           &target_start, &target_len);
143                 if (imp->imp_replayable) {
144                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
145                                "%s was lost; in progress operations using this "
146                                "service will wait for recovery to complete.\n",
147                                imp->imp_obd->obd_name, target_len, target_start,
148                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
149                 } else {
150                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
151                                "%.*s via nid %s was lost; in progress "
152                                "operations using this service will fail.\n",
153                                imp->imp_obd->obd_name, target_len, target_start,
154                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
155                 }
156                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
157                 spin_unlock(&imp->imp_lock);
158
159                 if (obd_dump_on_timeout)
160                         libcfs_debug_dumplog();
161
162                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
163                 rc = 1;
164         } else {
165                 spin_unlock(&imp->imp_lock);
166                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
167                        imp->imp_client->cli_name, imp,
168                        (imp->imp_state == LUSTRE_IMP_FULL &&
169                         imp->imp_conn_cnt > conn_cnt) ?
170                        "reconnected" : "not connected", imp->imp_conn_cnt,
171                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
172         }
173
174         return rc;
175 }
176
177 /* Must be called with imp_lock held! */
178 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
179 {
180         ENTRY;
181         LASSERT_SPIN_LOCKED(&imp->imp_lock);
182
183         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
184         imp->imp_invalid = 1;
185         imp->imp_generation++;
186         spin_unlock(&imp->imp_lock);
187
188         ptlrpc_abort_inflight(imp);
189         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
190 }
191
192 /*
193  * This acts as a barrier; all existing requests are rejected, and
194  * no new requests will be accepted until the import is valid again.
195  */
196 void ptlrpc_deactivate_import(struct obd_import *imp)
197 {
198         spin_lock(&imp->imp_lock);
199         ptlrpc_deactivate_and_unlock_import(imp);
200 }
201
202 static unsigned int 
203 ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
204 {
205         long dl;
206
207         if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
208               (req->rq_phase == RQ_PHASE_BULK) || 
209               (req->rq_phase == RQ_PHASE_NEW)))
210                 return 0;
211
212         if (req->rq_timedout)
213                 return 0;
214
215         if (req->rq_phase == RQ_PHASE_NEW)
216                 dl = req->rq_sent;
217         else
218                 dl = req->rq_deadline;
219
220         if (dl <= now)
221                 return 0;
222
223         return dl - now;
224 }
225
226 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
227 {
228         time_t now = cfs_time_current_sec();
229         struct list_head *tmp, *n;
230         struct ptlrpc_request *req;
231         unsigned int timeout = 0;
232
233         spin_lock(&imp->imp_lock);
234         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
235                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
236                 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
237         }
238         spin_unlock(&imp->imp_lock);
239         return timeout;
240 }
241
242 /*
243  * This function will invalidate the import, if necessary, then block
244  * for all the RPC completions, and finally notify the obd to
245  * invalidate its state (ie cancel locks, clear pending requests,
246  * etc).
247  */
248 void ptlrpc_invalidate_import(struct obd_import *imp)
249 {
250         struct list_head *tmp, *n;
251         struct ptlrpc_request *req;
252         struct l_wait_info lwi;
253         unsigned int timeout;
254         int rc;
255
256         atomic_inc(&imp->imp_inval_count);
257
258         /*
259          * If this is an invalid MGC connection, then don't bother
260          * waiting for imp_inflight to drop to 0.
261          */
262         if (imp->imp_invalid && imp->imp_recon_bk && !imp->imp_obd->obd_no_recov)
263                 goto out;
264
265         if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
266                 ptlrpc_deactivate_import(imp);
267
268         LASSERT(imp->imp_invalid);
269
270         /* Wait forever until inflight == 0. We really can't do it another
271          * way because in some cases we need to wait for very long reply 
272          * unlink. We can't do anything before that because there is really
273          * no guarantee that some rdma transfer is not in progress right now. */
274         do {
275                 /* Calculate max timeout for waiting on rpcs to error 
276                  * out. Use obd_timeout if calculated value is smaller
277                  * than it. */
278                 timeout = ptlrpc_inflight_timeout(imp);
279                 timeout += timeout / 3;
280                 
281                 if (timeout == 0)
282                         timeout = obd_timeout;
283                 
284                 CDEBUG(D_RPCTRACE, "Sleeping %d sec for inflight to error out\n",
285                        timeout);
286
287                 /* Wait for all requests to error out and call completion
288                  * callbacks. Cap it at obd_timeout -- these should all
289                  * have been locally cancelled by ptlrpc_abort_inflight. */
290                 lwi = LWI_TIMEOUT_INTERVAL(
291                         cfs_timeout_cap(cfs_time_seconds(timeout)),
292                         cfs_time_seconds(1), NULL, NULL);
293                 rc = l_wait_event(imp->imp_recovery_waitq,
294                                 (atomic_read(&imp->imp_inflight) == 0), &lwi);
295                 if (rc) {
296                         const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
297
298                         CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
299                                cli_tgt, rc, atomic_read(&imp->imp_inflight));
300
301                         spin_lock(&imp->imp_lock);
302                         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
303                                 req = list_entry(tmp, struct ptlrpc_request, 
304                                         rq_list);
305                                 DEBUG_REQ(D_ERROR, req, "still on sending list");
306                         }
307                         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
308                                 req = list_entry(tmp, struct ptlrpc_request, 
309                                         rq_list);
310                                 DEBUG_REQ(D_ERROR, req, "still on delayed list");
311                         }
312                         
313                         if (atomic_read(&imp->imp_unregistering) == 0) {
314                                 /* We know that only "unregistering" rpcs may
315                                  * still survive in sending or delaying lists
316                                  * (They are waiting for long reply unlink in
317                                  * sluggish nets). Let's check this. If there
318                                  * is no unregistering and inflight != 0 this
319                                  * is bug. */
320                                 LASSERT(atomic_read(&imp->imp_inflight) == 0);
321                                 
322                                 /* Let's save one loop as soon as inflight have
323                                  * dropped to zero. No new inflights possible at
324                                  * this point. */
325                                 rc = 0;
326                         } else {
327                                 CERROR("%s: RPCs in \"%s\" phase found (%d). "
328                                        "Network is sluggish? Waiting them "
329                                        "to error out.\n", cli_tgt,
330                                        ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
331                                        atomic_read(&imp->imp_unregistering));
332                         }
333                         spin_unlock(&imp->imp_lock);
334                 }
335         } while (rc != 0);
336
337         /* Let's additionally check that no new rpcs added to import in
338          * "invalidate" state. */
339         LASSERT(atomic_read(&imp->imp_inflight) == 0);
340
341 out:
342         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
343
344         atomic_dec(&imp->imp_inval_count);
345         cfs_waitq_signal(&imp->imp_recovery_waitq);
346 }
347
348 /* unset imp_invalid */
349 void ptlrpc_activate_import(struct obd_import *imp)
350 {
351         struct obd_device *obd = imp->imp_obd;
352
353         spin_lock(&imp->imp_lock);
354         imp->imp_invalid = 0;
355         spin_unlock(&imp->imp_lock);
356
357         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
358 }
359
360 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
361 {
362         ENTRY;
363
364         LASSERT(!imp->imp_dlm_fake);
365
366         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
367                 if (!imp->imp_replayable) {
368                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
369                                "auto-deactivating\n",
370                                obd2cli_tgt(imp->imp_obd),
371                                imp->imp_connection->c_remote_uuid.uuid,
372                                imp->imp_obd->obd_name);
373                         ptlrpc_deactivate_import(imp);
374                 }
375
376                 CDEBUG(D_HA, "%s: waking up pinger\n",
377                        obd2cli_tgt(imp->imp_obd));
378
379                 spin_lock(&imp->imp_lock);
380                 imp->imp_force_verify = 1;
381                 spin_unlock(&imp->imp_lock);
382
383                 ptlrpc_pinger_wake_up();
384         }
385         EXIT;
386 }
387
388 int ptlrpc_reconnect_import(struct obd_import *imp)
389 {
390
391         ptlrpc_set_import_discon(imp, 0);
392         /* Force a new connect attempt */
393         ptlrpc_invalidate_import(imp);
394         /* Do a fresh connect next time by zeroing the handle */
395         ptlrpc_disconnect_import(imp, 1);
396         /* Wait for all invalidate calls to finish */
397         if (atomic_read(&imp->imp_inval_count) > 0) {
398                 int rc;
399                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
400                 rc = l_wait_event(imp->imp_recovery_waitq,
401                                   (atomic_read(&imp->imp_inval_count) == 0),
402                                   &lwi);
403                 if (rc)
404                         CERROR("Interrupted, inval=%d\n",
405                                atomic_read(&imp->imp_inval_count));
406         }
407
408         /*
409          * Allow reconnect attempts. Note: Currently, the function is
410          * only called by MGC. So assume this is a recoverable import,
411          * and force import to be recoverable. fix this if you need to
412          */
413
414         imp->imp_obd->obd_no_recov = 0;
415         /* Remove 'invalid' flag */
416         ptlrpc_activate_import(imp);
417         /* Attempt a new connect */
418         ptlrpc_recover_import(imp, NULL);
419         return 0;
420 }
421
422 EXPORT_SYMBOL(ptlrpc_reconnect_import);
423
424 static int import_select_connection(struct obd_import *imp)
425 {
426         struct obd_import_conn *imp_conn = NULL, *conn;
427         struct obd_export *dlmexp;
428         int tried_all = 1;
429         ENTRY;
430
431         spin_lock(&imp->imp_lock);
432
433         if (list_empty(&imp->imp_conn_list)) {
434                 CERROR("%s: no connections available\n",
435                         imp->imp_obd->obd_name);
436                 spin_unlock(&imp->imp_lock);
437                 RETURN(-EINVAL);
438         }
439
440         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
441                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
442                        imp->imp_obd->obd_name,
443                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
444                        conn->oic_last_attempt);
445
446                 /* Don't thrash connections */
447                 if (cfs_time_before_64(cfs_time_current_64(),
448                                      conn->oic_last_attempt +
449                                      cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
450                         continue;
451                 }
452
453                 /* If we have not tried this connection since the
454                    the last successful attempt, go with this one */
455                 if ((conn->oic_last_attempt == 0) ||
456                     cfs_time_beforeq_64(conn->oic_last_attempt,
457                                        imp->imp_last_success_conn)) {
458                         imp_conn = conn;
459                         tried_all = 0;
460                         break;
461                 }
462
463                 /* If all of the connections have already been tried
464                    since the last successful connection; just choose the
465                    least recently used */
466                 if (!imp_conn)
467                         imp_conn = conn;
468                 else if (cfs_time_before_64(conn->oic_last_attempt,
469                                             imp_conn->oic_last_attempt))
470                         imp_conn = conn;
471         }
472
473         /* if not found, simply choose the current one */
474         if (!imp_conn) {
475                 LASSERT(imp->imp_conn_current);
476                 imp_conn = imp->imp_conn_current;
477                 tried_all = 0;
478         }
479         LASSERT(imp_conn->oic_conn);
480
481         /* If we've tried everything, and we're back to the beginning of the
482            list, increase our timeout and try again. It will be reset when
483            we do finally connect. (FIXME: really we should wait for all network
484            state associated with the last connection attempt to drain before
485            trying to reconnect on it.) */
486         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
487             !imp->imp_recon_bk /* not retrying */) {
488                 if (at_get(&imp->imp_at.iat_net_latency) <
489                     CONNECTION_SWITCH_MAX) {
490                         at_add(&imp->imp_at.iat_net_latency,
491                                at_get(&imp->imp_at.iat_net_latency) +
492                                CONNECTION_SWITCH_INC);
493                 }
494                 LASSERT(imp_conn->oic_last_attempt);
495                 CWARN("%s: tried all connections, increasing latency to %ds\n",
496                       imp->imp_obd->obd_name,
497                       at_get(&imp->imp_at.iat_net_latency));
498         }
499
500         imp_conn->oic_last_attempt = cfs_time_current_64();
501
502         /* switch connection, don't mind if it's same as the current one */
503         if (imp->imp_connection)
504                 ptlrpc_connection_put(imp->imp_connection);
505         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
506
507         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
508         LASSERT(dlmexp != NULL);
509         if (dlmexp->exp_connection)
510                 ptlrpc_connection_put(dlmexp->exp_connection);
511         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
512         class_export_put(dlmexp);
513
514         if (imp->imp_conn_current != imp_conn) {
515                 if (imp->imp_conn_current)
516                         LCONSOLE_INFO("Changing connection for %s to %s/%s\n",
517                                       imp->imp_obd->obd_name,
518                                       imp_conn->oic_uuid.uuid,
519                                       libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
520                 imp->imp_conn_current = imp_conn;
521         }
522
523         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
524                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
525                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
526
527         spin_unlock(&imp->imp_lock);
528
529         RETURN(0);
530 }
531
532 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
533 {
534         struct obd_device *obd = imp->imp_obd;
535         int initial_connect = 0;
536         int rc;
537         __u64 committed_before_reconnect = 0;
538         struct ptlrpc_request *request;
539         __u32 size[] = { sizeof(struct ptlrpc_body),
540                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
541                        sizeof(obd->obd_uuid),
542                        sizeof(imp->imp_dlm_handle),
543                        sizeof(imp->imp_connect_data) };
544         char *tmp[] = { NULL,
545                         obd2cli_tgt(imp->imp_obd),
546                         obd->obd_uuid.uuid,
547                         (char *)&imp->imp_dlm_handle,
548                         (char *)&imp->imp_connect_data };
549         struct ptlrpc_connect_async_args *aa;
550
551         ENTRY;
552         spin_lock(&imp->imp_lock);
553         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
554                 spin_unlock(&imp->imp_lock);
555                 CERROR("can't connect to a closed import\n");
556                 RETURN(-EINVAL);
557         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
558                 spin_unlock(&imp->imp_lock);
559                 CERROR("already connected\n");
560                 RETURN(0);
561         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
562                 spin_unlock(&imp->imp_lock);
563                 CERROR("already connecting\n");
564                 RETURN(-EALREADY);
565         }
566
567         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
568
569         imp->imp_conn_cnt++;
570         imp->imp_resend_replay = 0;
571
572         if (!lustre_handle_is_used(&imp->imp_remote_handle))
573                 initial_connect = 1;
574         else
575                 committed_before_reconnect = imp->imp_peer_committed_transno;
576
577         spin_unlock(&imp->imp_lock);
578
579         if (new_uuid) {
580                 struct obd_uuid uuid;
581
582                 obd_str2uuid(&uuid, new_uuid);
583                 rc = import_set_conn_priority(imp, &uuid);
584                 if (rc)
585                         GOTO(out, rc);
586         }
587
588         rc = import_select_connection(imp);
589         if (rc)
590                 GOTO(out, rc);
591
592         /* last in connection list */
593         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
594                 if (imp->imp_initial_recov_bk && initial_connect) {
595                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
596                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
597                         /* Don't retry if connect fails */
598                         rc = 0;
599                         obd_set_info_async(obd->obd_self_export,
600                                            sizeof(KEY_INIT_RECOV),
601                                            KEY_INIT_RECOV,
602                                            sizeof(rc), &rc, NULL);
603                 }
604                 if (imp->imp_recon_bk) {
605                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
606                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
607                         spin_lock(&imp->imp_lock);
608                         imp->imp_last_recon = 1;
609                         spin_unlock(&imp->imp_lock);
610                 }
611         }
612
613         /* Reset connect flags to the originally requested flags, in case
614          * the server is updated on-the-fly we will get the new features. */
615         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
616         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
617
618         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
619                            &obd->obd_uuid, &imp->imp_connect_data, NULL);
620         if (rc)
621                 GOTO(out, rc);
622
623         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
624                                   5, size, tmp);
625         if (!request)
626                 GOTO(out, rc = -ENOMEM);
627
628         /* Report the rpc service time to the server so that it knows how long
629          * to wait for clients to join recovery */
630         lustre_msg_set_service_time(request->rq_reqmsg,
631                                     at_timeout2est(request->rq_timeout));
632
633         /* The amount of time we give the server to process the connect req.
634          * import_select_connection will increase the net latency on
635          * repeated reconnect attempts to cover slow networks.
636          * We override/ignore the server rpc completion estimate here,
637          * which may be large if this is a reconnect attempt */
638         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
639         lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);
640
641 #ifndef __KERNEL__
642         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
643 #endif
644         if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
645                 lustre_msg_add_op_flags(request->rq_reqmsg,
646                                         MSG_CONNECT_NEXT_VER);
647
648         request->rq_no_resend = request->rq_no_delay = 1;
649         request->rq_send_state = LUSTRE_IMP_CONNECTING;
650         /* Allow a slightly larger reply for future growth compatibility */
651         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
652                               16 * sizeof(__u64);
653         ptlrpc_req_set_repsize(request, 2, size);
654         request->rq_interpret_reply = ptlrpc_connect_interpret;
655
656         CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
657         aa = ptlrpc_req_async_args(request);
658         memset(aa, 0, sizeof *aa);
659
660         aa->pcaa_peer_committed = committed_before_reconnect;
661         aa->pcaa_initial_connect = initial_connect;
662         if (aa->pcaa_initial_connect) {
663                 spin_lock(&imp->imp_lock);
664                 imp->imp_replayable = 1;
665                 spin_unlock(&imp->imp_lock);
666         }
667
668         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
669                   aa->pcaa_initial_connect ? "initial " : "re",
670                   imp->imp_conn_cnt);
671         ptlrpcd_add_req(request);
672         rc = 0;
673 out:
674         if (rc != 0) {
675                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
676         }
677
678         RETURN(rc);
679 }
680 EXPORT_SYMBOL(ptlrpc_connect_import);
681
682 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
683 {
684 #ifdef __KERNEL__
685         struct obd_import_conn *imp_conn;
686 #endif
687         int wake_pinger = 0;
688
689         ENTRY;
690
691         spin_lock(&imp->imp_lock);
692         if (list_empty(&imp->imp_conn_list))
693                 GOTO(unlock, 0);
694
695 #ifdef __KERNEL__
696         imp_conn = list_entry(imp->imp_conn_list.prev,
697                               struct obd_import_conn,
698                               oic_item);
699
700         /* XXX: When the failover node is the primary node, it is possible
701          * to have two identical connections in imp_conn_list. We must
702          * compare not conn's pointers but NIDs, otherwise we can defeat
703          * connection throttling. (See bug 14774.) */
704         if (imp->imp_conn_current->oic_conn->c_peer.nid !=
705                                 imp_conn->oic_conn->c_peer.nid) {
706                 ptlrpc_ping_import_soon(imp);
707                 wake_pinger = 1;
708         }
709
710 #else
711         /* liblustre has no pinger thead, so we wakup pinger anyway */
712         wake_pinger = 1;
713 #endif
714  unlock:
715         spin_unlock(&imp->imp_lock);
716
717         if (wake_pinger)
718                 ptlrpc_pinger_wake_up();
719
720         EXIT;
721 }
722
723 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
724                                     void * data, int rc)
725 {
726         struct ptlrpc_connect_async_args *aa = data;
727         struct obd_import *imp = request->rq_import;
728         struct client_obd *cli = &imp->imp_obd->u.cli;
729         struct lustre_handle old_hdl;
730         __u64 old_connect_flags;
731         int msg_flags;
732         ENTRY;
733
734         spin_lock(&imp->imp_lock);
735         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
736                 spin_unlock(&imp->imp_lock);
737                 RETURN(0);
738         }
739         spin_unlock(&imp->imp_lock);
740
741         if (rc)
742                 GOTO(out, rc);
743
744         LASSERT(imp->imp_conn_current);
745
746         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
747
748         /* All imports are pingable */
749         spin_lock(&imp->imp_lock);
750         imp->imp_pingable = 1;
751
752         if (aa->pcaa_initial_connect) {
753                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
754                         imp->imp_replayable = 1;
755                         spin_unlock(&imp->imp_lock);
756                         CDEBUG(D_HA, "connected to replayable target: %s\n",
757                                obd2cli_tgt(imp->imp_obd));
758                 } else {
759                         imp->imp_replayable = 0;
760                         spin_unlock(&imp->imp_lock);
761                 }
762
763                 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
764                      msg_flags & MSG_CONNECT_NEXT_VER) ||
765                     request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
766                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
767                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
768                                obd2cli_tgt(imp->imp_obd));
769                 } else {
770                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
771                                obd2cli_tgt(imp->imp_obd));
772                 }
773
774                 imp->imp_remote_handle =
775                                 *lustre_msg_get_handle(request->rq_repmsg);
776
777                 /* Initial connects are allowed for clients with non-random
778                  * uuids when servers are in recovery.  Simply signal the
779                  * servers replay is complete and wait in REPLAY_WAIT. */
780                 if (msg_flags & MSG_CONNECT_RECOVERING) {
781                         CDEBUG(D_HA, "connect to %s during recovery\n",
782                                obd2cli_tgt(imp->imp_obd));
783                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
784                 } else {
785                         IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
786                 }
787
788                 spin_lock(&imp->imp_lock);
789                 if (imp->imp_invalid) {
790                         spin_unlock(&imp->imp_lock);
791                         ptlrpc_activate_import(imp);
792                 } else {
793                         spin_unlock(&imp->imp_lock);
794                 }
795                 GOTO(finish, rc = 0);
796         } else {
797                 spin_unlock(&imp->imp_lock);
798         }
799
800         /* Determine what recovery state to move the import to. */
801         if (MSG_CONNECT_RECONNECT & msg_flags) {
802                 memset(&old_hdl, 0, sizeof(old_hdl));
803                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
804                             sizeof (old_hdl))) {
805                         CERROR("%s@%s didn't like our handle "LPX64
806                                ", failed\n", obd2cli_tgt(imp->imp_obd),
807                                imp->imp_connection->c_remote_uuid.uuid,
808                                imp->imp_dlm_handle.cookie);
809                         GOTO(out, rc = -ENOTCONN);
810                 }
811
812                 if (memcmp(&imp->imp_remote_handle,
813                            lustre_msg_get_handle(request->rq_repmsg),
814                            sizeof(imp->imp_remote_handle))) {
815                         int level = msg_flags & MSG_CONNECT_RECOVERING ? D_HA :
816                                                                          D_WARNING;
817
818                         /* Bug 16611/14775: if server handle have changed,
819                          * that means some sort of disconnection happened.
820                          * If the server is not in recovery, that also means it
821                          * already erased all of our state because of previous
822                          * eviction. If it is in recovery - we are safe to
823                          * participate since we can reestablish all of our state
824                          * with server again */
825                         CDEBUG(level,"%s@%s changed server handle from "
826                                      LPX64" to "LPX64"%s \n" "but is still in recovery \n",
827                                      obd2cli_tgt(imp->imp_obd),
828                                      imp->imp_connection->c_remote_uuid.uuid,
829                                      imp->imp_remote_handle.cookie,
830                                      lustre_msg_get_handle(request->rq_repmsg)->
831                                                                         cookie,
832                                      (MSG_CONNECT_RECOVERING & msg_flags) ?
833                                          "but is still in recovery" : "");
834
835                         imp->imp_remote_handle =
836                                      *lustre_msg_get_handle(request->rq_repmsg);
837
838                         if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
839                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
840                                 GOTO(finish, rc = 0);
841                         }
842
843                 } else {
844                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
845                                obd2cli_tgt(imp->imp_obd),
846                                imp->imp_connection->c_remote_uuid.uuid);
847                 }
848
849                 if (imp->imp_invalid) {
850                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
851                                "marking evicted\n", imp->imp_obd->obd_name);
852                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
853                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
854                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
855                                imp->imp_obd->obd_name,
856                                obd2cli_tgt(imp->imp_obd));
857
858                         spin_lock(&imp->imp_lock);
859                         imp->imp_resend_replay = 1;
860                         /* VBR: delayed connection */
861                         if (MSG_CONNECT_DELAYED & msg_flags) {
862                                 imp->imp_delayed_recovery = 1;
863                                 imp->imp_no_lock_replay = 1;
864                         }
865                         spin_unlock(&imp->imp_lock);
866
867                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
868                 } else {
869                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
870                 }
871         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
872                 LASSERT(imp->imp_replayable);
873                 imp->imp_remote_handle =
874                                 *lustre_msg_get_handle(request->rq_repmsg);
875                 imp->imp_last_replay_transno = 0;
876                 /* VBR: delayed connection */
877                 if (MSG_CONNECT_DELAYED & msg_flags) {
878                         spin_lock(&imp->imp_lock);
879                         imp->imp_delayed_recovery = 1;
880                         imp->imp_no_lock_replay = 1;
881                         spin_unlock(&imp->imp_lock);
882                 }
883                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
884         } else {
885                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
886                           "flags reconnect/recovering not set: %x)",msg_flags);
887                 imp->imp_remote_handle =
888                                 *lustre_msg_get_handle(request->rq_repmsg);
889                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
890         }
891
892         /* Sanity checks for a reconnected import. */
893         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
894                 CERROR("imp_replayable flag does not match server "
895                        "after reconnect. We should LBUG right here.\n");
896         }
897
898         if (lustre_msg_get_last_committed(request->rq_repmsg) <
899             aa->pcaa_peer_committed) {
900                 CERROR("%s went back in time (transno "LPD64
901                        " was previously committed, server now claims "LPD64
902                        ")!  See https://bugzilla.lustre.org/show_bug.cgi?"
903                        "id=9646\n",
904                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
905                        lustre_msg_get_last_committed(request->rq_repmsg));
906         }
907
908 finish:
909         rc = ptlrpc_import_recovery_state_machine(imp);
910         if (rc != 0) {
911                 if (rc == -ENOTCONN) {
912                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
913                                "invalidating and reconnecting\n",
914                                obd2cli_tgt(imp->imp_obd),
915                                imp->imp_connection->c_remote_uuid.uuid);
916                         ptlrpc_connect_import(imp, NULL);
917                         RETURN(0);
918                 }
919         } else {
920                 struct obd_connect_data *ocd;
921                 struct obd_export *exp;
922
923                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
924                                          lustre_swab_connect);
925                 spin_lock(&imp->imp_lock);
926                 list_del(&imp->imp_conn_current->oic_item);
927                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
928                 imp->imp_last_success_conn =
929                         imp->imp_conn_current->oic_last_attempt;
930
931                 if (ocd == NULL) {
932                         spin_unlock(&imp->imp_lock);
933                         CERROR("Wrong connect data from server\n");
934                         rc = -EPROTO;
935                         GOTO(out, rc);
936                 }
937
938                 imp->imp_connect_data = *ocd;
939
940                 exp = class_conn2export(&imp->imp_dlm_handle);
941                 spin_unlock(&imp->imp_lock);
942
943                 /* check that server granted subset of flags we asked for. */
944                 LASSERTF((ocd->ocd_connect_flags &
945                           imp->imp_connect_flags_orig) ==
946                          ocd->ocd_connect_flags, LPX64" != "LPX64,
947                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
948
949                 if (!exp) {
950                         /* This could happen if export is cleaned during the
951                            connect attempt */
952                         CERROR("Missing export for %s\n",
953                                imp->imp_obd->obd_name);
954                         GOTO(out, rc = -ENODEV);
955                 }
956                 old_connect_flags = exp->exp_connect_flags;
957                 exp->exp_connect_flags = ocd->ocd_connect_flags;
958                 imp->imp_obd->obd_self_export->exp_connect_flags = ocd->ocd_connect_flags;
959                 class_export_put(exp);
960
961                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
962
963                 if (!ocd->ocd_ibits_known &&
964                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
965                         CERROR("Inodebits aware server returned zero compatible"
966                                " bits?\n");
967
968                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
969                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
970                                         LUSTRE_VERSION_OFFSET_WARN ||
971                      ocd->ocd_version < LUSTRE_VERSION_CODE -
972                                         LUSTRE_VERSION_OFFSET_WARN)) {
973                         /* Sigh, some compilers do not like #ifdef in the middle
974                            of macro arguments */
975 #ifdef __KERNEL__
976                         const char *older =
977                                 "older.  Consider upgrading this client";
978 #else
979                         const char *older =
980                                 "older.  Consider recompiling this application";
981 #endif
982                         const char *newer = "newer than client version";
983
984                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
985                                       "is much %s (%s)\n",
986                                       obd2cli_tgt(imp->imp_obd),
987                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
988                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
989                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
990                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
991                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
992                                       newer : older, LUSTRE_VERSION_STRING);
993                 }
994
995                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
996                         /* We sent to the server ocd_cksum_types with bits set
997                          * for algorithms we understand. The server masked off
998                          * the checksum types it doesn't support */
999                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
1000                                 LCONSOLE_WARN("The negotiation of the checksum "
1001                                               "alogrithm to use with server %s "
1002                                               "failed (%x/%x), disabling "
1003                                               "checksums\n",
1004                                               obd2cli_tgt(imp->imp_obd),
1005                                               ocd->ocd_cksum_types,
1006                                               OBD_CKSUM_ALL);
1007                                 cli->cl_checksum = 0;
1008                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1009                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1010                         } else {
1011                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
1012
1013                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
1014                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
1015                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
1016                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
1017                                 else
1018                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1019                         }
1020                 } else {
1021                         /* The server does not support OBD_CONNECT_CKSUM.
1022                          * Enforce CRC32 for backward compatibility*/
1023                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1024                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1025                 }
1026
1027                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
1028                         cli->cl_max_pages_per_rpc =
1029                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
1030                 }
1031
1032                 /* Reset ns_connect_flags only for initial connect. It might be
1033                  * changed in while using FS and if we reset it in reconnect
1034                  * this leads to lossing user settings done before such as
1035                  * disable lru_resize, etc. */
1036                 if (old_connect_flags != exp->exp_connect_flags ||
1037                     aa->pcaa_initial_connect) {
1038                         CWARN("Reseting ns_connect_flags to server flags: "LPU64"\n", 
1039                               ocd->ocd_connect_flags);
1040                         imp->imp_obd->obd_namespace->ns_connect_flags =
1041                                 ocd->ocd_connect_flags;
1042                         imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1043                                 ocd->ocd_connect_flags;
1044                 }
1045
1046                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1047                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1048                         /* We need a per-message support flag, because
1049                            a. we don't know if the incoming connect reply
1050                               supports AT or not (in reply_in_callback)
1051                               until we unpack it.
1052                            b. failovered server means export and flags are gone
1053                               (in ptlrpc_send_reply).
1054                            Can only be set when we know AT is supported at
1055                            both ends */
1056                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1057                 else
1058                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1059
1060                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1061                         (cli->cl_max_pages_per_rpc > 0));
1062         }
1063
1064  out:
1065         if (rc != 0) {
1066                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1067                 spin_lock(&imp->imp_lock);
1068                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
1069                     (request->rq_import_generation == imp->imp_generation))
1070                         ptlrpc_deactivate_and_unlock_import(imp);
1071                 else
1072                         spin_unlock(&imp->imp_lock);
1073
1074                 if (imp->imp_recon_bk && imp->imp_last_recon) {
1075                         /* Give up trying to reconnect */
1076                         imp->imp_obd->obd_no_recov = 1;
1077                         ptlrpc_deactivate_import(imp);
1078                 }
1079
1080                 if (rc == -EPROTO) {
1081                         struct obd_connect_data *ocd;
1082                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
1083                                                  sizeof *ocd,
1084                                                  lustre_swab_connect);
1085                         if (ocd &&
1086                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1087                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1088                            /* Actually servers are only supposed to refuse
1089                               connection from liblustre clients, so we should
1090                               never see this from VFS context */
1091                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1092                                         "(%d.%d.%d.%d)"
1093                                         " refused connection from this client "
1094                                         "with an incompatible version (%s).  "
1095                                         "Client must be recompiled\n",
1096                                         obd2cli_tgt(imp->imp_obd),
1097                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1098                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1099                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1100                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
1101                                         LUSTRE_VERSION_STRING);
1102                                 ptlrpc_deactivate_import(imp);
1103                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1104                         }
1105                         RETURN(-EPROTO);
1106                 }
1107
1108                 ptlrpc_maybe_ping_import_soon(imp);
1109
1110                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1111                        obd2cli_tgt(imp->imp_obd),
1112                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1113         }
1114
1115         spin_lock(&imp->imp_lock);
1116         imp->imp_last_recon = 0;
1117         spin_unlock(&imp->imp_lock);
1118
1119         cfs_waitq_signal(&imp->imp_recovery_waitq);
1120         RETURN(rc);
1121 }
1122
1123 static int completed_replay_interpret(struct ptlrpc_request *req,
1124                                       void * data, int rc)
1125 {
1126         ENTRY;
1127         atomic_dec(&req->rq_import->imp_replay_inflight);
1128         if (req->rq_status == 0 &&
1129             !req->rq_import->imp_vbr_failed) {
1130                 ptlrpc_import_recovery_state_machine(req->rq_import);
1131         } else {
1132                 if (req->rq_import->imp_vbr_failed) {
1133                         CDEBUG(D_WARNING,
1134                                "%s: version recovery fails, reconnecting\n",
1135                                req->rq_import->imp_obd->obd_name);
1136                         spin_lock(&req->rq_import->imp_lock);
1137                         req->rq_import->imp_vbr_failed = 0;
1138                         spin_unlock(&req->rq_import->imp_lock);
1139                 } else {
1140                         CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1141                                      "reconnecting\n",
1142                                req->rq_import->imp_obd->obd_name,
1143                                req->rq_status);
1144                 }
1145                 ptlrpc_connect_import(req->rq_import, NULL);
1146         }
1147         RETURN(0);
1148 }
1149
1150 static int signal_completed_replay(struct obd_import *imp)
1151 {
1152         struct ptlrpc_request *req;
1153         ENTRY;
1154
1155         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
1156         atomic_inc(&imp->imp_replay_inflight);
1157
1158         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
1159         if (!req) {
1160                 atomic_dec(&imp->imp_replay_inflight);
1161                 RETURN(-ENOMEM);
1162         }
1163
1164         ptlrpc_req_set_repsize(req, 1, NULL);
1165         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1166         lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY);
1167         if (imp->imp_delayed_recovery)
1168                 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
1169         req->rq_timeout *= 3;
1170         req->rq_interpret_reply = completed_replay_interpret;
1171
1172         ptlrpcd_add_req(req);
1173         RETURN(0);
1174 }
1175
1176 #ifdef __KERNEL__
1177 static int ptlrpc_invalidate_import_thread(void *data)
1178 {
1179         struct obd_import *imp = data;
1180         int disconnect;
1181
1182         ENTRY;
1183
1184         ptlrpc_daemonize("ll_imp_inval");
1185
1186         CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
1187                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1188                imp->imp_connection->c_remote_uuid.uuid);
1189
1190         ptlrpc_invalidate_import(imp);
1191
1192         /* is client_disconnect_export in flight ? */
1193         spin_lock(&imp->imp_lock);
1194         disconnect = imp->imp_deactive;
1195         spin_unlock(&imp->imp_lock);
1196         if (disconnect)
1197                 GOTO(out, 0 );
1198
1199         if (obd_dump_on_eviction) {
1200                 CERROR("dump the log upon eviction\n");
1201                 libcfs_debug_dumplog();
1202         }
1203
1204         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1205         ptlrpc_import_recovery_state_machine(imp);
1206
1207 out:
1208         class_import_put(imp);
1209         RETURN(0);
1210 }
1211 #endif
1212
1213 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1214 {
1215         int rc = 0;
1216         int inflight;
1217         char *target_start;
1218         int target_len;
1219
1220         ENTRY;
1221         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1222                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1223                           &target_start, &target_len);
1224                 /* Don't care about MGC eviction */
1225                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1226                            LUSTRE_MGC_NAME) != 0) {
1227                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1228                                            "%.*s; in progress operations using "
1229                                            "this service will fail.\n",
1230                                            target_len, target_start);
1231                 }
1232                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1233                        obd2cli_tgt(imp->imp_obd),
1234                        imp->imp_connection->c_remote_uuid.uuid);
1235
1236 #ifdef __KERNEL__
1237                 /* bug 17802:  XXX client_disconnect_export vs connect request
1238                  * race. if client will evicted at this time, we start invalidate
1239                  * thread without referece to import and import can be freed
1240                  * at same time. */
1241                 class_import_get(imp);
1242                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1243                                    CLONE_VM | CLONE_FILES);
1244                 if (rc < 0) {
1245                         class_import_put(imp);
1246                         CERROR("error starting invalidate thread: %d\n", rc);
1247                 } else {
1248                         rc = 0;
1249                 }
1250                 RETURN(rc);
1251 #else
1252                 ptlrpc_invalidate_import(imp);
1253
1254                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1255 #endif
1256         }
1257
1258         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1259                 CDEBUG(D_HA, "replay requested by %s\n",
1260                        obd2cli_tgt(imp->imp_obd));
1261                 rc = ptlrpc_replay_next(imp, &inflight);
1262                 if (inflight == 0 &&
1263                     atomic_read(&imp->imp_replay_inflight) == 0) {
1264                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1265                         rc = ldlm_replay_locks(imp);
1266                         if (rc)
1267                                 GOTO(out, rc);
1268                 }
1269                 rc = 0;
1270         }
1271
1272         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1273                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1274                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1275                         rc = signal_completed_replay(imp);
1276                         if (rc)
1277                                 GOTO(out, rc);
1278                 }
1279
1280         }
1281
1282         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1283                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1284                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1285                 }
1286         }
1287
1288         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1289                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1290                        obd2cli_tgt(imp->imp_obd),
1291                        imp->imp_connection->c_remote_uuid.uuid);
1292
1293                 rc = ptlrpc_resend(imp);
1294                 if (rc)
1295                         GOTO(out, rc);
1296                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1297                 ptlrpc_activate_import(imp);
1298
1299                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1300                           &target_start, &target_len);
1301                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1302                               "using nid %s.\n", imp->imp_obd->obd_name,
1303                               target_len, target_start,
1304                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1305         }
1306
1307         if (imp->imp_state == LUSTRE_IMP_FULL) {
1308                 cfs_waitq_signal(&imp->imp_recovery_waitq);
1309                 ptlrpc_wake_delayed(imp);
1310         }
1311
1312  out:
1313         RETURN(rc);
1314 }
1315
/*
 * Timeout callback handed to LWI_TIMEOUT_INTR() in ptlrpc_disconnect_import().
 * Ignores its argument and always returns 0.
 * NOTE(review): presumably a 0 return tells l_wait_event() to keep waiting
 * after the timeout fires -- confirm against the l_wait_event() definition.
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
1320
1321 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1322 {
1323         struct ptlrpc_request *req;
1324         int rq_opc, rc = 0;
1325         int nowait = imp->imp_obd->obd_force;
1326         ENTRY;
1327
1328         if (nowait)
1329                 GOTO(set_state, rc);
1330
1331         switch (imp->imp_connect_op) {
1332         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1333         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1334         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1335         default:
1336                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1337                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1338                 RETURN(-EINVAL);
1339         }
1340
1341         if (ptlrpc_import_in_recovery(imp)) {
1342                 struct l_wait_info lwi;
1343                 cfs_duration_t timeout;
1344
1345                 if (AT_OFF) {
1346                         timeout = cfs_time_seconds(obd_timeout);
1347                 } else {
1348                         int idx = import_at_get_index(imp,
1349                                 imp->imp_client->cli_request_portal);
1350                         timeout = cfs_time_seconds(
1351                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1352                 }
1353                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1354                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1355                 rc = l_wait_event(imp->imp_recovery_waitq,
1356                                   !ptlrpc_import_in_recovery(imp), &lwi);
1357         }
1358
1359         spin_lock(&imp->imp_lock);
1360         if (imp->imp_state != LUSTRE_IMP_FULL)
1361                 GOTO(out, 0);
1362
1363         spin_unlock(&imp->imp_lock);
1364
1365         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1366         if (req) {
1367                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1368                  * it fails.  We can get through the above with a down server
1369                  * if the client doesn't know the server is gone yet. */
1370                 req->rq_no_resend = 1;
1371
1372 #ifndef CRAY_XT3
1373                 /* We want client umounts to happen quickly, no matter the
1374                    server state... */
1375                 req->rq_timeout = min_t(int, req->rq_timeout,
1376                                         INITIAL_CONNECT_TIMEOUT);
1377 #else
1378                 /* ... but we always want liblustre clients to nicely
1379                    disconnect, so only use the adaptive value. */
1380                 if (AT_OFF)
1381                         req->rq_timeout = obd_timeout / 3;
1382 #endif
1383
1384                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1385                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1386                 ptlrpc_req_set_repsize(req, 1, NULL);
1387                 rc = ptlrpc_queue_wait(req);
1388                 ptlrpc_req_finished(req);
1389         }
1390
1391 set_state:
1392         spin_lock(&imp->imp_lock);
1393 out:
1394         if (noclose)
1395                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1396         else
1397                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1398         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1399         /* Try all connections in the future - bz 12758 */
1400         imp->imp_last_recon = 0;
1401         spin_unlock(&imp->imp_lock);
1402
1403         RETURN(rc);
1404 }
1405
1406 /* Sets maximal number of RPCs possible originating from other side of this
1407    import (server) to us and number of async RPC replies that we are not waiting
1408    for arriving */
1409 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1410 {
1411         LNetSetAsync(imp->imp_connection->c_peer, count);
1412 }
1413
1414
1415 /* Adaptive Timeout utils */
1416 extern unsigned int at_min, at_max, at_history;
1417
1418 /* Bin into timeslices using AT_BINS bins.
1419    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1420    but still smoothing out a return to normalcy from a slow response.
1421    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1422 int at_add(struct adaptive_timeout *at, unsigned int val)
1423 {
1424         unsigned int old = at->at_current;
1425         time_t now = cfs_time_current_sec();
1426         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1427
1428         LASSERT(at);
1429 #if 0
1430         CDEBUG(D_INFO, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1431                val, at, now - at->at_binstart, at->at_current,
1432                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1433 #endif
1434         if (val == 0)
1435                 /* 0's don't count, because we never want our timeout to
1436                    drop to 0, and because 0 could mean an error */
1437                 return 0;
1438
1439         spin_lock(&at->at_lock);
1440
1441         if (unlikely(at->at_binstart == 0)) {
1442                 /* Special case to remove default from history */
1443                 at->at_current = val;
1444                 at->at_worst_ever = val;
1445                 at->at_worst_time = now;
1446                 at->at_hist[0] = val;
1447                 at->at_binstart = now;
1448         } else if (now - at->at_binstart < binlimit ) {
1449                 /* in bin 0 */
1450                 at->at_hist[0] = max(val, at->at_hist[0]);
1451                 at->at_current = max(val, at->at_current);
1452         } else {
1453                 int i, shift;
1454                 unsigned int maxv = val;
1455                 /* move bins over */
1456                 shift = (now - at->at_binstart) / binlimit;
1457                 LASSERT(shift > 0);
1458                 for(i = AT_BINS - 1; i >= 0; i--) {
1459                         if (i >= shift) {
1460                                 at->at_hist[i] = at->at_hist[i - shift];
1461                                 maxv = max(maxv, at->at_hist[i]);
1462                         } else {
1463                                 at->at_hist[i] = 0;
1464                         }
1465                 }
1466                 at->at_hist[0] = val;
1467                 at->at_current = maxv;
1468                 at->at_binstart += shift * binlimit;
1469         }
1470
1471         if (at->at_current > at->at_worst_ever) {
1472                 at->at_worst_ever = at->at_current;
1473                 at->at_worst_time = now;
1474         }
1475
1476         if (at->at_flags & AT_FLG_NOHIST)
1477                 /* Only keep last reported val; keeping the rest of the history
1478                    for proc only */
1479                 at->at_current = val;
1480
1481         if (at_max > 0)
1482                 at->at_current =  min(at->at_current, at_max);
1483         at->at_current =  max(at->at_current, at_min);
1484
1485 #if 0
1486         if (at->at_current != old)
1487                 CDEBUG(D_ADAPTTO, "AT %p change: old=%u new=%u delta=%d "
1488                        "(val=%u) hist %u %u %u %u\n", at,
1489                        old, at->at_current, at->at_current - old, val,
1490                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1491                        at->at_hist[3]);
1492 #endif
1493
1494         /* if we changed, report the old value */
1495         old = (at->at_current != old) ? old : 0;
1496
1497         spin_unlock(&at->at_lock);
1498         return old;
1499 }
1500
1501 /* Find the imp_at index for a given portal; assign if space available */
1502 int import_at_get_index(struct obd_import *imp, int portal)
1503 {
1504         struct imp_at *at = &imp->imp_at;
1505         int i;
1506
1507         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1508                 if (at->iat_portal[i] == portal)
1509                         return i;
1510                 if (at->iat_portal[i] == 0)
1511                         /* unused */
1512                         break;
1513         }
1514
1515         /* Not found in list, add it under a lock */
1516         spin_lock(&imp->imp_lock);
1517
1518         /* Check unused under lock */
1519         for (; i < IMP_AT_MAX_PORTALS; i++) {
1520                 if (at->iat_portal[i] == portal)
1521                         goto out;
1522                 if (at->iat_portal[i] == 0)
1523                         /* unused */
1524                         break;
1525         }
1526
1527         /* Not enough portals? */
1528         LASSERT(i < IMP_AT_MAX_PORTALS);
1529
1530         at->iat_portal[i] = portal;
1531 out:
1532         spin_unlock(&imp->imp_lock);
1533         return i;
1534 }