Whamcloud - gitweb
b=13537
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 #define DEBUG_SUBSYSTEM S_RPC
27 #ifndef __KERNEL__
28 # include <liblustre.h>
29 #endif
30
31 #include <obd_support.h>
32 #include <lustre_ha.h>
33 #include <lustre_net.h>
34 #include <lustre_import.h>
35 #include <lustre_export.h>
36 #include <obd.h>
37 #include <obd_class.h>
38
39 #include "ptlrpc_internal.h"
40
41 struct ptlrpc_connect_async_args {
42          __u64 pcaa_peer_committed;
43         int pcaa_initial_connect;
44 };
45
/*
 * Move @imp to @state unless the import is currently CLOSED: a CLOSED
 * import should remain so.  This variant does no locking of its own;
 * IMPORT_SET_STATE() below wraps it in imp->imp_lock.
 *
 * Every use of a macro parameter is parenthesised so that arguments such
 * as "*impp" or "base + 1" expand to the intended expression.  The
 * arguments are still evaluated more than once, so do not pass expressions
 * with side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while (0)

/*
 * Locked form of IMPORT_SET_STATE_NOLOCK(): takes imp->imp_lock around the
 * state change.  Must not be used while imp->imp_lock is already held.
 */
#define IMPORT_SET_STATE(imp, state)                    \
do {                                                    \
        spin_lock(&(imp)->imp_lock);                    \
        IMPORT_SET_STATE_NOLOCK((imp), (state));        \
        spin_unlock(&(imp)->imp_lock);                  \
} while (0)
64
65
/* Forward declarations: the connect reply handler defined later in this
 * file, and the recovery state machine it drives. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void *data, int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
69
70 /* Only this function is allowed to change the import state when it is
71  * CLOSED. I would rather refcount the import and free it after
72  * disconnection like we do with exports. To do that, the client_obd
73  * will need to save the peer info somewhere other than in the import,
74  * though. */
75 int ptlrpc_init_import(struct obd_import *imp)
76 {
77         spin_lock(&imp->imp_lock);
78
79         imp->imp_generation++;
80         imp->imp_state =  LUSTRE_IMP_NEW;
81
82         spin_unlock(&imp->imp_lock);
83
84         return 0;
85 }
86 EXPORT_SYMBOL(ptlrpc_init_import);
87
#define UUID_STR "_UUID"

/*
 * Find the printable core of a UUID string without modifying it.
 *
 * @uuid       - NUL-terminated UUID string
 * @prefix     - optional leading text to skip; may be NULL.  It is skipped
 *               only when @uuid actually begins with it.
 * @uuid_start - out: first character after the skipped prefix (or @uuid)
 * @uuid_len   - out: number of characters from *uuid_start, not counting a
 *               trailing UUID_STR suffix if one is present
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const size_t suffix_len = sizeof(UUID_STR) - 1;
        char *start = uuid;
        int len;

        if (prefix != NULL) {
                const size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        start = uuid + prefix_len;
        }

        len = strlen(start);

        /* Drop the suffix only when the remaining text is long enough to
         * hold it and really ends with it. */
        if ((size_t)len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = len;
}
104
105 /* Returns true if import was FULL, false if import was already not
106  * connected.
107  * @imp - import to be disconnected
108  * @conn_cnt - connection count (epoch) of the request that timed out
109  *             and caused the disconnection.  In some cases, multiple
110  *             inflight requests can fail to a single target (e.g. OST
111  *             bulk requests) and if one has already caused a reconnection
112  *             (increasing the import->conn_cnt) the older failure should
113  *             not also cause a reconnection.  If zero it forces a reconnect.
114  */
115 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
116 {
117         int rc = 0;
118
119         spin_lock(&imp->imp_lock);
120
121         if (imp->imp_state == LUSTRE_IMP_FULL &&
122             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
123                 char *target_start;
124                 int   target_len;
125
126                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
127                           &target_start, &target_len);
128                 if (imp->imp_replayable) {
129                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
130                                "%s was lost; in progress operations using this "
131                                "service will wait for recovery to complete.\n",
132                                imp->imp_obd->obd_name, target_len, target_start,
133                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
134                 } else {
135                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
136                                "%.*s via nid %s was lost; in progress "
137                                "operations using this service will fail.\n",
138                                imp->imp_obd->obd_name, target_len, target_start, 
139                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
140                 }
141                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
142                 spin_unlock(&imp->imp_lock);
143     
144                 if (obd_dump_on_timeout)
145                         libcfs_debug_dumplog();
146
147                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
148                 rc = 1;
149         } else {
150                 spin_unlock(&imp->imp_lock);
151                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
152                        imp->imp_client->cli_name, imp,
153                        (imp->imp_state == LUSTRE_IMP_FULL &&
154                         imp->imp_conn_cnt > conn_cnt) ?
155                        "reconnected" : "not connected", imp->imp_conn_cnt,
156                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
157         }
158
159         return rc;
160 }
161
162 /*
163  * This acts as a barrier; all existing requests are rejected, and
164  * no new requests will be accepted until the import is valid again.
165  */
166 void ptlrpc_deactivate_import(struct obd_import *imp)
167 {
168         ENTRY;
169
170         spin_lock(&imp->imp_lock);
171         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
172         imp->imp_invalid = 1;
173         imp->imp_generation++;
174         spin_unlock(&imp->imp_lock);
175
176         ptlrpc_abort_inflight(imp);
177         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
178 }
179
180 /*
181  * This function will invalidate the import, if necessary, then block
182  * for all the RPC completions, and finally notify the obd to
183  * invalidate its state (ie cancel locks, clear pending requests,
184  * etc).
185  */
186 void ptlrpc_invalidate_import(struct obd_import *imp)
187 {
188         struct list_head *tmp, *n;
189         struct ptlrpc_request *req;
190         struct l_wait_info lwi;
191         int rc;
192
193         atomic_inc(&imp->imp_inval_count);
194
195         if (!imp->imp_invalid)
196                 ptlrpc_deactivate_import(imp);
197
198         LASSERT(imp->imp_invalid);
199
200         /* wait for all requests to error out and call completion callbacks.
201            Cap it at obd_timeout -- these should all have been locally
202            cancelled by ptlrpc_abort_inflight. */
203         lwi = LWI_TIMEOUT_INTERVAL(
204                 cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
205                 cfs_time_seconds(1), NULL, NULL);
206         rc = l_wait_event(imp->imp_recovery_waitq,
207                           (atomic_read(&imp->imp_inflight) == 0), &lwi);
208
209         if (rc) {
210                 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
211                        obd2cli_tgt(imp->imp_obd), rc,
212                        atomic_read(&imp->imp_inflight));
213                 spin_lock(&imp->imp_lock);
214                 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
215                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
216                         DEBUG_REQ(D_ERROR, req, "still on sending list");
217                 }
218                 list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
219                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
220                         DEBUG_REQ(D_ERROR, req, "still on delayed list");
221                 }
222                 spin_unlock(&imp->imp_lock);
223         }
224
225         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
226
227         atomic_dec(&imp->imp_inval_count);
228         cfs_waitq_signal(&imp->imp_recovery_waitq);
229 }
230
231 /* unset imp_invalid */
232 void ptlrpc_activate_import(struct obd_import *imp)
233 {
234         struct obd_device *obd = imp->imp_obd;
235
236         spin_lock(&imp->imp_lock);
237         imp->imp_invalid = 0;
238         spin_unlock(&imp->imp_lock);
239
240         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
241 }
242
243 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
244 {
245         ENTRY;
246
247         LASSERT(!imp->imp_dlm_fake);
248
249         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
250                 if (!imp->imp_replayable) {
251                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
252                                "auto-deactivating\n",
253                                obd2cli_tgt(imp->imp_obd),
254                                imp->imp_connection->c_remote_uuid.uuid,
255                                imp->imp_obd->obd_name);
256                         ptlrpc_deactivate_import(imp);
257                 }
258
259                 CDEBUG(D_HA, "%s: waking up pinger\n",
260                        obd2cli_tgt(imp->imp_obd));
261
262                 spin_lock(&imp->imp_lock);
263                 imp->imp_force_verify = 1;
264                 spin_unlock(&imp->imp_lock);
265
266                 ptlrpc_pinger_wake_up();
267         }
268         EXIT;
269 }
270
271 static int import_select_connection(struct obd_import *imp)
272 {
273         struct obd_import_conn *imp_conn = NULL, *conn;
274         struct obd_export *dlmexp;
275         int tried_all = 1;
276         ENTRY;
277
278         spin_lock(&imp->imp_lock);
279
280         if (list_empty(&imp->imp_conn_list)) {
281                 CERROR("%s: no connections available\n",
282                         imp->imp_obd->obd_name);
283                 spin_unlock(&imp->imp_lock);
284                 RETURN(-EINVAL);
285         }
286
287         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
288                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
289                        imp->imp_obd->obd_name,
290                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
291                        conn->oic_last_attempt);
292                 
293                 /* Don't thrash connections */
294                 if (cfs_time_before_64(cfs_time_current_64(),
295                                      conn->oic_last_attempt + 
296                                      cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
297                         continue;
298                 }
299
300                 /* If we have not tried this connection since the
301                    the last successful attempt, go with this one */
302                 if ((conn->oic_last_attempt == 0) ||
303                     cfs_time_beforeq_64(conn->oic_last_attempt,
304                                        imp->imp_last_success_conn)) {
305                         imp_conn = conn;
306                         tried_all = 0;
307                         break;
308                 }
309
310                 /* If all of the connections have already been tried
311                    since the last successful connection; just choose the
312                    least recently used */
313                 if (!imp_conn)
314                         imp_conn = conn;
315                 else if (cfs_time_before_64(conn->oic_last_attempt,
316                                             imp_conn->oic_last_attempt))
317                         imp_conn = conn;
318         }
319
320         /* if not found, simply choose the current one */
321         if (!imp_conn) {
322                 LASSERT(imp->imp_conn_current);
323                 imp_conn = imp->imp_conn_current;
324                 tried_all = 0;
325         }
326         LASSERT(imp_conn->oic_conn);
327
328         /* If we've tried everything, and we're back to the beginning of the
329            list, increase our timeout and try again. It will be reset when
330            we do finally connect. (FIXME: really we should wait for all network
331            state associated with the last connection attempt to drain before
332            trying to reconnect on it.) */
333         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item)) {
334                 if (at_get(&imp->imp_at.iat_net_latency) <
335                     CONNECTION_SWITCH_MAX) {
336                         at_add(&imp->imp_at.iat_net_latency,
337                                at_get(&imp->imp_at.iat_net_latency) +
338                                CONNECTION_SWITCH_INC);
339                 }
340                 LASSERT(imp_conn->oic_last_attempt);
341                 CWARN("%s: tried all connections, increasing latency to %ds\n",
342                       imp->imp_obd->obd_name,
343                       at_get(&imp->imp_at.iat_net_latency));
344         }
345
346         imp_conn->oic_last_attempt = cfs_time_current_64();
347
348         /* switch connection, don't mind if it's same as the current one */
349         if (imp->imp_connection)
350                 ptlrpc_put_connection(imp->imp_connection);
351         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
352
353         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
354         LASSERT(dlmexp != NULL);
355         if (dlmexp->exp_connection)
356                 ptlrpc_put_connection(dlmexp->exp_connection);
357         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
358         class_export_put(dlmexp);
359
360         if (imp->imp_conn_current != imp_conn) {
361                 if (imp->imp_conn_current)
362                         LCONSOLE_INFO("Changing connection for %s to %s/%s\n",
363                                       imp->imp_obd->obd_name,
364                                       imp_conn->oic_uuid.uuid,
365                                       libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
366                 imp->imp_conn_current = imp_conn;
367         }
368
369         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
370                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
371                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
372
373         spin_unlock(&imp->imp_lock);
374
375         RETURN(0);
376 }
377
378 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
379 {
380         struct obd_device *obd = imp->imp_obd;
381         int initial_connect = 0;
382         int rc;
383         __u64 committed_before_reconnect = 0;
384         struct ptlrpc_request *request;
385         int size[] = { sizeof(struct ptlrpc_body),
386                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
387                        sizeof(obd->obd_uuid),
388                        sizeof(imp->imp_dlm_handle),
389                        sizeof(imp->imp_connect_data) };
390         char *tmp[] = { NULL,
391                         obd2cli_tgt(imp->imp_obd),
392                         obd->obd_uuid.uuid,
393                         (char *)&imp->imp_dlm_handle,
394                         (char *)&imp->imp_connect_data };
395         struct ptlrpc_connect_async_args *aa;
396
397         ENTRY;
398         spin_lock(&imp->imp_lock);
399         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
400                 spin_unlock(&imp->imp_lock);
401                 CERROR("can't connect to a closed import\n");
402                 RETURN(-EINVAL);
403         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
404                 spin_unlock(&imp->imp_lock);
405                 CERROR("already connected\n");
406                 RETURN(0);
407         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
408                 spin_unlock(&imp->imp_lock);
409                 CERROR("already connecting\n");
410                 RETURN(-EALREADY);
411         }
412
413         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
414
415         imp->imp_conn_cnt++;
416         imp->imp_resend_replay = 0;
417
418         if (!lustre_handle_is_used(&imp->imp_remote_handle))
419                 initial_connect = 1;
420         else
421                 committed_before_reconnect = imp->imp_peer_committed_transno;
422
423         spin_unlock(&imp->imp_lock);
424
425         if (new_uuid) {
426                 struct obd_uuid uuid;
427
428                 obd_str2uuid(&uuid, new_uuid);
429                 rc = import_set_conn_priority(imp, &uuid);
430                 if (rc)
431                         GOTO(out, rc);
432         }
433
434         rc = import_select_connection(imp);
435         if (rc)
436                 GOTO(out, rc);
437
438         /* last in connection list */
439         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
440                 if (imp->imp_initial_recov_bk && initial_connect) {
441                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
442                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
443                         /* Don't retry if connect fails */
444                         rc = 0;
445                         obd_set_info_async(obd->obd_self_export,
446                                            strlen(KEY_INIT_RECOV),
447                                            KEY_INIT_RECOV,
448                                            sizeof(rc), &rc, NULL);
449                 }
450                 if (imp->imp_recon_bk) {
451                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
452                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
453                         spin_lock(&imp->imp_lock);
454                         imp->imp_last_recon = 1;
455                         spin_unlock(&imp->imp_lock);
456                 }
457         }
458
459         /* Reset connect flags to the originally requested flags, in case
460          * the server is updated on-the-fly we will get the new features. */
461         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
462         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
463                            &obd->obd_uuid, &imp->imp_connect_data);
464         if (rc)
465                 GOTO(out, rc);
466
467         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
468                                   5, size, tmp);
469         if (!request)
470                 GOTO(out, rc = -ENOMEM);
471
472 #ifndef __KERNEL__
473         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
474 #endif
475         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_NEXT_VER);
476
477         request->rq_send_state = LUSTRE_IMP_CONNECTING;
478         /* Allow a slightly larger reply for future growth compatibility */
479         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
480                               16 * sizeof(__u64);
481         ptlrpc_req_set_repsize(request, 2, size);
482         request->rq_interpret_reply = ptlrpc_connect_interpret;
483
484         CLASSERT(sizeof (*aa) <= sizeof (request->rq_async_args));
485         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
486         memset(aa, 0, sizeof *aa);
487
488         aa->pcaa_peer_committed = committed_before_reconnect;
489         aa->pcaa_initial_connect = initial_connect;
490         if (aa->pcaa_initial_connect) {
491                 spin_lock(&imp->imp_lock);
492                 imp->imp_replayable = 1;
493                 spin_unlock(&imp->imp_lock);
494                 if (AT_OFF)
495                         /* AT will use INITIAL_CONNECT_TIMEOUT the first
496                            time, adaptive after that. */
497                         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
498         }
499
500         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
501                   aa->pcaa_initial_connect ? "initial " : "re", 
502                   imp->imp_conn_cnt);
503         ptlrpcd_add_req(request);
504         rc = 0;
505 out:
506         if (rc != 0) {
507                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
508         }
509
510         RETURN(rc);
511 }
512 EXPORT_SYMBOL(ptlrpc_connect_import);
513
514 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
515 {
516 #ifdef __KERNEL__
517         struct obd_import_conn *imp_conn;
518 #endif
519         int wake_pinger = 0;
520
521         ENTRY;
522
523         spin_lock(&imp->imp_lock);
524         if (list_empty(&imp->imp_conn_list))
525                 GOTO(unlock, 0);
526
527 #ifdef __KERNEL__
528         imp_conn = list_entry(imp->imp_conn_list.prev,
529                               struct obd_import_conn,
530                               oic_item);
531
532         if (imp->imp_conn_current != imp_conn) {
533                 ptlrpc_ping_import_soon(imp);
534                 wake_pinger = 1;
535         }
536
537 #else
538         /* liblustre has no pinger thead, so we wakup pinger anyway */
539         wake_pinger = 1;
540 #endif 
541  unlock:
542         spin_unlock(&imp->imp_lock);
543
544         if (wake_pinger)
545                 ptlrpc_pinger_wake_up();
546
547         EXIT;
548 }
549
550 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
551                                     void * data, int rc)
552 {
553         struct ptlrpc_connect_async_args *aa = data;
554         struct obd_import *imp = request->rq_import;
555         struct client_obd *cli = &imp->imp_obd->u.cli;
556         struct lustre_handle old_hdl;
557         int msg_flags;
558         ENTRY;
559
560         spin_lock(&imp->imp_lock);
561         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
562                 spin_unlock(&imp->imp_lock);
563                 RETURN(0);
564         }
565         spin_unlock(&imp->imp_lock);
566
567         if (rc)
568                 GOTO(out, rc);
569
570         LASSERT(imp->imp_conn_current);
571
572         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
573
574         /* All imports are pingable */
575         spin_lock(&imp->imp_lock);
576         imp->imp_pingable = 1;
577
578         if (aa->pcaa_initial_connect) {
579                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
580                         imp->imp_replayable = 1;
581                         spin_unlock(&imp->imp_lock);
582                         CDEBUG(D_HA, "connected to replayable target: %s\n",
583                                obd2cli_tgt(imp->imp_obd));
584                 } else {
585                         imp->imp_replayable = 0;
586                         spin_unlock(&imp->imp_lock);
587                 }
588
589                 if (msg_flags & MSG_CONNECT_NEXT_VER) {
590                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
591                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
592                                obd2cli_tgt(imp->imp_obd));
593                 } else {
594                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
595                                obd2cli_tgt(imp->imp_obd));
596                 }
597
598                 imp->imp_remote_handle =
599                                 *lustre_msg_get_handle(request->rq_repmsg);
600
601                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
602                 GOTO(finish, rc = 0);
603         } else {
604                 spin_unlock(&imp->imp_lock);
605         }
606
607         /* Determine what recovery state to move the import to. */
608         if (MSG_CONNECT_RECONNECT & msg_flags) {
609                 memset(&old_hdl, 0, sizeof(old_hdl));
610                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
611                             sizeof (old_hdl))) {
612                         CERROR("%s@%s didn't like our handle "LPX64
613                                ", failed\n", obd2cli_tgt(imp->imp_obd),
614                                imp->imp_connection->c_remote_uuid.uuid,
615                                imp->imp_dlm_handle.cookie);
616                         GOTO(out, rc = -ENOTCONN);
617                 }
618
619                 if (memcmp(&imp->imp_remote_handle,
620                            lustre_msg_get_handle(request->rq_repmsg),
621                            sizeof(imp->imp_remote_handle))) {
622                         int level = D_ERROR;
623                         /* Old MGC can reconnect to a restarted MGS */
624                         if (strcmp(imp->imp_obd->obd_type->typ_name,
625                                    LUSTRE_MGC_NAME) == 0) {
626                                 level = D_CONFIG;
627                         }
628                         CDEBUG(level, 
629                                "%s@%s changed handle from "LPX64" to "LPX64
630                                "; copying, but this may foreshadow disaster\n",
631                                obd2cli_tgt(imp->imp_obd),
632                                imp->imp_connection->c_remote_uuid.uuid,
633                                imp->imp_remote_handle.cookie,
634                                lustre_msg_get_handle(request->rq_repmsg)->
635                                         cookie);
636                         imp->imp_remote_handle =
637                                      *lustre_msg_get_handle(request->rq_repmsg);
638                 } else {
639                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
640                                obd2cli_tgt(imp->imp_obd),
641                                imp->imp_connection->c_remote_uuid.uuid);
642                 }
643
644                 if (imp->imp_invalid) {
645                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
646                                "marking evicted\n", imp->imp_obd->obd_name);
647                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
648                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
649                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
650                                imp->imp_obd->obd_name,
651                                obd2cli_tgt(imp->imp_obd));
652
653                         spin_lock(&imp->imp_lock);
654                         imp->imp_resend_replay = 1;
655                         spin_unlock(&imp->imp_lock);
656
657                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
658                 } else {
659                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
660                 }
661         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
662                 LASSERT(imp->imp_replayable);
663                 imp->imp_remote_handle =
664                                 *lustre_msg_get_handle(request->rq_repmsg);
665                 imp->imp_last_replay_transno = 0;
666                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
667         } else {
668                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
669                           "flags reconnect/recovering not set: %x)",msg_flags);
670                 imp->imp_remote_handle =
671                                 *lustre_msg_get_handle(request->rq_repmsg);
672                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
673         }
674
675         /* Sanity checks for a reconnected import. */
676         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
677                 CERROR("imp_replayable flag does not match server "
678                        "after reconnect. We should LBUG right here.\n");
679         }
680
681         if (lustre_msg_get_last_committed(request->rq_repmsg) <
682             aa->pcaa_peer_committed) {
683                 CERROR("%s went back in time (transno "LPD64
684                        " was previously committed, server now claims "LPD64
685                        ")!  See https://bugzilla.clusterfs.com/"
686                        "long_list.cgi?buglist=9646\n",
687                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
688                        lustre_msg_get_last_committed(request->rq_repmsg));
689         }
690
691 finish:
692         rc = ptlrpc_import_recovery_state_machine(imp);
693         if (rc != 0) {
694                 if (rc == -ENOTCONN) {
695                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
696                                "invalidating and reconnecting\n",
697                                obd2cli_tgt(imp->imp_obd),
698                                imp->imp_connection->c_remote_uuid.uuid);
699                         ptlrpc_connect_import(imp, NULL);
700                         RETURN(0);
701                 }
702         } else {
703                 struct obd_connect_data *ocd;
704                 struct obd_export *exp;
705
706                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
707                                          lustre_swab_connect);
708                 spin_lock(&imp->imp_lock);
709                 list_del(&imp->imp_conn_current->oic_item);
710                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
711                 imp->imp_last_success_conn =
712                         imp->imp_conn_current->oic_last_attempt;
713
714                 if (ocd == NULL) {
715                         spin_unlock(&imp->imp_lock);
716                         CERROR("Wrong connect data from server\n");
717                         rc = -EPROTO;
718                         GOTO(out, rc);
719                 }
720
721                 imp->imp_connect_data = *ocd;
722
723                 exp = class_conn2export(&imp->imp_dlm_handle);
724                 spin_unlock(&imp->imp_lock);
725
726                 /* check that server granted subset of flags we asked for. */
727                 LASSERTF((ocd->ocd_connect_flags &
728                           imp->imp_connect_flags_orig) ==
729                          ocd->ocd_connect_flags, LPX64" != "LPX64,
730                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
731
732                 if (!exp) {
733                         /* This could happen if export is cleaned during the 
734                            connect attempt */
735                         CERROR("Missing export for %s\n", 
736                                imp->imp_obd->obd_name);
737                         GOTO(out, rc = -ENODEV);
738                 }
739                 exp->exp_connect_flags = ocd->ocd_connect_flags;
740                 imp->imp_obd->obd_self_export->exp_connect_flags = ocd->ocd_connect_flags;
741                 class_export_put(exp);
742
743                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
744
745                 if (!ocd->ocd_ibits_known &&
746                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
747                         CERROR("Inodebits aware server returned zero compatible"
748                                " bits?\n");
749
750                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
751                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
752                                         LUSTRE_VERSION_OFFSET_WARN ||
753                      ocd->ocd_version < LUSTRE_VERSION_CODE -
754                                         LUSTRE_VERSION_OFFSET_WARN)) {
755                         /* Sigh, some compilers do not like #ifdef in the middle
756                            of macro arguments */
757 #ifdef __KERNEL__
758                         const char *older =
759                                 "older.  Consider upgrading this client";
760 #else
761                         const char *older =
762                                 "older.  Consider recompiling this application";
763 #endif
764                         const char *newer = "newer than client version";
765
766                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
767                                       "is much %s (%s)\n",
768                                       obd2cli_tgt(imp->imp_obd),
769                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
770                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
771                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
772                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
773                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
774                                       newer : older, LUSTRE_VERSION_STRING);
775                 }
776
777                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
778                         cli->cl_max_pages_per_rpc = 
779                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
780                 }
781
782                 imp->imp_obd->obd_namespace->ns_connect_flags = ocd->ocd_connect_flags;
783
784                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
785                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
786                         /* We need a per-message support flag, because 
787                            a. we don't know if the incoming connect reply
788                               supports AT or not (in reply_in_callback)
789                               until we unpack it.
790                            b. failovered server means export and flags are gone
791                               (in ptlrpc_send_reply).
792                            Can only be set when we know AT is supported at 
793                            both ends */
794                         imp->imp_msg_flags |= MSG_AT_SUPPORT;
795                 else
796                         imp->imp_msg_flags &= ~MSG_AT_SUPPORT;
797
798                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
799                         (cli->cl_max_pages_per_rpc > 0));
800         }
801
802  out:
803         if (rc != 0) {
804                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
805                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov)
806                         ptlrpc_deactivate_import(imp);
807
808                 if (imp->imp_recon_bk && imp->imp_last_recon) {
809                         /* Give up trying to reconnect */
810                         imp->imp_obd->obd_no_recov = 1;
811                         ptlrpc_deactivate_import(imp);
812                 }
813
814                 if (rc == -EPROTO) {
815                         struct obd_connect_data *ocd;
816                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
817                                                  sizeof *ocd,
818                                                  lustre_swab_connect);
819                         if (ocd &&
820                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
821                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
822                            /* Actually servers are only supposed to refuse
823                               connection from liblustre clients, so we should
824                               never see this from VFS context */
825                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
826                                         "(%d.%d.%d.%d)"
827                                         " refused connection from this client "
828                                         "with an incompatible version (%s).  "
829                                         "Client must be recompiled\n",
830                                         obd2cli_tgt(imp->imp_obd),
831                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
832                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
833                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
834                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
835                                         LUSTRE_VERSION_STRING);
836                                 ptlrpc_deactivate_import(imp);
837                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
838                         }
839                         RETURN(-EPROTO);
840                 }
841
842                 ptlrpc_maybe_ping_import_soon(imp);
843
844                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
845                        obd2cli_tgt(imp->imp_obd),
846                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
847         }
848         
849         spin_lock(&imp->imp_lock);
850         imp->imp_last_recon = 0;
851         spin_unlock(&imp->imp_lock);
852
853         cfs_waitq_signal(&imp->imp_recovery_waitq);
854         RETURN(rc);
855 }
856
857 static int completed_replay_interpret(struct ptlrpc_request *req,
858                                     void * data, int rc)
859 {
860         ENTRY;
861         atomic_dec(&req->rq_import->imp_replay_inflight);
862         if (req->rq_status == 0) {
863                 ptlrpc_import_recovery_state_machine(req->rq_import);
864         } else {
865                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
866                        "reconnecting\n",
867                        req->rq_import->imp_obd->obd_name, req->rq_status);
868                 ptlrpc_connect_import(req->rq_import, NULL);
869         }
870
871         RETURN(0);
872 }
873
874 static int signal_completed_replay(struct obd_import *imp)
875 {
876         struct ptlrpc_request *req;
877         ENTRY;
878
879         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
880         atomic_inc(&imp->imp_replay_inflight);
881
882         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
883         if (!req) {
884                 atomic_dec(&imp->imp_replay_inflight);
885                 RETURN(-ENOMEM);
886         }
887
888         ptlrpc_req_set_repsize(req, 1, NULL);
889         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
890         lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY);
891         req->rq_timeout *= 3;
892         req->rq_interpret_reply = completed_replay_interpret;
893
894         ptlrpcd_add_req(req);
895         RETURN(0);
896 }
897
898 #ifdef __KERNEL__
899 static int ptlrpc_invalidate_import_thread(void *data)
900 {
901         struct obd_import *imp = data;
902
903         ENTRY;
904
905         ptlrpc_daemonize("ll_imp_inval");
906         
907         CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
908                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
909                imp->imp_connection->c_remote_uuid.uuid);
910
911         ptlrpc_invalidate_import(imp);
912
913         if (obd_dump_on_eviction) {
914                 CERROR("dump the log upon eviction\n");
915                 libcfs_debug_dumplog();
916         }
917
918         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
919         ptlrpc_import_recovery_state_machine(imp);
920
921         RETURN(0);
922 }
923 #endif
924
925 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
926 {
927         int rc = 0;
928         int inflight;
929         char *target_start;
930         int target_len;
931
932         ENTRY;
933         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
934                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
935                           &target_start, &target_len);
936                 /* Don't care about MGC eviction */
937                 if (strcmp(imp->imp_obd->obd_type->typ_name,
938                            LUSTRE_MGC_NAME) != 0) {
939                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
940                                            "%.*s; in progress operations using "
941                                            "this service will fail.\n",
942                                            target_len, target_start);
943                 }
944                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
945                        obd2cli_tgt(imp->imp_obd),
946                        imp->imp_connection->c_remote_uuid.uuid);
947
948 #ifdef __KERNEL__
949                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
950                                    CLONE_VM | CLONE_FILES);
951                 if (rc < 0)
952                         CERROR("error starting invalidate thread: %d\n", rc);
953                 else
954                         rc = 0;
955                 RETURN(rc);
956 #else
957                 ptlrpc_invalidate_import(imp);
958
959                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
960 #endif
961         }
962
963         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
964                 CDEBUG(D_HA, "replay requested by %s\n",
965                        obd2cli_tgt(imp->imp_obd));
966                 rc = ptlrpc_replay_next(imp, &inflight);
967                 if (inflight == 0 &&
968                     atomic_read(&imp->imp_replay_inflight) == 0) {
969                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
970                         rc = ldlm_replay_locks(imp);
971                         if (rc)
972                                 GOTO(out, rc);
973                 }
974                 rc = 0;
975         }
976
977         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
978                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
979                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
980                         rc = signal_completed_replay(imp);
981                         if (rc)
982                                 GOTO(out, rc);
983                 }
984
985         }
986
987         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
988                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
989                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
990                 }
991         }
992
993         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
994                 CDEBUG(D_HA, "reconnected to %s@%s\n",
995                        obd2cli_tgt(imp->imp_obd),
996                        imp->imp_connection->c_remote_uuid.uuid);
997
998                 rc = ptlrpc_resend(imp);
999                 if (rc)
1000                         GOTO(out, rc);
1001                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1002                 ptlrpc_activate_import(imp);
1003
1004                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1005                           &target_start, &target_len);
1006                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1007                               "using nid %s.\n", imp->imp_obd->obd_name,
1008                               target_len, target_start,
1009                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1010         }
1011
1012         if (imp->imp_state == LUSTRE_IMP_FULL) {
1013                 cfs_waitq_signal(&imp->imp_recovery_waitq);
1014                 ptlrpc_wake_delayed(imp);
1015         }
1016
1017  out:
1018         RETURN(rc);
1019 }
1020
/*
 * No-op callback passed as the second argument of LWI_TIMEOUT_INTR() in
 * ptlrpc_disconnect_import() (presumably the on-timeout hook -- confirm
 * against the l_wait_event() definition).  Ignores its argument and
 * returns 0.
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
1025
1026 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1027 {
1028         struct ptlrpc_request *req;
1029         int rq_opc, rc = 0;
1030         ENTRY;
1031
1032         switch (imp->imp_connect_op) {
1033         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1034         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1035         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1036         default:
1037                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1038                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1039                 RETURN(-EINVAL);
1040         }
1041
1042         if (ptlrpc_import_in_recovery(imp)) {
1043                 struct l_wait_info lwi;
1044                 cfs_duration_t timeout;
1045
1046                 if (AT_OFF) {
1047                         timeout = cfs_time_seconds(obd_timeout);
1048                 } else {
1049                         int idx = import_at_get_index(imp, 
1050                                 imp->imp_client->cli_request_portal);
1051                         timeout = cfs_time_seconds(
1052                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1053                 }
1054                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout), 
1055                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1056                 rc = l_wait_event(imp->imp_recovery_waitq,
1057                                   !ptlrpc_import_in_recovery(imp), &lwi);
1058         }
1059
1060         spin_lock(&imp->imp_lock);
1061         if (imp->imp_state != LUSTRE_IMP_FULL)
1062                 GOTO(out, 0);
1063
1064         spin_unlock(&imp->imp_lock);
1065
1066         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1067         if (req) {
1068                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1069                  * it fails.  We can get through the above with a down server
1070                  * if the client doesn't know the server is gone yet. */
1071                 req->rq_no_resend = 1;
1072                 
1073 #ifndef CRAY_XT3
1074                 /* We want client umounts to happen quickly, no matter the 
1075                    server state... */
1076                 req->rq_timeout = min_t(int, req->rq_timeout,
1077                                         INITIAL_CONNECT_TIMEOUT);
1078 #else
1079                 /* ... but we always want liblustre clients to nicely 
1080                    disconnect, so only use the adaptive value. */
1081                 if (AT_OFF)
1082                         req->rq_timeout = obd_timeout / 3;
1083 #endif
1084
1085                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1086                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1087                 ptlrpc_req_set_repsize(req, 1, NULL);
1088                 rc = ptlrpc_queue_wait(req);
1089                 ptlrpc_req_finished(req);
1090         }
1091
1092         spin_lock(&imp->imp_lock);
1093 out:
1094         if (noclose) 
1095                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1096         else
1097                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1098         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1099         /* Try all connections in the future - bz 12758 */ 
1100         imp->imp_last_recon = 0;
1101         spin_unlock(&imp->imp_lock);
1102
1103         RETURN(rc);
1104 }
1105
1106 /* Sets maximal number of RPCs possible originating from other side of this
1107    import (server) to us and number of async RPC replies that we are not waiting
1108    for arriving */
1109 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1110 {
1111         LNetSetAsync(imp->imp_connection->c_peer, count);
1112 }
1113
1114
1115 /* Adaptive Timeout utils */
1116
1117 /* Bin into timeslices using AT_BINS bins.
1118    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1119    but still smoothing out a return to normalcy from a slow response.
1120    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1121 int at_add(struct adaptive_timeout *at, unsigned int val) 
1122 {
1123         unsigned int old = at->at_current;
1124         time_t now = cfs_time_current_sec();
1125         time_t binlimit = max_t(time_t, adaptive_timeout_history / AT_BINS, 1);
1126
1127         LASSERT(at);
1128 #if 0
1129         CDEBUG(D_INFO, "add %u to %p time=%lu v=%u (%u %u %u %u)\n", 
1130                val, at, now - at->at_binstart, at->at_current,
1131                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1132 #endif
1133         if (val == 0) 
1134                 /* 0's don't count, because we never want our timeout to 
1135                    drop to 0, and because 0 could mean an error */
1136                 return 0;
1137
1138         spin_lock(&at->at_lock);
1139
1140         if (unlikely(at->at_binstart == 0)) {
1141                 /* Special case to remove default from history */
1142                 at->at_current = val;
1143                 at->at_worst_ever = val;
1144                 at->at_worst_time = now;
1145                 at->at_hist[0] = val;
1146                 at->at_binstart = now;
1147         } else if (now - at->at_binstart < binlimit ) {
1148                 /* in bin 0 */
1149                 at->at_hist[0] = max(val, at->at_hist[0]);
1150                 at->at_current = max(val, at->at_current);
1151         } else {
1152                 int i, shift;
1153                 unsigned int maxv = val;
1154                 /* move bins over */
1155                 shift = (now - at->at_binstart) / binlimit;
1156                 LASSERT(shift > 0);
1157                 for(i = AT_BINS - 1; i >= 0; i--) {
1158                         if (i >= shift) {
1159                                 at->at_hist[i] = at->at_hist[i - shift];
1160                                 maxv = max(maxv, at->at_hist[i]);
1161                         } else {
1162                                 at->at_hist[i] = 0;
1163                         }
1164                 }
1165                 at->at_hist[0] = val;
1166                 at->at_current = maxv;
1167                 at->at_binstart += shift * binlimit;
1168         }
1169
1170         if (at->at_current > at->at_worst_ever) {
1171                 at->at_worst_ever = at->at_current;
1172                 at->at_worst_time = now;
1173         }
1174
1175         if (at->at_flags & AT_FLG_NOHIST)
1176                 /* Only keep last reported val; keeping the rest of the history
1177                    for proc only */
1178                 at->at_current = val;
1179
1180 #if 0
1181         if (at->at_current != old)
1182                 CDEBUG(D_ADAPTTO, "AT change: old=%u new=%u delta=%d (val=%u) "
1183                        "hist %u %u %u %u\n",
1184                        old, at->at_current, at->at_current - old, val,
1185                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1186                        at->at_hist[3]);
1187 #endif
1188         
1189         /* if we changed, report the old value */
1190         old = (at->at_current != old) ? old : 0;
1191         
1192         spin_unlock(&at->at_lock);
1193         return old;
1194 }
1195
1196 /* Find the imp_at index for a given portal; assign if space available */
1197 int import_at_get_index(struct obd_import *imp, int portal) 
1198 {
1199         struct imp_at *at = &imp->imp_at;
1200         int i;
1201
1202         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1203                 if (at->iat_portal[i] == portal) 
1204                         return i;
1205                 if (at->iat_portal[i] == 0)
1206                         /* unused */
1207                         break;
1208         }
1209
1210         /* Not found in list, add it under a lock */
1211         spin_lock(&imp->imp_lock);
1212
1213         /* Check unused under lock */
1214         for (; i < IMP_AT_MAX_PORTALS; i++) {
1215                 if (at->iat_portal[i] == portal) 
1216                         goto out;
1217                 if (at->iat_portal[i] == 0)
1218                         /* unused */
1219                         break;
1220         }
1221         
1222         /* Not enough portals? */
1223         LASSERT(i < IMP_AT_MAX_PORTALS);
1224
1225         at->iat_portal[i] = portal;
1226 out:
1227         spin_unlock(&imp->imp_lock);
1228         return i;
1229 }
1230
1231 /* Get total expected lock callback time (net + service).
1232    Since any early reply will only affect the RPC wait time, and not
1233    any local lock timer we set based on the return value here,
1234    we should be conservative. */
1235 int import_at_get_ldlm(struct obd_import *imp) 
1236 {
1237         int idx, tot;
1238         
1239         if (!imp || !imp->imp_client || AT_OFF)
1240                 return obd_timeout;
1241         
1242         idx = import_at_get_index(imp, imp->imp_client->cli_request_portal);
1243         tot = at_get(&imp->imp_at.iat_net_latency) +
1244                 at_get(&imp->imp_at.iat_service_estimate[idx]);
1245
1246         /* add an arbitrary minimum: 150% + 10 sec */
1247         tot += (tot >> 1) + 10;
1248         return tot;
1249 }
1250