Whamcloud - gitweb
b=23596 account direct i/o inflight separately from non-direct i/o
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LDLM
41
42 #ifdef __KERNEL__
43 # include <libcfs/libcfs.h>
44 #else
45 # include <liblustre.h>
46 #endif
47 #include <obd.h>
48 #include <lustre_mds.h>
49 #include <lustre_dlm.h>
50 #include <lustre_net.h>
51 #include "ldlm_internal.h"
52
53 /* @priority: if non-zero, move the selected to the list head
54  * @create: if zero, only search in existed connections
55  */
56 static int import_set_conn(struct obd_import *imp, struct obd_uuid *uuid,
57                            int priority, int create)
58 {
59         struct ptlrpc_connection *ptlrpc_conn;
60         struct obd_import_conn *imp_conn = NULL, *item;
61         int rc = 0;
62         ENTRY;
63
64         if (!create && !priority) {
65                 CDEBUG(D_HA, "Nothing to do\n");
66                 RETURN(-EINVAL);
67         }
68
69         ptlrpc_conn = ptlrpc_uuid_to_connection(uuid);
70         if (!ptlrpc_conn) {
71                 CDEBUG(D_HA, "can't find connection %s\n", uuid->uuid);
72                 RETURN (-ENOENT);
73         }
74
75         if (create) {
76                 OBD_ALLOC(imp_conn, sizeof(*imp_conn));
77                 if (!imp_conn) {
78                         GOTO(out_put, rc = -ENOMEM);
79                 }
80         }
81
82         spin_lock(&imp->imp_lock);
83         list_for_each_entry(item, &imp->imp_conn_list, oic_item) {
84                 if (obd_uuid_equals(uuid, &item->oic_uuid)) {
85                         if (priority) {
86                                 list_del(&item->oic_item);
87                                 list_add(&item->oic_item, &imp->imp_conn_list);
88                                 item->oic_last_attempt = 0;
89                         }
90                         CDEBUG(D_HA, "imp %p@%s: found existing conn %s%s\n",
91                                imp, imp->imp_obd->obd_name, uuid->uuid,
92                                (priority ? ", moved to head" : ""));
93                         spin_unlock(&imp->imp_lock);
94                         GOTO(out_free, rc = 0);
95                 }
96         }
97         /* not found */
98         if (create) {
99                 imp_conn->oic_conn = ptlrpc_conn;
100                 imp_conn->oic_uuid = *uuid;
101                 imp_conn->oic_last_attempt = 0;
102                 if (priority)
103                         list_add(&imp_conn->oic_item, &imp->imp_conn_list);
104                 else
105                         list_add_tail(&imp_conn->oic_item, &imp->imp_conn_list);
106                 CDEBUG(D_HA, "imp %p@%s: add connection %s at %s\n",
107                        imp, imp->imp_obd->obd_name, uuid->uuid,
108                        (priority ? "head" : "tail"));
109         } else {
110                 spin_unlock(&imp->imp_lock);
111                 GOTO(out_free, rc = -ENOENT);
112
113         }
114
115         spin_unlock(&imp->imp_lock);
116         RETURN(0);
117 out_free:
118         if (imp_conn)
119                 OBD_FREE(imp_conn, sizeof(*imp_conn));
120 out_put:
121         ptlrpc_connection_put(ptlrpc_conn);
122         RETURN(rc);
123 }
124
/*
 * Move the existing connection entry for @uuid to the head of @imp's
 * connection list.  No entry is created; see import_set_conn() for the
 * return values.
 */
int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid)
{
        const int priority = 1; /* move the match to the list head */
        const int create = 0;   /* search existing entries only */

        return import_set_conn(imp, uuid, priority, create);
}
129
/*
 * Make sure @imp has a connection entry for @uuid, adding one if needed.
 * A non-zero @priority places the entry at the head of the list; see
 * import_set_conn() for the return values.
 */
int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
                           int priority)
{
        const int create = 1;   /* add the entry when it is missing */

        return import_set_conn(imp, uuid, priority, create);
}
135
136 int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid)
137 {
138         struct obd_import_conn *imp_conn;
139         struct obd_export *dlmexp;
140         int rc = -ENOENT;
141         ENTRY;
142
143         spin_lock(&imp->imp_lock);
144         if (list_empty(&imp->imp_conn_list)) {
145                 LASSERT(!imp->imp_connection);
146                 GOTO(out, rc);
147         }
148
149         list_for_each_entry(imp_conn, &imp->imp_conn_list, oic_item) {
150                 if (!obd_uuid_equals(uuid, &imp_conn->oic_uuid))
151                         continue;
152                 LASSERT(imp_conn->oic_conn);
153
154                 /* is current conn? */
155                 if (imp_conn == imp->imp_conn_current) {
156                         LASSERT(imp_conn->oic_conn == imp->imp_connection);
157
158                         if (imp->imp_state != LUSTRE_IMP_CLOSED &&
159                             imp->imp_state != LUSTRE_IMP_DISCON) {
160                                 CERROR("can't remove current connection\n");
161                                 GOTO(out, rc = -EBUSY);
162                         }
163
164                         ptlrpc_connection_put(imp->imp_connection);
165                         imp->imp_connection = NULL;
166
167                         dlmexp = class_conn2export(&imp->imp_dlm_handle);
168                         if (dlmexp && dlmexp->exp_connection) {
169                                 LASSERT(dlmexp->exp_connection ==
170                                         imp_conn->oic_conn);
171                                 ptlrpc_connection_put(dlmexp->exp_connection);
172                                 dlmexp->exp_connection = NULL;
173                         }
174                 }
175
176                 list_del(&imp_conn->oic_item);
177                 ptlrpc_connection_put(imp_conn->oic_conn);
178                 OBD_FREE(imp_conn, sizeof(*imp_conn));
179                 CDEBUG(D_HA, "imp %p@%s: remove connection %s\n",
180                        imp, imp->imp_obd->obd_name, uuid->uuid);
181                 rc = 0;
182                 break;
183         }
184 out:
185         spin_unlock(&imp->imp_lock);
186         if (rc == -ENOENT)
187                 CERROR("connection %s not found\n", uuid->uuid);
188         RETURN(rc);
189 }
190
191 /* configure an RPC client OBD device
192  *
193  * lcfg parameters:
194  * 1 - client UUID
195  * 2 - server UUID
196  * 3 - inactive-on-startup
197  */
198 int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
199 {
200         struct lustre_cfg* lcfg = buf;
201         struct client_obd *cli = &obddev->u.cli;
202         struct obd_import *imp;
203         struct obd_uuid server_uuid;
204         int rq_portal, rp_portal, connect_op;
205         char *name = obddev->obd_type->typ_name;
206         int rc;
207         ENTRY;
208
209         /* In a more perfect world, we would hang a ptlrpc_client off of
210          * obd_type and just use the values from there. */
211         if (!strcmp(name, LUSTRE_OSC_NAME)) {
212                 rq_portal = OST_REQUEST_PORTAL;
213                 rp_portal = OSC_REPLY_PORTAL;
214                 connect_op = OST_CONNECT;
215         } else if (!strcmp(name, LUSTRE_MDC_NAME)) {
216                 rq_portal = MDS_REQUEST_PORTAL;
217                 rp_portal = MDC_REPLY_PORTAL;
218                 connect_op = MDS_CONNECT;
219         } else if (!strcmp(name, LUSTRE_MGC_NAME)) {
220                 rq_portal = MGS_REQUEST_PORTAL;
221                 rp_portal = MGC_REPLY_PORTAL;
222                 connect_op = MGS_CONNECT;
223         } else {
224                 CERROR("unknown client OBD type \"%s\", can't setup\n",
225                        name);
226                 RETURN(-EINVAL);
227         }
228
229         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
230                 CERROR("requires a TARGET UUID\n");
231                 RETURN(-EINVAL);
232         }
233
234         if (LUSTRE_CFG_BUFLEN(lcfg, 1) > 37) {
235                 CERROR("client UUID must be less than 38 characters\n");
236                 RETURN(-EINVAL);
237         }
238
239         if (LUSTRE_CFG_BUFLEN(lcfg, 2) < 1) {
240                 CERROR("setup requires a SERVER UUID\n");
241                 RETURN(-EINVAL);
242         }
243
244         if (LUSTRE_CFG_BUFLEN(lcfg, 2) > 37) {
245                 CERROR("target UUID must be less than 38 characters\n");
246                 RETURN(-EINVAL);
247         }
248
249         init_rwsem(&cli->cl_sem);
250         sema_init(&cli->cl_mgc_sem, 1);
251         cli->cl_conn_count = 0;
252         memcpy(server_uuid.uuid, lustre_cfg_buf(lcfg, 2),
253                min_t(unsigned int, LUSTRE_CFG_BUFLEN(lcfg, 2),
254                      sizeof(server_uuid)));
255
256         cli->cl_dirty = 0;
257         cli->cl_avail_grant = 0;
258         /* FIXME: should limit this for the sum of all cl_dirty_max */
259         cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024;
260         if (cli->cl_dirty_max >> CFS_PAGE_SHIFT > num_physpages / 8)
261                 cli->cl_dirty_max = num_physpages << (CFS_PAGE_SHIFT - 3);
262         CFS_INIT_LIST_HEAD(&cli->cl_cache_waiters);
263         CFS_INIT_LIST_HEAD(&cli->cl_loi_ready_list);
264         CFS_INIT_LIST_HEAD(&cli->cl_loi_hp_ready_list);
265         CFS_INIT_LIST_HEAD(&cli->cl_loi_write_list);
266         CFS_INIT_LIST_HEAD(&cli->cl_loi_read_list);
267         client_obd_list_lock_init(&cli->cl_loi_list_lock);
268         cli->cl_r_in_flight = 0;
269         cli->cl_w_in_flight = 0;
270         cli->cl_dio_r_in_flight = 0;
271         cli->cl_dio_w_in_flight = 0;
272         spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
273         spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
274         spin_lock_init(&cli->cl_read_page_hist.oh_lock);
275         spin_lock_init(&cli->cl_write_page_hist.oh_lock);
276         spin_lock_init(&cli->cl_read_offset_hist.oh_lock);
277         spin_lock_init(&cli->cl_write_offset_hist.oh_lock);
278         cfs_waitq_init(&cli->cl_destroy_waitq);
279         atomic_set(&cli->cl_destroy_in_flight, 0);
280 #ifdef ENABLE_CHECKSUM
281         /* Turn on checksumming by default. */
282         cli->cl_checksum = 1;
283         /*
284          * The supported checksum types will be worked out at connect time
285          * Set cl_chksum* to CRC32 for now to avoid returning screwed info
286          * through procfs.
287          */
288         cli->cl_cksum_type = cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
289 #endif
290         atomic_set(&cli->cl_resends, OSC_DEFAULT_RESENDS);
291
292         /* This value may be changed at connect time in
293            ptlrpc_connect_interpret. */
294         cli->cl_max_pages_per_rpc = min((int)PTLRPC_MAX_BRW_PAGES,
295                                         (int)(1024 * 1024 >> CFS_PAGE_SHIFT));
296
297         if (!strcmp(name, LUSTRE_MDC_NAME)) {
298                 cli->cl_max_rpcs_in_flight = MDC_MAX_RIF_DEFAULT;
299         } else if (num_physpages >> (20 - CFS_PAGE_SHIFT) <= 128 /* MB */) {
300                 cli->cl_max_rpcs_in_flight = 2;
301         } else if (num_physpages >> (20 - CFS_PAGE_SHIFT) <= 256 /* MB */) {
302                 cli->cl_max_rpcs_in_flight = 3;
303         } else if (num_physpages >> (20 - CFS_PAGE_SHIFT) <= 512 /* MB */) {
304                 cli->cl_max_rpcs_in_flight = 4;
305         } else {
306                 cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
307         }
308         rc = ldlm_get_ref();
309         if (rc) {
310                 CERROR("ldlm_get_ref failed: %d\n", rc);
311                 GOTO(err, rc);
312         }
313
314         ptlrpc_init_client(rq_portal, rp_portal, name,
315                            &obddev->obd_ldlm_client);
316
317         imp = class_new_import(obddev);
318         if (imp == NULL)
319                 GOTO(err_ldlm, rc = -ENOENT);
320         imp->imp_client = &obddev->obd_ldlm_client;
321         imp->imp_connect_op = connect_op;
322         imp->imp_initial_recov = 1;
323         imp->imp_initial_recov_bk = 0;
324         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
325         memcpy(cli->cl_target_uuid.uuid, lustre_cfg_buf(lcfg, 1),
326                LUSTRE_CFG_BUFLEN(lcfg, 1));
327         class_import_put(imp);
328
329         rc = client_import_add_conn(imp, &server_uuid, 1);
330         if (rc) {
331                 CERROR("can't add initial connection\n");
332                 GOTO(err_import, rc);
333         }
334
335         cli->cl_import = imp;
336         /* cli->cl_max_mds_{easize,cookiesize} updated by mdc_init_ea_size() */
337         cli->cl_max_mds_easize = sizeof(struct lov_mds_md_v3);
338         cli->cl_max_mds_cookiesize = sizeof(struct llog_cookie);
339
340         if (LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
341                 if (!strcmp(lustre_cfg_string(lcfg, 3), "inactive")) {
342                         CDEBUG(D_HA, "marking %s %s->%s as inactive\n",
343                                name, obddev->obd_name,
344                                cli->cl_target_uuid.uuid);
345                         spin_lock(&imp->imp_lock);
346                         imp->imp_deactive = 1;
347                         spin_unlock(&imp->imp_lock);
348                 }
349         }
350
351         obddev->obd_namespace = ldlm_namespace_new(obddev, obddev->obd_name,
352                                                    LDLM_NAMESPACE_CLIENT,
353                                                    LDLM_NAMESPACE_GREEDY);
354         if (obddev->obd_namespace == NULL) {
355                 CERROR("Unable to create client namespace - %s\n",
356                        obddev->obd_name);
357                 GOTO(err_import, rc = -ENOMEM);
358         }
359
360         cli->cl_qchk_stat = CL_NOT_QUOTACHECKED;
361
362         RETURN(rc);
363
364 err_import:
365         class_destroy_import(imp);
366 err_ldlm:
367         ldlm_put_ref();
368 err:
369         RETURN(rc);
370
371 }
372
373 int client_obd_cleanup(struct obd_device *obddev)
374 {
375         ENTRY;
376
377         ldlm_namespace_free_post(obddev->obd_namespace);
378         obddev->obd_namespace = NULL;
379
380         ldlm_put_ref();
381         RETURN(0);
382 }
383
384 /* ->o_connect() method for client side (OSC and MDC and MGC) */
385 int client_connect_import(struct lustre_handle *dlm_handle,
386                           struct obd_device *obd, struct obd_uuid *cluuid,
387                           struct obd_connect_data *data, void *localdata)
388 {
389         struct client_obd *cli = &obd->u.cli;
390         struct obd_import *imp = cli->cl_import;
391         struct obd_export **exp = localdata;
392         struct obd_connect_data *ocd;
393         int rc;
394         ENTRY;
395
396         down_write(&cli->cl_sem);
397         CDEBUG(D_INFO, "connect %s - %d\n", obd->obd_name,
398                cli->cl_conn_count);
399
400         if (cli->cl_conn_count > 0)
401                 GOTO(out_sem, rc = -EALREADY);
402
403         rc = class_connect(dlm_handle, obd, cluuid);
404         if (rc)
405                 GOTO(out_sem, rc);
406
407         cli->cl_conn_count++;
408         *exp = class_conn2export(dlm_handle);
409
410         LASSERT(obd->obd_namespace);
411
412         imp->imp_dlm_handle = *dlm_handle;
413         rc = ptlrpc_init_import(imp);
414         if (rc != 0)
415                 GOTO(out_ldlm, rc);
416
417         ocd = &imp->imp_connect_data;
418         if (data) {
419                 *ocd = *data;
420                 imp->imp_connect_flags_orig = data->ocd_connect_flags;
421         }
422
423         rc = ptlrpc_connect_import(imp, NULL);
424         if (rc != 0) {
425                 LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
426                 GOTO(out_ldlm, rc);
427         }
428         LASSERT((*exp)->exp_connection);
429
430         if (data) {
431                 LASSERT((ocd->ocd_connect_flags & data->ocd_connect_flags) ==
432                         ocd->ocd_connect_flags);
433                 data->ocd_connect_flags = ocd->ocd_connect_flags;
434         }
435
436         ptlrpc_pinger_add_import(imp);
437         EXIT;
438
439         if (rc) {
440 out_ldlm:
441                 cli->cl_conn_count--;
442                 class_disconnect(*exp);
443                 *exp = NULL;
444         }
445 out_sem:
446         up_write(&cli->cl_sem);
447         return rc;
448 }
449
450 int client_disconnect_export(struct obd_export *exp)
451 {
452         struct obd_device *obd = class_exp2obd(exp);
453         struct client_obd *cli;
454         struct obd_import *imp;
455         int rc = 0, err;
456         ENTRY;
457
458         if (!obd) {
459                 CERROR("invalid export for disconnect: exp %p cookie "LPX64"\n",
460                        exp, exp ? exp->exp_handle.h_cookie : -1);
461                 RETURN(-EINVAL);
462         }
463
464         cli = &obd->u.cli;
465         imp = cli->cl_import;
466
467         down_write(&cli->cl_sem);
468         CDEBUG(D_INFO, "disconnect %s - %d\n", obd->obd_name,
469                cli->cl_conn_count);
470
471         if (!cli->cl_conn_count) {
472                 CERROR("disconnecting disconnected device (%s)\n",
473                        obd->obd_name);
474                 GOTO(out_disconnect, rc = -EINVAL);
475         }
476
477         cli->cl_conn_count--;
478         if (cli->cl_conn_count)
479                 GOTO(out_disconnect, rc = 0);
480
481         /* Mark import deactivated now, so we don't try to reconnect if any
482          * of the cleanup RPCs fails (e.g. ldlm cancel, etc).  We don't
483          * fully deactivate the import, or that would drop all requests. */
484         spin_lock(&imp->imp_lock);
485         imp->imp_deactive = 1;
486         spin_unlock(&imp->imp_lock);
487
488         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
489          * delete it regardless.  (It's safe to delete an import that was
490          * never added.) */
491         (void)ptlrpc_pinger_del_import(imp);
492
493         if (obd->obd_namespace != NULL) {
494                 /* obd_force == local only */
495                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
496                                        obd->obd_force ? LDLM_FL_LOCAL_ONLY:0,
497                                        NULL);
498                 ldlm_namespace_free_prior(obd->obd_namespace, imp,
499                                           obd->obd_force);
500         }
501
502         rc = ptlrpc_disconnect_import(imp, 0);
503
504         ptlrpc_invalidate_import(imp);
505
506         if (imp->imp_rq_pool) {
507                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
508                 imp->imp_rq_pool = NULL;
509         }
510         class_destroy_import(imp);
511         cli->cl_import = NULL;
512
513         EXIT;
514
515  out_disconnect:
516         /* use server style - class_disconnect should be always called for
517          * o_disconnect */
518         err = class_disconnect(exp);
519         if (!rc && err)
520                 rc = err;
521         up_write(&cli->cl_sem);
522
523         RETURN(rc);
524 }
525
526 int server_disconnect_export(struct obd_export *exp)
527 {
528         int rc;
529         ENTRY;
530
531         /* Disconnect early so that clients can't keep using export */
532         rc = class_disconnect(exp);
533
534         /* close import for avoid sending any requests */
535         if (exp->exp_imp_reverse)
536                 ptlrpc_cleanup_imp(exp->exp_imp_reverse);
537
538         if (exp->exp_obd->obd_namespace != NULL)
539                 ldlm_cancel_locks_for_export(exp);
540
541         /* complete all outstanding replies */
542         spin_lock(&exp->exp_lock);
543         while (!list_empty(&exp->exp_outstanding_replies)) {
544                 struct ptlrpc_reply_state *rs =
545                         list_entry(exp->exp_outstanding_replies.next,
546                                    struct ptlrpc_reply_state, rs_exp_list);
547                 struct ptlrpc_service *svc = rs->rs_service;
548
549                 spin_lock(&svc->srv_lock);
550                 list_del_init(&rs->rs_exp_list);
551                 ptlrpc_schedule_difficult_reply(rs);
552                 spin_unlock(&svc->srv_lock);
553         }
554         spin_unlock(&exp->exp_lock);
555
556         RETURN(rc);
557 }
558
559 /* --------------------------------------------------------------------------
560  * from old lib/target.c
561  * -------------------------------------------------------------------------- */
562
563 static int target_handle_reconnect(struct lustre_handle *conn,
564                                    struct obd_export *exp,
565                                    struct obd_uuid *cluuid)
566 {
567         ENTRY;
568         if (exp->exp_connection && exp->exp_imp_reverse) {
569                 struct lustre_handle *hdl;
570                 hdl = &exp->exp_imp_reverse->imp_remote_handle;
571                 /* Might be a re-connect after a partition. */
572                 if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
573                         CWARN("%s: %s reconnecting\n", exp->exp_obd->obd_name,
574                               cluuid->uuid);
575                         conn->cookie = exp->exp_handle.h_cookie;
576                         /* target_handle_connect() treats EALREADY and
577                          * -EALREADY differently.  EALREADY means we are
578                          * doing a valid reconnect from the same client. */
579                         RETURN(EALREADY);
580                 } else {
581                         CERROR("%s reconnecting from %s, "
582                                "handle mismatch (ours "LPX64", theirs "
583                                LPX64")\n", cluuid->uuid,
584                                exp->exp_connection->c_remote_uuid.uuid,
585                                hdl->cookie, conn->cookie);
586                         memset(conn, 0, sizeof *conn);
587                         /* target_handle_connect() treats EALREADY and
588                          * -EALREADY differently.  -EALREADY is an error
589                          * (same UUID, different handle). */
590                         RETURN(-EALREADY);
591                 }
592         }
593
594         conn->cookie = exp->exp_handle.h_cookie;
595         CDEBUG(D_HA, "connect export for UUID '%s' at %p, cookie "LPX64"\n",
596                cluuid->uuid, exp, conn->cookie);
597         RETURN(0);
598 }
599
600 void target_client_add_cb(struct obd_device *obd, __u64 transno, void *cb_data,
601                           int error)
602 {
603         struct obd_export *exp = cb_data;
604
605         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
606                obd->obd_name, exp->exp_client_uuid.uuid);
607
608         spin_lock(&exp->exp_lock);
609         exp->exp_need_sync = 0;
610         spin_unlock(&exp->exp_lock);
611 }
612 EXPORT_SYMBOL(target_client_add_cb);
613
614 static void
615 target_start_and_reset_recovery_timer(struct obd_device *obd,
616                                       svc_handler_t handler,
617                                       struct ptlrpc_request *req,
618                                       int new_client);
619 void target_stop_recovery(void *, int);
620 static void reset_recovery_timer(struct obd_device *obd, int duration,
621                                  int extend);
622 int target_recovery_check_and_stop(struct obd_device *obd)
623 {
624         int abort_recovery = 0;
625
626         if (obd->obd_stopping || !obd->obd_recovering)
627                 return 1;
628
629         spin_lock_bh(&obd->obd_processing_task_lock);
630         abort_recovery = obd->obd_abort_recovery;
631         obd->obd_abort_recovery = 0;
632         spin_unlock_bh(&obd->obd_processing_task_lock);
633         if (!abort_recovery)
634                 return 0;
635         /** check if fs version-capable */
636         if (target_fs_version_capable(obd)) {
637                 class_handle_stale_exports(obd);
638         } else {
639                 CWARN("Versions are not supported by ldiskfs, VBR is OFF\n");
640                 class_disconnect_stale_exports(obd, exp_flags_from_obd(obd));
641         }
642         /* VBR: no clients are remained to replay, stop recovery */
643         spin_lock_bh(&obd->obd_processing_task_lock);
644         if (obd->obd_recovering && obd->obd_recoverable_clients == 0) {
645                 spin_unlock_bh(&obd->obd_processing_task_lock);
646                 target_stop_recovery(obd, 0);
647                 return 1;
648         }
649         /* always check versions now */
650         obd->obd_version_recov = 1;
651         cfs_waitq_signal(&obd->obd_next_transno_waitq);
652         spin_unlock_bh(&obd->obd_processing_task_lock);
653         /* reset timer, recovery will proceed with versions now */
654         reset_recovery_timer(obd, OBD_RECOVERY_TIME_SOFT, 1);
655         return 0;
656 }
657 EXPORT_SYMBOL(target_recovery_check_and_stop);
658
659 int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
660 {
661         struct obd_device *target, *targref = NULL;
662         struct obd_export *export = NULL;
663         struct obd_import *revimp;
664         struct lustre_handle conn;
665         struct obd_uuid tgtuuid;
666         struct obd_uuid cluuid;
667         struct obd_uuid remote_uuid;
668         char *str, *tmp;
669         int rc = 0;
670         struct obd_connect_data *data;
671         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*data) };
672         lnet_nid_t *client_nid = NULL;
673         int mds_conn = 0;
674         ENTRY;
675
676         OBD_RACE(OBD_FAIL_TGT_CONN_RACE);
677
678         lustre_set_req_swabbed(req, REQ_REC_OFF);
679         str = lustre_msg_string(req->rq_reqmsg, REQ_REC_OFF, sizeof(tgtuuid)-1);
680         if (str == NULL) {
681                 DEBUG_REQ(D_ERROR, req, "bad target UUID for connect");
682                 GOTO(out, rc = -EINVAL);
683         }
684
685         obd_str2uuid (&tgtuuid, str);
686         target = class_uuid2obd(&tgtuuid);
687         /* COMPAT_146 */
688         /* old (pre 1.6) lustre_process_log tries to connect to mdsname
689            (eg. mdsA) instead of uuid. */
690         if (!target) {
691                 snprintf((char *)tgtuuid.uuid, sizeof(tgtuuid), "%s_UUID", str);
692                 target = class_uuid2obd(&tgtuuid);
693         }
694         if (!target)
695                 target = class_name2obd(str);
696         /* end COMPAT_146 */
697
698         if (!target || target->obd_stopping || !target->obd_set_up) {
699                 LCONSOLE_ERROR_MSG(0x137, "UUID '%s' is not available "
700                                    " for connect (%s)\n", str,
701                                    !target ? "no target" :
702                                    (target->obd_stopping ? "stopping" :
703                                    "not set up"));
704                 GOTO(out, rc = -ENODEV);
705         }
706
707         if (target->obd_no_conn) {
708                 LCONSOLE_WARN("%s: temporarily refusing client connection "
709                               "from %s\n", target->obd_name,
710                               libcfs_nid2str(req->rq_peer.nid));
711                 GOTO(out, rc = -EAGAIN);
712         }
713
714         /* Make sure the target isn't cleaned up while we're here. Yes,
715            there's still a race between the above check and our incref here.
716            Really, class_uuid2obd should take the ref. */
717         targref = class_incref(target);
718
719         lustre_set_req_swabbed(req, REQ_REC_OFF + 1);
720         str = lustre_msg_string(req->rq_reqmsg, REQ_REC_OFF + 1,
721                                 sizeof(cluuid) - 1);
722         if (str == NULL) {
723                 DEBUG_REQ(D_ERROR, req, "bad client UUID for connect");
724                 GOTO(out, rc = -EINVAL);
725         }
726
727         obd_str2uuid (&cluuid, str);
728
729         /* XXX extract a nettype and format accordingly */
730         switch (sizeof(lnet_nid_t)) {
731                 /* NB the casts only avoid compiler warnings */
732         case 8:
733                 snprintf(remote_uuid.uuid, sizeof remote_uuid,
734                          "NET_"LPX64"_UUID", (__u64)req->rq_peer.nid);
735                 break;
736         case 4:
737                 snprintf(remote_uuid.uuid, sizeof remote_uuid,
738                          "NET_%x_UUID", (__u32)req->rq_peer.nid);
739                 break;
740         default:
741                 LBUG();
742         }
743
744         target_recovery_check_and_stop(target);
745
746         tmp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn);
747         if (tmp == NULL)
748                 GOTO(out, rc = -EPROTO);
749
750         memcpy(&conn, tmp, sizeof conn);
751
752         data = lustre_swab_reqbuf(req, REQ_REC_OFF + 3, sizeof(*data),
753                                   lustre_swab_connect);
754         rc = lustre_pack_reply(req, 2, size, NULL);
755         if (rc)
756                 GOTO(out, rc);
757
758         if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT) {
759                 if (!data) {
760                         DEBUG_REQ(D_WARNING, req, "Refusing old (unversioned) "
761                                   "libclient connection attempt");
762                         GOTO(out, rc = -EPROTO);
763                 } else if (data->ocd_version < LUSTRE_VERSION_CODE -
764                                                LUSTRE_VERSION_ALLOWED_OFFSET ||
765                            data->ocd_version > LUSTRE_VERSION_CODE +
766                                                LUSTRE_VERSION_ALLOWED_OFFSET) {
767                         DEBUG_REQ(D_WARNING, req, "Refusing %s (%d.%d.%d.%d) "
768                                   "libclient connection attempt",
769                                   data->ocd_version < LUSTRE_VERSION_CODE ?
770                                   "old" : "new",
771                                   OBD_OCD_VERSION_MAJOR(data->ocd_version),
772                                   OBD_OCD_VERSION_MINOR(data->ocd_version),
773                                   OBD_OCD_VERSION_PATCH(data->ocd_version),
774                                   OBD_OCD_VERSION_FIX(data->ocd_version));
775                         data = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
776                                               offsetof(typeof(*data),
777                                                        ocd_version) +
778                                               sizeof(data->ocd_version));
779                         if (data) {
780                                 data->ocd_connect_flags = OBD_CONNECT_VERSION;
781                                 data->ocd_version = LUSTRE_VERSION_CODE;
782                         }
783                         GOTO(out, rc = -EPROTO);
784                 }
785         }
786
787         if ((lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_INITIAL) &&
788             (data->ocd_connect_flags & OBD_CONNECT_MDS))
789                 mds_conn = 1;
790
791         /* lctl gets a backstage, all-access pass. */
792         if (obd_uuid_equals(&cluuid, &target->obd_uuid))
793                 goto dont_check_exports;
794
795         export = lustre_hash_lookup(target->obd_uuid_hash, &cluuid);
796         if (!export)
797                 goto no_export;
798
799         /* we've found an export in the hash */
800         if (export->exp_connecting) {
801                 /* bug 9635, et. al. */
802                 CWARN("%s: exp %p already connecting\n",
803                       export->exp_obd->obd_name, export);
804                 class_export_put(export);
805                 export = NULL;
806                 rc = -EALREADY;
807         } else if (mds_conn && export->exp_connection) {
808                 if (req->rq_peer.nid != export->exp_connection->c_peer.nid)
809                         /* mds reconnected after failover */
810                         CWARN("%s: received MDS connection from NID %s,"
811                               " removing former export from NID %s\n",
812                             target->obd_name, libcfs_nid2str(req->rq_peer.nid),
813                             libcfs_nid2str(export->exp_connection->c_peer.nid));
814                 else
815                         /* new mds connection from the same nid */
816                         CWARN("%s: received new MDS connection from NID %s,"
817                               " removing former export from same NID\n",
818                             target->obd_name, libcfs_nid2str(req->rq_peer.nid));
819                 class_fail_export(export);
820                 class_export_put(export);
821                 export = NULL;
822                 rc = 0;
823         } else if (export->exp_connection &&
824                    req->rq_peer.nid != export->exp_connection->c_peer.nid &&
825                    (lustre_msg_get_op_flags(req->rq_reqmsg) &
826                     MSG_CONNECT_INITIAL)) {
827                 CWARN("%s: cookie %s seen on new NID %s when "
828                       "existing NID %s is already connected\n",
829                       target->obd_name, cluuid.uuid,
830                       libcfs_nid2str(req->rq_peer.nid),
831                       libcfs_nid2str(export->exp_connection->c_peer.nid));
832                 rc = -EALREADY;
833                 class_export_put(export);
834                 export = NULL;
835         } else if (export->exp_failed) { /* bug 11327 */
836                 CDEBUG(D_HA, "%s: exp %p evict in progress - new cookie needed "
837                       "for connect\n", export->exp_obd->obd_name, export);
838                 class_export_put(export);
839                 export = NULL;
840                 rc = -ENODEV;
841         } else if (export->exp_delayed &&
842                    !(data && data->ocd_connect_flags & OBD_CONNECT_VBR)) {
843                 class_fail_export(export);
844                 class_export_put(export);
845                 export = NULL;
846                 GOTO(out, rc = -ENODEV);
847         } else {
848                 spin_lock(&export->exp_lock);
849                 export->exp_connecting = 1;
850                 spin_unlock(&export->exp_lock);
851                 class_export_put(export);
852                 LASSERT(export->exp_obd == target);
853
854                 rc = target_handle_reconnect(&conn, export, &cluuid);
855         }
856
857         /* If we found an export, we already unlocked. */
858         if (!export) {
859 no_export:
860                 OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_DELAY_CONNECT, 2 * obd_timeout);
861         } else if (req->rq_export == NULL &&
862                    atomic_read(&export->exp_rpc_count) > 0) {
863                 CWARN("%s: refuse connection from %s/%s to 0x%p; still busy "
864                       "with %d references\n", target->obd_name, cluuid.uuid,
865                       libcfs_nid2str(req->rq_peer.nid),
866                       export, atomic_read(&export->exp_refcount));
867                 GOTO(out, rc = -EBUSY);
868         } else if (req->rq_export != NULL &&
869                    atomic_read(&export->exp_rpc_count) > 1) {
870                 /* the current connect rpc has increased exp_rpc_count */
871                 CWARN("%s: refuse reconnection from %s@%s to 0x%p; still busy "
872                       "with %d active RPCs\n", target->obd_name, cluuid.uuid,
873                       libcfs_nid2str(req->rq_peer.nid),
874                       export, atomic_read(&export->exp_rpc_count) - 1);
875                 spin_lock(&export->exp_lock);
876                 if (req->rq_export->exp_conn_cnt <
877                     lustre_msg_get_conn_cnt(req->rq_reqmsg))
878                         /* try to abort active requests */
879                         req->rq_export->exp_abort_active_req = 1;
880                 spin_unlock(&export->exp_lock);
881                 GOTO(out, rc = -EBUSY);
882         } else if (lustre_msg_get_conn_cnt(req->rq_reqmsg) == 1) {
883                 CERROR("%s: NID %s (%s) reconnected with 1 conn_cnt; "
884                        "cookies not random?\n", target->obd_name,
885                        libcfs_nid2str(req->rq_peer.nid), cluuid.uuid);
886                 GOTO(out, rc = -EALREADY);
887         } else if (export->exp_delayed && target->obd_recovering) {
888                 /* VBR: don't allow delayed connection during recovery */
889                 CWARN("%s: NID %s (%s) export was already marked as delayed "
890                       "and will wait for end of recovery\n", target->obd_name,
891                        libcfs_nid2str(req->rq_peer.nid), cluuid.uuid);
892                 GOTO(out, rc = -EBUSY);
893         } else {
894                 OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_DELAY_RECONNECT, 2 * obd_timeout);
895         }
896
897         if (rc < 0)
898                 GOTO(out, rc);
899
900         /* Tell the client if we're in recovery. */
901         if (target->obd_recovering) {
902                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
903                 /* If this is the first time a client connects,
904                    reset the recovery timer */
905                 if (rc == 0)
906                         target_start_and_reset_recovery_timer(target, handler,
907                                                               req, !export);
908         }
909
910         /* We want to handle EALREADY but *not* -EALREADY from
911          * target_handle_reconnect(), return reconnection state in a flag */
912         if (rc == EALREADY) {
913                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
914                 rc = 0;
915         } else {
916                 LASSERT(rc == 0);
917         }
918
919         /* Tell the client if we support replayable requests */
920         if (target->obd_replayable)
921                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
922         client_nid = &req->rq_peer.nid;
923
924         /* VBR: for delayed connections we start recovery */
925         if (export && export->exp_delayed && !export->exp_in_recovery) {
926                 LASSERT(!target->obd_recovering);
927                 LASSERT(data && data->ocd_connect_flags & OBD_CONNECT_VBR);
928                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_DELAYED |
929                                         MSG_CONNECT_RECOVERING);
930                 spin_lock_bh(&target->obd_processing_task_lock);
931                 target->obd_version_recov = 1;
932                 spin_unlock_bh(&target->obd_processing_task_lock);
933                 target_start_and_reset_recovery_timer(target, handler, req, 1);
934         }
935
936         if (export == NULL) {
937                 if (target->obd_recovering) {
938                         CERROR("%s: denying connection for new client %s (%s): "
939                                "%d clients in recovery for %lds\n",
940                                target->obd_name,
941                                libcfs_nid2str(req->rq_peer.nid), cluuid.uuid,
942                                target->obd_recoverable_clients,
943                                cfs_duration_sec(cfs_time_sub(cfs_timer_deadline(&target->obd_recovery_timer),
944                                                              cfs_time_current())));
945                         rc = -EBUSY;
946                 } else {
947  dont_check_exports:
948                         rc = obd_connect(&conn, target, &cluuid, data,
949                                          client_nid);
950                 }
951         } else {
952                 rc = obd_reconnect(export, target, &cluuid, data, client_nid);
953         }
954
955         if (rc)
956                 GOTO(out, rc);
957
958         /* Return only the parts of obd_connect_data that we understand, so the
959          * client knows that we don't understand the rest. */
960         if (data)
961                 memcpy(lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
962                                       sizeof(*data)),
963                        data, sizeof(*data));
964
965         /* If all else goes well, this is our RPC return code. */
966         req->rq_status = 0;
967
968         lustre_msg_set_handle(req->rq_repmsg, &conn);
969
970         /* ownership of this export ref transfers to the request AFTER we
971          * drop any previous reference the request had, but we don't want
972          * that to go to zero before we get our new export reference. */
973         export = class_conn2export(&conn);
974         if (!export) {
975                 DEBUG_REQ(D_ERROR, req, "Missing export!");
976                 GOTO(out, rc = -ENODEV);
977         }
978
979         /* If the client and the server are the same node, we will already
980          * have an export that really points to the client's DLM export,
981          * because we have a shared handles table.
982          *
983          * XXX this will go away when shaver stops sending the "connect" handle
984          * in the real "remote handle" field of the request --phik 24 Apr 2003
985          */
986         if (req->rq_export != NULL)
987                 class_export_put(req->rq_export);
988
989         req->rq_export = export;
990
991         spin_lock(&export->exp_lock);
992         if (export->exp_conn_cnt >= lustre_msg_get_conn_cnt(req->rq_reqmsg)) {
993                 CERROR("%s: %s already connected at higher conn_cnt: %d > %d\n",
994                        cluuid.uuid, libcfs_nid2str(req->rq_peer.nid),
995                        export->exp_conn_cnt,
996                        lustre_msg_get_conn_cnt(req->rq_reqmsg));
997
998                 spin_unlock(&export->exp_lock);
999                 GOTO(out, rc = -EALREADY);
1000         }
1001         LASSERT(lustre_msg_get_conn_cnt(req->rq_reqmsg) > 0);
1002         export->exp_conn_cnt = lustre_msg_get_conn_cnt(req->rq_reqmsg);
1003         export->exp_abort_active_req = 0;
1004
1005         /* request from liblustre?  Don't evict it for not pinging. */
1006         if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT) {
1007                 export->exp_libclient = 1;
1008                 spin_unlock(&export->exp_lock);
1009
1010                 spin_lock(&target->obd_dev_lock);
1011                 list_del_init(&export->exp_obd_chain_timed);
1012                 spin_unlock(&target->obd_dev_lock);
1013         } else {
1014                 spin_unlock(&export->exp_lock);
1015         }
1016
1017         if (export->exp_connection != NULL) {
1018                 /* Check to see if connection came from another NID */
1019                 if ((export->exp_connection->c_peer.nid != req->rq_peer.nid) &&
1020                     !hlist_unhashed(&export->exp_nid_hash))
1021                         lustre_hash_del(export->exp_obd->obd_nid_hash,
1022                                         &export->exp_connection->c_peer.nid,
1023                                         &export->exp_nid_hash);
1024
1025                 ptlrpc_connection_put(export->exp_connection);
1026         }
1027
1028         export->exp_connection = ptlrpc_connection_get(req->rq_peer,
1029                                                        req->rq_self,
1030                                                        &remote_uuid);
1031
1032         if (hlist_unhashed(&export->exp_nid_hash)) {
1033                 lustre_hash_add(export->exp_obd->obd_nid_hash,
1034                                 &export->exp_connection->c_peer.nid,
1035                                 &export->exp_nid_hash);
1036         }
1037
1038         if (lustre_msg_get_op_flags(req->rq_repmsg) & MSG_CONNECT_RECONNECT) {
1039                 revimp = class_import_get(export->exp_imp_reverse);
1040                 ptlrpc_connection_put(revimp->imp_connection);
1041                 revimp->imp_connection = NULL;
1042                 GOTO(set_flags, rc = 0);
1043         }
1044
1045         if (target->obd_recovering && !export->exp_in_recovery) {
1046                 spin_lock(&export->exp_lock);
1047                 export->exp_in_recovery = 1;
1048                 spin_unlock(&export->exp_lock);
1049                 target->obd_connected_clients++;
1050         }
1051         memcpy(&conn,
1052                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn),
1053                sizeof conn);
1054
1055         if (export->exp_imp_reverse != NULL)
1056                 class_destroy_import(export->exp_imp_reverse);
1057         revimp = export->exp_imp_reverse = class_new_import(target);
1058         revimp->imp_client = &export->exp_obd->obd_ldlm_client;
1059         revimp->imp_remote_handle = conn;
1060         revimp->imp_dlm_fake = 1;
1061         revimp->imp_state = LUSTRE_IMP_FULL;
1062
1063 set_flags:
1064         revimp->imp_connection = ptlrpc_connection_addref(export->exp_connection);
1065         if (req->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
1066             lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_NEXT_VER) {
1067                 revimp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1068                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_NEXT_VER);
1069         } else {
1070                 /* unknown versions will be caught in
1071                  * ptlrpc_handle_server_req_in->lustre_unpack_msg() */
1072                 revimp->imp_msg_magic = req->rq_reqmsg->lm_magic;
1073         }
1074
1075         if (revimp->imp_msg_magic != LUSTRE_MSG_MAGIC_V1) {
1076                 if (export->exp_connect_flags & OBD_CONNECT_AT)
1077                         revimp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1078                 else
1079                         revimp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1080         }
1081
1082         class_import_put(revimp);
1083 out:
1084         if (export) {
1085                 spin_lock(&export->exp_lock);
1086                 export->exp_connecting = 0;
1087                 spin_unlock(&export->exp_lock);
1088         }
1089         if (targref)
1090                 class_decref(targref);
1091         if (rc)
1092                 req->rq_status = rc;
1093         RETURN(rc);
1094 }
1095
1096 int target_handle_disconnect(struct ptlrpc_request *req)
1097 {
1098         int rc;
1099         ENTRY;
1100
1101         rc = lustre_pack_reply(req, 1, NULL, NULL);
1102         if (rc)
1103                 RETURN(rc);
1104
1105         /* keep the rq_export around so we can send the reply */
1106         req->rq_status = obd_disconnect(class_export_get(req->rq_export));
1107         RETURN(0);
1108 }
1109
1110 void target_destroy_export(struct obd_export *exp)
1111 {
1112         /* exports created from last_rcvd data, and "fake"
1113            exports created by lctl don't have an import */
1114         if (exp->exp_imp_reverse != NULL)
1115                 class_destroy_import(exp->exp_imp_reverse);
1116
1117         /* We cancel locks at disconnect time, but this will catch any locks
1118          * granted in a race with recovery-induced disconnect. */
1119         if (exp->exp_obd->obd_namespace != NULL)
1120                 ldlm_cancel_locks_for_export(exp);
1121 }
1122
1123 /*
1124  * Recovery functions
1125  */
1126
1127 static int target_exp_enqueue_req_replay(struct ptlrpc_request *req)
1128 {
1129         __u64                  transno = lustre_msg_get_transno(req->rq_reqmsg);
1130         struct obd_export     *exp = req->rq_export;
1131         struct ptlrpc_request *reqiter;
1132         int                    dup = 0;
1133
1134         LASSERT(exp);
1135
1136         spin_lock(&exp->exp_lock);
1137         list_for_each_entry(reqiter, &exp->exp_req_replay_queue,
1138                             rq_replay_list) {
1139                 if (lustre_msg_get_transno(reqiter->rq_reqmsg) == transno) {
1140                         dup = 1;
1141                         break;
1142                 }
1143         }
1144
1145         if (dup) {
1146                 /* we expect it with RESENT and REPLAY flags */
1147                 if ((lustre_msg_get_flags(req->rq_reqmsg) &
1148                      (MSG_RESENT | MSG_REPLAY)) != (MSG_RESENT | MSG_REPLAY))
1149                         CERROR("invalid flags %x of resent replay\n",
1150                                lustre_msg_get_flags(req->rq_reqmsg));
1151         } else {
1152                 list_add_tail(&req->rq_replay_list, &exp->exp_req_replay_queue);
1153         }
1154
1155         spin_unlock(&exp->exp_lock);
1156         return dup;
1157 }
1158
1159 static void target_exp_dequeue_req_replay(struct ptlrpc_request *req)
1160 {
1161         LASSERT(!list_empty(&req->rq_replay_list));
1162         LASSERT(req->rq_export);
1163
1164         spin_lock(&req->rq_export->exp_lock);
1165         list_del_init(&req->rq_replay_list);
1166         spin_unlock(&req->rq_export->exp_lock);
1167 }
1168
1169 static void target_request_copy_get(struct ptlrpc_request *req)
1170 {
1171         /* mark that request is in recovery queue, so request handler will not
1172          * drop rpc count in export, bug 19870*/
1173         LASSERT(!req->rq_copy_queued);
1174         spin_lock(&req->rq_lock);
1175         req->rq_copy_queued = 1;
1176         spin_unlock(&req->rq_lock);
1177         /* increase refcount to keep request in queue */
1178         atomic_inc(&req->rq_refcount);
1179         /* release service thread while request is queued
1180          * we are moving the request from active processing
1181          * to waiting on the replay queue */
1182         ptlrpc_server_active_request_dec(req);
1183 }
1184
1185 static void target_request_copy_put(struct ptlrpc_request *req)
1186 {
1187         LASSERTF(list_empty(&req->rq_replay_list), "next: %p, prev: %p\n",
1188                  req->rq_replay_list.next, req->rq_replay_list.prev);
1189         /* class_export_rpc_get was done before handling request,
1190          * drop it early to allow new requests, see bug 19870.
1191          */
1192         LASSERT(req->rq_copy_queued);
1193         class_export_rpc_put(req->rq_export);
1194         /* ptlrpc_server_drop_request() assumes the request is active */
1195         ptlrpc_server_active_request_inc(req);
1196         ptlrpc_server_drop_request(req);
1197 }
1198
1199 static void target_send_delayed_replies(struct obd_device *obd)
1200 {
1201         int max_clients = obd->obd_max_recoverable_clients;
1202         struct ptlrpc_request *req, *tmp;
1203         time_t elapsed_time = max_t(time_t, 1, cfs_time_current_sec() -
1204                                     obd->obd_recovery_start);
1205
1206         LCONSOLE_INFO("%s: Recovery period over after %d:%.02d, of %d clients "
1207                       "%d recovered and %d %s evicted.\n", obd->obd_name,
1208                       (int)elapsed_time/60, (int)elapsed_time%60, max_clients,
1209                       obd->obd_connected_clients,
1210                       obd->obd_stale_clients,
1211                       obd->obd_stale_clients == 1 ? "was" : "were");
1212
1213         LCONSOLE_INFO("%s: sending delayed replies to recovered clients\n",
1214                       obd->obd_name);
1215
1216         list_for_each_entry_safe(req, tmp, &obd->obd_delayed_reply_queue,
1217                                  rq_list) {
1218                 list_del_init(&req->rq_list);
1219                 DEBUG_REQ(D_HA, req, "delayed:");
1220                 ptlrpc_reply(req);
1221                 target_request_copy_put(req);
1222         }
1223         obd->obd_recovery_end = cfs_time_current_sec();
1224 }
1225
1226 static void target_finish_recovery(struct obd_device *obd)
1227 {
1228         OBD_RACE(OBD_FAIL_TGT_REPLAY_DELAY);
1229
1230         ldlm_reprocess_all_ns(obd->obd_namespace);
1231         spin_lock_bh(&obd->obd_processing_task_lock);
1232         if (list_empty(&obd->obd_recovery_queue)) {
1233                 obd->obd_recovery_thread = NULL;
1234                 obd->obd_processing_task = 0;
1235         } else {
1236                 spin_unlock_bh(&obd->obd_processing_task_lock);
1237                 CERROR("%s: Recovery queue isn't empty\n", obd->obd_name);
1238                 LBUG();
1239         }
1240         spin_unlock_bh(&obd->obd_processing_task_lock);
1241                 ;
1242         /* when recovery finished, cleanup orphans on mds and ost */
1243         if (OBT(obd) && OBP(obd, postrecov)) {
1244                 int rc = OBP(obd, postrecov)(obd);
1245                 if (rc < 0)
1246                         LCONSOLE_WARN("%s: Post recovery failed, rc %d\n",
1247                                       obd->obd_name, rc);
1248         }
1249         target_send_delayed_replies(obd);
1250 }
1251
1252 static void abort_recovery_queue(struct obd_device *obd)
1253 {
1254         struct ptlrpc_request *req, *n;
1255         struct list_head abort_list;
1256         int rc;
1257
1258         CFS_INIT_LIST_HEAD(&abort_list);
1259         spin_lock_bh(&obd->obd_processing_task_lock);
1260         list_splice_init(&obd->obd_recovery_queue, &abort_list);
1261         spin_unlock_bh(&obd->obd_processing_task_lock);
1262         /* process abort list unlocked */
1263         list_for_each_entry_safe(req, n, &abort_list, rq_list) {
1264                 target_exp_dequeue_req_replay(req);
1265                 list_del_init(&req->rq_list);
1266                 DEBUG_REQ(D_ERROR, req, "%s: aborted:", obd->obd_name);
1267                 req->rq_status = -ENOTCONN;
1268                 req->rq_type = PTL_RPC_MSG_ERR;
1269                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1270                 if (rc == 0)
1271                         ptlrpc_reply(req);
1272                 else
1273                         DEBUG_REQ(D_ERROR, req,
1274                                   "packing failed for abort-reply; skipping");
1275                 target_request_copy_put(req);
1276         }
1277 }
1278
1279 /* Called from a cleanup function if the device is being cleaned up
1280    forcefully.  The exports should all have been disconnected already,
1281    the only thing left to do is
1282      - clear the recovery flags
1283      - cancel the timer
1284      - free queued requests and replies, but don't send replies
1285    Because the obd_stopping flag is set, no new requests should be received.
1286
1287 */
1288 void target_cleanup_recovery(struct obd_device *obd)
1289 {
1290         struct list_head *tmp, *n;
1291         struct ptlrpc_request *req;
1292         struct list_head clean_list;
1293         ENTRY;
1294
1295         LASSERT(obd->obd_stopping);
1296
1297         spin_lock_bh(&obd->obd_processing_task_lock);
1298         if (!obd->obd_recovering) {
1299                 spin_unlock_bh(&obd->obd_processing_task_lock);
1300                 EXIT;
1301                 return;
1302         }
1303         obd->obd_recovering = obd->obd_abort_recovery = 0;
1304         target_cancel_recovery_timer(obd);
1305         spin_unlock_bh(&obd->obd_processing_task_lock);
1306
1307         list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
1308                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1309                 list_del(&req->rq_list);
1310                 target_request_copy_put(req);
1311         }
1312
1313         CFS_INIT_LIST_HEAD(&clean_list);
1314         spin_lock_bh(&obd->obd_processing_task_lock);
1315         list_splice_init(&obd->obd_recovery_queue, &clean_list);
1316         cfs_waitq_signal(&obd->obd_next_transno_waitq);
1317         spin_unlock_bh(&obd->obd_processing_task_lock);
1318         list_for_each_safe(tmp, n, &clean_list) {
1319                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1320                 target_exp_dequeue_req_replay(req);
1321                 list_del_init(&req->rq_list);
1322                 target_request_copy_put(req);
1323         }
1324         EXIT;
1325 }
1326
1327 void target_stop_recovery(void *data, int abort)
1328 {
1329         struct obd_device *obd = data;
1330         enum obd_option flags;
1331         ENTRY;
1332
1333         spin_lock_bh(&obd->obd_processing_task_lock);
1334         if (!obd->obd_recovering) {
1335                 spin_unlock_bh(&obd->obd_processing_task_lock);
1336                 EXIT;
1337                 return;
1338         }
1339         flags = exp_flags_from_obd(obd) | OBD_OPT_ABORT_RECOV;
1340         obd->obd_recovering = 0;
1341         obd->obd_abort_recovery = 0;
1342         obd->obd_processing_task = 0;
1343         if (abort == 0)
1344                 LASSERT(obd->obd_recoverable_clients == 0);
1345
1346         target_cancel_recovery_timer(obd);
1347         spin_unlock_bh(&obd->obd_processing_task_lock);
1348
1349         if (abort) {
1350                 LCONSOLE_WARN("%s: recovery is aborted by administrative "
1351                               "request; %d clients are not recovered "
1352                               "(%d clients did)\n", obd->obd_name,
1353                               obd->obd_recoverable_clients,
1354                               obd->obd_connected_clients);
1355                 class_disconnect_stale_exports(obd, flags);
1356         }
1357         abort_recovery_queue(obd);
1358         target_finish_recovery(obd);
1359         CDEBUG(D_HA, "%s: recovery complete\n", obd_uuid2str(&obd->obd_uuid));
1360         EXIT;
1361 }
1362
/*
 * Abort recovery on the device passed in @data: shorthand for
 * target_stop_recovery() with a non-zero abort argument.
 */
void target_abort_recovery(void *data)
{
        const int do_abort = 1;

        target_stop_recovery(data, do_abort);
}
1367
1368 static void reset_recovery_timer(struct obd_device *, int, int);
1369 static void target_recovery_expired(unsigned long castmeharder)
1370 {
1371         struct obd_device *obd = (struct obd_device *)castmeharder;
1372         CDEBUG(D_HA, "%s: recovery period over; %d clients never reconnected "
1373                "after %lds (%d clients did)\n", obd->obd_name,
1374                obd->obd_recoverable_clients,
1375                cfs_time_current_sec() - obd->obd_recovery_start,
1376                obd->obd_connected_clients);
1377
1378         spin_lock_bh(&obd->obd_processing_task_lock);
1379         obd->obd_abort_recovery = 1;
1380         cfs_waitq_signal(&obd->obd_next_transno_waitq);
1381         spin_unlock_bh(&obd->obd_processing_task_lock);
1382
1383         /* bug 18948:
1384          * The recovery timer expired and target_check_and_stop_recovery()
1385          * must be called.  We cannot call it directly because we are in
1386          * interrupt context, so we need to wake up another thread to call it.
1387          * This may happen if there are obd->obd_next_transno_waitq waiters,
1388          * or if we happen to handle a connect request.  However, we cannot
1389          * count on either of those things so we wake up the ping evictor
1390          * and leverage it's context to complete recovery.
1391          *
1392          * Note: HEAD has a separate recovery thread and handle this.
1393          */
1394         spin_lock(&obd->obd_dev_lock);
1395         ping_evictor_wake(obd->obd_self_export);
1396         spin_unlock(&obd->obd_dev_lock);
1397 }
1398
1399 /* obd_processing_task_lock should be held */
1400 void target_cancel_recovery_timer(struct obd_device *obd)
1401 {
1402         CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name);
1403         cfs_timer_disarm(&obd->obd_recovery_timer);
1404 }
1405
1406 /* extend = 1 means require at least "duration" seconds left in the timer,
1407    extend = 0 means set the total duration (start_recovery_timer) */
1408 static void reset_recovery_timer(struct obd_device *obd, int duration,
1409                                  int extend)
1410 {
1411         cfs_time_t now = cfs_time_current_sec();
1412         cfs_duration_t left;
1413
1414         spin_lock_bh(&obd->obd_processing_task_lock);
1415         if (!obd->obd_recovering) {
1416                 spin_unlock_bh(&obd->obd_processing_task_lock);
1417                 return;
1418         }
1419
1420         left = cfs_time_sub(obd->obd_recovery_end, now);
1421
1422         if (extend && (duration > left))
1423                 obd->obd_recovery_timeout += duration - left;
1424         else if (!extend && (duration > obd->obd_recovery_timeout))
1425                 /* Track the client's largest expected replay time */
1426                 obd->obd_recovery_timeout = duration;
1427
1428         /* Hard limit of obd_recovery_time_hard which should not happen */
1429         if(obd->obd_recovery_timeout > obd->obd_recovery_time_hard)
1430                 obd->obd_recovery_timeout = obd->obd_recovery_time_hard;
1431
1432         obd->obd_recovery_end = obd->obd_recovery_start +
1433                                 obd->obd_recovery_timeout;
1434         if (cfs_time_before(now, obd->obd_recovery_end)) {
1435                 left = cfs_time_sub(obd->obd_recovery_end, now);
1436                 cfs_timer_arm(&obd->obd_recovery_timer, cfs_time_shift(left));
1437         }
1438         spin_unlock_bh(&obd->obd_processing_task_lock);
1439         CDEBUG(D_HA, "%s: recovery timer will expire in %u seconds\n",
1440                obd->obd_name, (unsigned)left);
1441 }
1442
1443 static void check_and_start_recovery_timer(struct obd_device *obd,
1444                                            svc_handler_t handler)
1445 {
1446         spin_lock_bh(&obd->obd_processing_task_lock);
1447         if (obd->obd_recovery_handler) {
1448                 spin_unlock_bh(&obd->obd_processing_task_lock);
1449                 return;
1450         }
1451         CDEBUG(D_HA, "%s: starting recovery timer\n", obd->obd_name);
1452         obd->obd_recovery_start = cfs_time_current_sec();
1453         obd->obd_recovery_handler = handler;
1454         cfs_timer_init(&obd->obd_recovery_timer, target_recovery_expired, obd);
1455         spin_unlock_bh(&obd->obd_processing_task_lock);
1456
1457         reset_recovery_timer(obd, obd->obd_recovery_timeout, 0);
1458 }
1459
1460 /* Reset the timer with each new client connection */
1461 /*
1462  * This timer is actually reconnect_timer, which is for making sure
1463  * the total recovery window is at least as big as my reconnect
1464  * attempt timing. So the initial recovery time_out will be set to
1465  * OBD_RECOVERY_FACTOR * obd_timeout. If the timeout coming
1466  * from client is bigger than this, then the recovery time_out will
1467  * be extend to make sure the client could be reconnected, in the
1468  * process, the timeout from the new client should be ignored.
1469  */
1470
1471 static void
1472 target_start_and_reset_recovery_timer(struct obd_device *obd,
1473                                       svc_handler_t handler,
1474                                       struct ptlrpc_request *req,
1475                                       int new_client)
1476 {
1477         int service_time = lustre_msg_get_service_time(req->rq_reqmsg);
1478
1479         if (!new_client && service_time)
1480                 /* Teach server about old server's estimates, as first guess
1481                    at how long new requests will take. */
1482                 at_measured(&req->rq_rqbd->rqbd_service->srv_at_estimate,
1483                             service_time);
1484
1485         check_and_start_recovery_timer(obd, handler);
1486
1487         /* convert the service time to rpc timeout,
1488          * reuse service_time to limit stack usage */
1489         service_time = at_est2timeout(service_time);
1490
1491         /* We expect other clients to timeout within service_time, then try
1492          * to reconnect, then try the failover server.  The max delay between
1493          * connect attempts is SWITCH_MAX + SWITCH_INC + INITIAL */
1494         service_time += 2 * (CONNECTION_SWITCH_MAX + CONNECTION_SWITCH_INC +
1495                              INITIAL_CONNECT_TIMEOUT);
1496         if (service_time > obd->obd_recovery_timeout && !new_client)
1497                 reset_recovery_timer(obd, service_time, 0);
1498 }
1499
1500 static int check_for_next_transno(struct obd_device *obd)
1501 {
1502         struct ptlrpc_request *req;
1503         int wake_up = 0, connected, completed, queue_len, max;
1504         __u64 next_transno, req_transno;
1505
1506         if (obd->obd_stopping) {
1507                 CDEBUG(D_HA, "waking for stopping device\n");
1508                 return 1;
1509         }
1510
1511         spin_lock_bh(&obd->obd_processing_task_lock);
1512         if (obd->obd_abort_recovery) {
1513                 CDEBUG(D_HA, "waking for aborted recovery\n");
1514                 spin_unlock_bh(&obd->obd_processing_task_lock);
1515                 return 1;
1516         } else if (!obd->obd_recovering) {
1517                 CDEBUG(D_HA, "waking for completed recovery (?)\n");
1518                 spin_unlock_bh(&obd->obd_processing_task_lock);
1519                 return 1;
1520         }
1521
1522         LASSERT(!list_empty(&obd->obd_recovery_queue));
1523         req = list_entry(obd->obd_recovery_queue.next,
1524                          struct ptlrpc_request, rq_list);
1525         max = obd->obd_max_recoverable_clients;
1526         req_transno = lustre_msg_get_transno(req->rq_reqmsg);
1527         connected = obd->obd_connected_clients;
1528         completed = max - obd->obd_recoverable_clients -
1529                     obd->obd_delayed_clients;
1530         queue_len = obd->obd_requests_queued_for_recovery;
1531         next_transno = obd->obd_next_recovery_transno;
1532
1533         CDEBUG(D_HA,"max: %d, connected: %d, delayed %d, completed: %d, "
1534                "queue_len: %d, req_transno: "LPU64", next_transno: "LPU64"\n",
1535                max, connected, obd->obd_delayed_clients, completed, queue_len,
1536                req_transno, next_transno);
1537         if (req_transno == next_transno) {
1538                 CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
1539                 wake_up = 1;
1540         } else if (queue_len == obd->obd_recoverable_clients) {
1541                 CDEBUG(D_ERROR,
1542                        "%s: waking for skipped transno (skip: "LPD64
1543                        ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
1544                        obd->obd_name, next_transno, queue_len, completed, max,
1545                        req_transno);
1546                 obd->obd_next_recovery_transno = req_transno;
1547                 wake_up = 1;
1548         }
1549         spin_unlock_bh(&obd->obd_processing_task_lock);
1550         LASSERT(lustre_msg_get_transno(req->rq_reqmsg) >= next_transno);
1551         return wake_up;
1552 }
1553
1554 static void process_recovery_queue(struct obd_device *obd)
1555 {
1556         struct ptlrpc_request *req;
1557         struct l_wait_info lwi = { 0 };
1558         ENTRY;
1559
1560         for (;;) {
1561                 spin_lock_bh(&obd->obd_processing_task_lock);
1562
1563                 if (!obd->obd_recovering) {
1564                         spin_unlock_bh(&obd->obd_processing_task_lock);
1565                         EXIT;
1566                         return;
1567                 }
1568
1569                 LASSERTF(obd->obd_processing_task == cfs_curproc_pid(),
1570                          "%s: invalid pid in obd_processing_task (%d != %d)\n",
1571                          obd->obd_name, obd->obd_processing_task,
1572                          cfs_curproc_pid());
1573                 req = list_entry(obd->obd_recovery_queue.next,
1574                                  struct ptlrpc_request, rq_list);
1575
1576                 if (lustre_msg_get_transno(req->rq_reqmsg) !=
1577                     obd->obd_next_recovery_transno) {
1578                         spin_unlock_bh(&obd->obd_processing_task_lock);
1579                         CDEBUG(D_HA, "%s: waiting for transno "LPD64" (1st is "
1580                                LPD64", x"LPU64")\n", obd->obd_name,
1581                                obd->obd_next_recovery_transno,
1582                                lustre_msg_get_transno(req->rq_reqmsg),
1583                                req->rq_xid);
1584                         l_wait_event(obd->obd_next_transno_waitq,
1585                                      check_for_next_transno(obd), &lwi);
1586                         if (target_recovery_check_and_stop(obd)) {
1587                                 EXIT;
1588                                 return;
1589                         }
1590                         continue;
1591                 }
1592                 list_del_init(&req->rq_list);
1593                 LASSERT(obd->obd_recovery_thread);
1594                 /* replace request initial thread with current one, bug #18221 */
1595                 req->rq_svc_thread = obd->obd_recovery_thread;
1596                 obd->obd_requests_queued_for_recovery--;
1597                 spin_unlock_bh(&obd->obd_processing_task_lock);
1598
1599                 DEBUG_REQ(D_HA, req, "processing: ");
1600                 (void)obd->obd_recovery_handler(req);
1601                 obd->obd_replayed_requests++;
1602                 /* Extend the recovery timer enough to complete the next
1603                  * replayed rpc */
1604                 reset_recovery_timer(obd, AT_OFF ? obd_timeout :
1605                        at_get(&req->rq_rqbd->rqbd_service->srv_at_estimate), 1);
1606                 /* bug 1580: decide how to properly sync() in recovery */
1607                 //mds_fsync_super(obd->u.obt.obt_sb);
1608                 spin_lock_bh(&obd->obd_processing_task_lock);
1609                 obd->obd_next_recovery_transno++;
1610                 spin_unlock_bh(&obd->obd_processing_task_lock);
1611                 target_exp_dequeue_req_replay(req);
1612                 target_request_copy_put(req);
1613                 OBD_RACE(OBD_FAIL_TGT_REPLAY_DELAY);
1614                 spin_lock_bh(&obd->obd_processing_task_lock);
1615                 if (list_empty(&obd->obd_recovery_queue)) {
1616                         obd->obd_processing_task = 0;
1617                         obd->obd_recovery_thread = NULL;
1618                         spin_unlock_bh(&obd->obd_processing_task_lock);
1619                         break;
1620                 }
1621                 spin_unlock_bh(&obd->obd_processing_task_lock);
1622         }
1623         EXIT;
1624 }
1625
1626 int target_queue_recovery_request(struct ptlrpc_request *req,
1627                                   struct obd_device *obd)
1628 {
1629         struct list_head *tmp;
1630         int inserted = 0;
1631         __u64 transno = lustre_msg_get_transno(req->rq_reqmsg);
1632         ENTRY;
1633         /* CAVEAT EMPTOR: The incoming request message has been swabbed
1634          * (i.e. buflens etc are in my own byte order), but type-dependent
1635          * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */
1636
1637         if (!transno) {
1638                 CFS_INIT_LIST_HEAD(&req->rq_list);
1639                 DEBUG_REQ(D_HA, req, "not queueing");
1640                 RETURN(1);
1641         }
1642
1643         spin_lock_bh(&obd->obd_processing_task_lock);
1644
1645         if (!obd->obd_recovering) {
1646                 spin_unlock_bh(&obd->obd_processing_task_lock);
1647                 RETURN(0);
1648         }
1649
1650         /* If we're processing the queue, we want don't want to queue this
1651          * message.
1652          *
1653          * Also, if this request has a transno less than the one we're waiting
1654          * for, we should process it now.  It could (and currently always will)
1655          * be an open request for a descriptor that was opened some time ago.
1656          *
1657          * Also, a resent, replayed request that has already been
1658          * handled will pass through here and be processed immediately.
1659          */
1660         if (obd->obd_processing_task == cfs_curproc_pid() ||
1661             transno < obd->obd_next_recovery_transno) {
1662                 /* Processing the queue right now, don't re-add. */
1663                 LASSERT(list_empty(&req->rq_list));
1664                 spin_unlock_bh(&obd->obd_processing_task_lock);
1665                 RETURN(1);
1666         }
1667
1668         if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_DROP))) {
1669                 spin_unlock_bh(&obd->obd_processing_task_lock);
1670                 RETURN(0);
1671         }
1672
1673         if (target_exp_enqueue_req_replay(req)) {
1674                 spin_unlock_bh(&obd->obd_processing_task_lock);
1675                 DEBUG_REQ(D_ERROR, req, "%s: dropping resent queued req",
1676                                         obd->obd_name);
1677                 RETURN(0);
1678         }
1679
1680         /* XXX O(n^2) */
1681         list_for_each(tmp, &obd->obd_recovery_queue) {
1682                 struct ptlrpc_request *reqiter =
1683                         list_entry(tmp, struct ptlrpc_request, rq_list);
1684
1685                 if (lustre_msg_get_transno(reqiter->rq_reqmsg) > transno) {
1686                         list_add_tail(&req->rq_list, &reqiter->rq_list);
1687                         inserted = 1;
1688                         break;
1689                 }
1690
1691                 if (unlikely(lustre_msg_get_transno(reqiter->rq_reqmsg) ==
1692                              transno)) {
1693                         spin_unlock_bh(&obd->obd_processing_task_lock);
1694                         DEBUG_REQ(D_ERROR, req, "%s: dropping replay: transno "
1695                                   "has been claimed by another client",
1696                                   obd->obd_name);
1697                         target_exp_dequeue_req_replay(req);
1698                         RETURN(0);
1699                 }
1700         }
1701
1702         if (!inserted) {
1703                 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
1704         }
1705
1706         target_request_copy_get(req);
1707         obd->obd_requests_queued_for_recovery++;
1708
1709         if (obd->obd_processing_task != 0) {
1710                 /* Someone else is processing this queue, we'll leave it to
1711                  * them.
1712                  */
1713                 cfs_waitq_signal(&obd->obd_next_transno_waitq);
1714                 spin_unlock_bh(&obd->obd_processing_task_lock);
1715                 RETURN(0);
1716         }
1717
1718         /* Nobody is processing, and we know there's (at least) one to process
1719          * now, so we'll do the honours.
1720          */
1721         obd->obd_processing_task = cfs_curproc_pid();
1722         /* save thread that handle recovery queue */
1723         obd->obd_recovery_thread = req->rq_svc_thread;
1724         spin_unlock_bh(&obd->obd_processing_task_lock);
1725
1726         process_recovery_queue(obd);
1727         RETURN(0);
1728 }
1729
1730 struct obd_device * target_req2obd(struct ptlrpc_request *req)
1731 {
1732         return req->rq_export->exp_obd;
1733 }
1734
1735 int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc)
1736 {
1737         struct obd_device *obd = target_req2obd(req);
1738         struct obd_export *exp = req->rq_export;
1739         int recovery_done = 0, delayed_done = 0;
1740
1741         LASSERT ((rc == 0) == req->rq_packed_final);
1742
1743         if (!req->rq_packed_final) {
1744                 /* Just like ptlrpc_error, but without the sending. */
1745                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1746                 if (rc)
1747                         return rc;
1748                 req->rq_type = PTL_RPC_MSG_ERR;
1749         }
1750
1751         LASSERT(!req->rq_reply_state->rs_difficult);
1752         LASSERT(list_empty(&req->rq_list));
1753
1754         /* Don't race cleanup */
1755         spin_lock_bh(&obd->obd_processing_task_lock);
1756         if (obd->obd_stopping) {
1757                 spin_unlock_bh(&obd->obd_processing_task_lock);
1758                 goto out_noconn;
1759         }
1760
1761         if (!exp->exp_vbr_failed) {
1762                 target_request_copy_get(req);
1763                 list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
1764         }
1765
1766         /* only count the first "replay over" request from each
1767            export */
1768         if (exp->exp_replay_needed) {
1769                 spin_lock(&exp->exp_lock);
1770                 exp->exp_replay_needed = 0;
1771                 spin_unlock(&exp->exp_lock);
1772
1773                 if (!exp->exp_delayed) {
1774                         --obd->obd_recoverable_clients;
1775                 } else {
1776                         spin_lock(&exp->exp_lock);
1777                         exp->exp_delayed = 0;
1778                         spin_unlock(&exp->exp_lock);
1779                         delayed_done = 1;
1780                         if (obd->obd_delayed_clients == 0) {
1781                                 spin_unlock_bh(&obd->obd_processing_task_lock);
1782                                 LBUG();
1783                         }
1784                         --obd->obd_delayed_clients;
1785                 }
1786         }
1787         recovery_done = (obd->obd_recoverable_clients == 0);
1788         spin_unlock_bh(&obd->obd_processing_task_lock);
1789
1790         if (delayed_done) {
1791                 /* start pinging export */
1792                 spin_lock(&obd->obd_dev_lock);
1793                 list_add_tail(&exp->exp_obd_chain_timed,
1794                               &obd->obd_exports_timed);
1795                 list_move_tail(&exp->exp_obd_chain, &obd->obd_exports);
1796                 spin_unlock(&obd->obd_dev_lock);
1797                 target_send_delayed_replies(obd);
1798         }
1799
1800         OBD_RACE(OBD_FAIL_LDLM_RECOV_CLIENTS);
1801         if (recovery_done) {
1802                 spin_lock_bh(&obd->obd_processing_task_lock);
1803                 obd->obd_recovering = 0;
1804                 obd->obd_version_recov = 0;
1805                 obd->obd_abort_recovery = 0;
1806                 target_cancel_recovery_timer(obd);
1807                 spin_unlock_bh(&obd->obd_processing_task_lock);
1808
1809                 if (!delayed_done)
1810                         target_finish_recovery(obd);
1811                 CDEBUG(D_HA, "%s: recovery complete\n",
1812                        obd_uuid2str(&obd->obd_uuid));
1813         } else {
1814                 CWARN("%s: %d recoverable clients remain\n",
1815                       obd->obd_name, obd->obd_recoverable_clients);
1816                 cfs_waitq_signal(&obd->obd_next_transno_waitq);
1817         }
1818
1819         /* VBR: disconnect export with failed recovery */
1820         if (exp->exp_vbr_failed) {
1821                 CWARN("%s: disconnect export %s\n", obd->obd_name,
1822                       exp->exp_client_uuid.uuid);
1823                 class_fail_export(exp);
1824                 req->rq_status = 0;
1825                 ptlrpc_send_reply(req, 0);
1826         }
1827
1828         return 1;
1829
1830 out_noconn:
1831         req->rq_status = -ENOTCONN;
1832         /* rv is ignored anyhow */
1833         return -ENOTCONN;
1834 }
1835
1836 int target_handle_reply(struct ptlrpc_request *req, int rc, int fail)
1837 {
1838         struct obd_device *obd = NULL;
1839
1840         if (req->rq_export)
1841                 obd = target_req2obd(req);
1842
1843         /* handle replay reply for version recovery */
1844         if (obd && obd->obd_version_recov &&
1845             (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
1846                 LASSERT(req->rq_repmsg);
1847                 lustre_msg_add_flags(req->rq_repmsg, MSG_VERSION_REPLAY);
1848         }
1849
1850         /* handle last replay */
1851         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1852                 if (obd &&
1853                     lustre_msg_get_flags(req->rq_reqmsg) & MSG_DELAY_REPLAY) {
1854                         DEBUG_REQ(D_HA, req,
1855                                   "delayed LAST_REPLAY, queuing reply");
1856                         rc = target_queue_last_replay_reply(req, rc);
1857                         LASSERT(req->rq_export->exp_delayed == 0);
1858                         return rc;
1859                 }
1860
1861                 if (obd && obd->obd_recovering) { /* normal recovery */
1862                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1863                         rc = target_queue_last_replay_reply(req, rc);
1864                         return rc;
1865                 }
1866
1867                 /* Lost a race with recovery; let the error path DTRT. */
1868                 rc = req->rq_status = -ENOTCONN;
1869         }
1870         target_send_reply(req, rc, fail);
1871         return 0;
1872 }
1873
1874 static inline struct ldlm_pool *ldlm_exp2pl(struct obd_export *exp)
1875 {
1876         LASSERT(exp != NULL);
1877         return &exp->exp_obd->obd_namespace->ns_pool;
1878 }
1879
1880 int target_pack_pool_reply(struct ptlrpc_request *req)
1881 {
1882         struct obd_device *obd;
1883         ENTRY;
1884
1885         /*
1886          * Check that we still have all structures alive as this may
1887          * be some late rpc in shutdown time.
1888          */
1889         if (unlikely(!req->rq_export || !req->rq_export->exp_obd ||
1890                      !exp_connect_lru_resize(req->rq_export))) {
1891                 lustre_msg_set_slv(req->rq_repmsg, 0);
1892                 lustre_msg_set_limit(req->rq_repmsg, 0);
1893                 RETURN(0);
1894         }
1895
1896         /*
1897          * OBD is alive here as export is alive, which we checked above.
1898          */
1899         obd = req->rq_export->exp_obd;
1900
1901         read_lock(&obd->obd_pool_lock);
1902         lustre_msg_set_slv(req->rq_repmsg, obd->obd_pool_slv);
1903         lustre_msg_set_limit(req->rq_repmsg, obd->obd_pool_limit);
1904         read_unlock(&obd->obd_pool_lock);
1905
1906         RETURN(0);
1907 }
1908
1909 int
1910 target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id)
1911 {
1912         if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
1913                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
1914                 DEBUG_REQ(D_ERROR, req, "dropping reply");
1915                 return (-ECOMM);
1916         }
1917
1918         if (rc) {
1919                 DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
1920                 req->rq_status = rc;
1921                 return (ptlrpc_send_error(req, 1));
1922         } else {
1923                 DEBUG_REQ(D_NET, req, "sending reply");
1924         }
1925
1926         return (ptlrpc_send_reply(req, PTLRPC_REPLY_MAYBE_DIFFICULT));
1927 }
1928
1929 void
1930 target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
1931 {
1932         int                        netrc;
1933         struct ptlrpc_reply_state *rs;
1934         struct obd_device         *obd;
1935         struct obd_export         *exp;
1936         struct ptlrpc_service     *svc;
1937
1938         svc = req->rq_rqbd->rqbd_service;
1939         rs = req->rq_reply_state;
1940         if (rs == NULL || !rs->rs_difficult) {
1941                 /* no notifiers */
1942                 target_send_reply_msg (req, rc, fail_id);
1943                 return;
1944         }
1945
1946         /* must be an export if locks saved */
1947         LASSERT (req->rq_export != NULL);
1948         /* req/reply consistent */
1949         LASSERT (rs->rs_service == svc);
1950
1951         /* "fresh" reply */
1952         LASSERT (!rs->rs_scheduled);
1953         LASSERT (!rs->rs_scheduled_ever);
1954         LASSERT (!rs->rs_handled);
1955         LASSERT (!rs->rs_on_net);
1956         LASSERT (rs->rs_export == NULL);
1957         LASSERT (list_empty(&rs->rs_obd_list));
1958         LASSERT (list_empty(&rs->rs_exp_list));
1959
1960         exp = class_export_get(req->rq_export);
1961         obd = exp->exp_obd;
1962
1963         /* disable reply scheduling onto srv_reply_queue while I'm setting up */
1964         rs->rs_scheduled = 1;
1965         rs->rs_on_net    = 1;
1966         rs->rs_xid       = req->rq_xid;
1967         rs->rs_transno   = req->rq_transno;
1968         rs->rs_export    = exp;
1969
1970         spin_lock(&exp->exp_uncommitted_replies_lock);
1971
1972         /* VBR: use exp_last_committed */
1973         if (rs->rs_transno > exp->exp_last_committed) {
1974                 /* not committed already */
1975                 list_add_tail (&rs->rs_obd_list,
1976                                &exp->exp_uncommitted_replies);
1977         }
1978
1979         spin_unlock (&exp->exp_uncommitted_replies_lock);
1980         spin_lock (&exp->exp_lock);
1981
1982         list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies);
1983
1984         spin_unlock(&exp->exp_lock);
1985
1986         netrc = target_send_reply_msg (req, rc, fail_id);
1987
1988         spin_lock(&svc->srv_lock);
1989
1990         svc->srv_n_difficult_replies++;
1991
1992         if (netrc != 0) {
1993                 /* error sending: reply is off the net.  Also we need +1
1994                  * reply ref until ptlrpc_server_handle_reply() is done
1995                  * with the reply state (if the send was successful, there
1996                  * would have been +1 ref for the net, which
1997                  * reply_out_callback leaves alone) */
1998                 rs->rs_on_net = 0;
1999                 ptlrpc_rs_addref(rs);
2000                 atomic_inc (&svc->srv_outstanding_replies);
2001         }
2002
2003         if (!rs->rs_on_net ||                   /* some notifier */
2004             list_empty(&rs->rs_exp_list) ||     /* completed already */
2005             list_empty(&rs->rs_obd_list)) {
2006                 list_add_tail (&rs->rs_list, &svc->srv_reply_queue);
2007                 cfs_waitq_signal (&svc->srv_waitq);
2008         } else {
2009                 list_add (&rs->rs_list, &svc->srv_active_replies);
2010                 rs->rs_scheduled = 0;           /* allow notifier to schedule */
2011         }
2012
2013         spin_unlock(&svc->srv_lock);
2014 }
2015
2016 int target_handle_ping(struct ptlrpc_request *req)
2017 {
2018         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY &&
2019             req->rq_export->exp_in_recovery) {
2020                 spin_lock(&req->rq_export->exp_lock);
2021                 req->rq_export->exp_in_recovery = 0;
2022                 spin_unlock(&req->rq_export->exp_lock);
2023         }
2024         obd_ping(req->rq_export);
2025         return lustre_pack_reply(req, 1, NULL, NULL);
2026 }
2027
2028 void target_committed_to_req(struct ptlrpc_request *req)
2029 {
2030         struct obd_export *exp = req->rq_export;
2031         if (!exp->exp_obd->obd_no_transno && req->rq_repmsg != NULL) {
2032                 lustre_msg_set_last_committed(req->rq_repmsg,
2033                                               exp->exp_last_committed);
2034         } else {
2035                 DEBUG_REQ(D_IOCTL, req, "not sending last_committed update (%d/"
2036                           "%d)", exp->exp_obd->obd_no_transno,
2037                           req->rq_repmsg == NULL);
2038         }
2039         CDEBUG(D_INFO, "last_committed x"LPU64", this req x"LPU64"\n",
2040                exp->exp_obd->obd_last_committed, req->rq_xid);
2041 }
2042
2043 EXPORT_SYMBOL(target_committed_to_req);
2044
2045 int target_handle_qc_callback(struct ptlrpc_request *req)
2046 {
2047         struct obd_quotactl *oqctl;
2048         struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
2049
2050         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
2051                                    lustre_swab_obd_quotactl);
2052         if (oqctl == NULL) {
2053                 CERROR("Can't unpack obd_quatactl\n");
2054                 RETURN(-EPROTO);
2055         }
2056
2057         cli->cl_qchk_stat = oqctl->qc_stat;
2058
2059         return 0;
2060 }
2061
2062 #ifdef HAVE_QUOTA_SUPPORT
/*
 * Handle a quota acquire/release request.
 *
 * In kernel builds: pack a reply sized for the peer's qunit_data format,
 * unpack the incoming qunit_data, pin the observer chain (obd -> lov ->
 * master), run the master's quota handler (lqc_handler) and copy the
 * result back into the reply.  -EAGAIN is placed in rq_status when the
 * observers cannot be pinned or the quota context is not set up yet.
 *
 * Returns the result of ptlrpc_reply() when a reply was sent, or an error
 * (-ENOMEM, reply packing or qunit_data (un)packing failure) when none
 * was.  In non-kernel builds it simply returns 0.
 */
int target_handle_dqacq_callback(struct ptlrpc_request *req)
{
#ifdef __KERNEL__
        struct obd_device *obd;
        struct obd_device *master_obd = NULL, *lov_obd = NULL;
        struct lustre_quota_ctxt *qctxt;
        struct qunit_data *qdata = NULL;
        int rc = 0;
        int repsize[2] = { sizeof(struct ptlrpc_body), 0 };
        ENTRY;

        /* Check the export before its first dereference; asserting after
         * it has already been used cannot catch anything. */
        LASSERT(req->rq_export);
        obd = req->rq_export->exp_obd;

        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_DROP_QUOTA_REQ))
                RETURN(rc);

        repsize[1] = quota_get_qunit_data_size(req->rq_export->
                                               exp_connect_flags);

        rc = lustre_pack_reply(req, 2, repsize, NULL);
        if (rc)
                RETURN(rc);

        /* there are three forms of qunit(historic causes), so we need to
         * adjust qunits from slaves to the same form here */
        OBD_ALLOC(qdata, sizeof(struct qunit_data));
        if (!qdata)
                RETURN(-ENOMEM);
        rc = quota_get_qdata(req, qdata, QUOTA_REQUEST, QUOTA_EXPORT);
        if (rc < 0) {
                CDEBUG(D_ERROR, "Can't unpack qunit_data(rc: %d)\n", rc);
                GOTO(out, rc);
        }

        /* we use the observer */
        if (obd_pin_observer(obd, &lov_obd) ||
            obd_pin_observer(lov_obd, &master_obd)) {
                CERROR("Can't find the observer, it is recovering\n");
                req->rq_status = -EAGAIN;
                GOTO(send_reply, rc = -EAGAIN);
        }

        qctxt = &master_obd->u.obt.obt_qctxt;

        if (!qctxt->lqc_setup) {
                /* quota_type has not been processed yet, return EAGAIN
                 * until we know whether or not quotas are supposed to
                 * be enabled */
                CDEBUG(D_QUOTA, "quota_type not processed yet, return "
                                "-EAGAIN\n");
                req->rq_status = -EAGAIN;
                rc = ptlrpc_reply(req);
                GOTO(out, rc);
        }

        LASSERT(qctxt->lqc_handler);
        rc = qctxt->lqc_handler(master_obd, qdata,
                                lustre_msg_get_opc(req->rq_reqmsg));
        if (rc && rc != -EDQUOT)
                CDEBUG(rc == -EBUSY  ? D_QUOTA : D_ERROR,
                       "dqacq failed! (rc:%d)\n", rc);
        req->rq_status = rc;

        /* there are three forms of qunit(historic causes), so we need to
         * adjust the same form to different forms slaves needed */
        rc = quota_copy_qdata(req, qdata, QUOTA_REPLY, QUOTA_EXPORT);
        if (rc < 0) {
                CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
                GOTO(out, rc);
        }

        /* Block the quota req. b=14840 */
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_BLOCK_QUOTA_REQ, obd_timeout);
 send_reply:
        rc = ptlrpc_reply(req);
out:
        /* Unpin in reverse order: master_obd was pinned through lov_obd,
         * lov_obd through obd. */
        if (master_obd)
                obd_unpin_observer(lov_obd);
        if (lov_obd)
                obd_unpin_observer(obd);
        OBD_FREE(qdata, sizeof(struct qunit_data));
        RETURN(rc);
#else
        return 0;
#endif /* !__KERNEL__ */
}
2149 #endif /* HAVE_QUOTA_SUPPORT */
2150
2151 ldlm_mode_t lck_compat_array[] = {
2152         [LCK_EX] LCK_COMPAT_EX,
2153         [LCK_PW] LCK_COMPAT_PW,
2154         [LCK_PR] LCK_COMPAT_PR,
2155         [LCK_CW] LCK_COMPAT_CW,
2156         [LCK_CR] LCK_COMPAT_CR,
2157         [LCK_NL] LCK_COMPAT_NL,
2158         [LCK_GROUP] LCK_COMPAT_GROUP
2159 };