lustre/ldlm/ldlm_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LDLM
26
27 #ifdef __KERNEL__
28 # include <linux/module.h>
29 #else
30 # include <liblustre.h>
31 #endif
32 #include <linux/obd.h>
33 #include <linux/obd_ost.h> /* for LUSTRE_OSC_NAME */
34 #include <linux/lustre_mds.h> /* for LUSTRE_MDC_NAME */
35 #include <linux/lustre_mgmt.h>
36 #include <linux/lustre_dlm.h>
37 #include <linux/lustre_net.h>
38 #include <linux/lustre_sec.h>
39 #include <linux/lustre_gs.h>
40
41 /* @priority: if non-zero, move the selected connection to the list head
42  * @nocreate: if non-zero, only search existing connections, do not create one
43  */
44 static int import_set_conn(struct obd_import *imp, struct obd_uuid *uuid,
45                            int priority, int nocreate)
46 {
47         struct ptlrpc_connection *ptlrpc_conn;
48         struct obd_import_conn *imp_conn = NULL, *item;
49         int rc = 0;
50         ENTRY;
51
52         LASSERT(!(nocreate && !priority));
53
54         ptlrpc_conn = ptlrpc_uuid_to_connection(uuid);
55         if (!ptlrpc_conn) {
56                 CERROR("can't find connection %s\n", uuid->uuid);
57                 RETURN (-EINVAL);
58         }
59
60         if (!nocreate) {
61                 OBD_ALLOC(imp_conn, sizeof(*imp_conn));
62                 if (!imp_conn) {
63                         CERROR("failed to allocate memory\n");
64                         GOTO(out_put, rc = -ENOMEM);
65                 }
66         }
67
68         spin_lock(&imp->imp_lock);
69         list_for_each_entry(item, &imp->imp_conn_list, oic_item) {
70                 if (obd_uuid_equals(uuid, &item->oic_uuid)) {
71                         if (priority) {
72                                 list_del(&item->oic_item);
73                                 list_add(&item->oic_item, &imp->imp_conn_list);
74                                 item->oic_last_attempt = 0;
75                         }
76                         CDEBUG(D_HA, "imp %p@%s: found existing conn %s%s\n",
77                                imp, imp->imp_obd->obd_name, uuid->uuid,
78                                (priority ? ", moved to head" : ""));
79                         spin_unlock(&imp->imp_lock);
80                         GOTO(out_free, rc = 0);
81                 }
82         }
83         /* not found */
84         if (!nocreate) {
85                 imp_conn->oic_conn = ptlrpc_conn;
86                 imp_conn->oic_uuid = *uuid;
87                 imp_conn->oic_last_attempt = 0;
88                 if (priority)
89                         list_add(&imp_conn->oic_item, &imp->imp_conn_list);
90                 else
91                         list_add_tail(&imp_conn->oic_item, &imp->imp_conn_list);
92                 CDEBUG(D_HA, "imp %p@%s: add connection %s at %s\n",
93                        imp, imp->imp_obd->obd_name, uuid->uuid,
94                        (priority ? "head" : "tail"));
95         } else {
96                 /* not found and we may not create one: drop the connection
97                  * reference taken above and return -ENOENT */
98                 spin_unlock(&imp->imp_lock);
99                 GOTO(out_put, rc = -ENOENT);
           }

           spin_unlock(&imp->imp_lock);
           RETURN(0);
100 out_free:
101         if (imp_conn)
102                 OBD_FREE(imp_conn, sizeof(*imp_conn));
103 out_put:
104         ptlrpc_put_connection(ptlrpc_conn);
105         RETURN(rc);
106 }
107
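/* Thin wrappers around import_set_conn(): import_set_conn_priority() only
 * promotes an already known connection to the head of the list (nocreate),
 * while client_import_add_conn() may also create a new entry for @uuid. */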
108 int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid)
109 {
110         return import_set_conn(imp, uuid, 1, 1);
111 }
112
113 int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
114                            int priority)
115 {
116         return import_set_conn(imp, uuid, priority, 0);
117 }
118
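/* Remove the connection identified by @uuid from @imp's connection list.
 * The currently active connection may only be removed while the import is
 * CLOSED or DISCON (otherwise -EBUSY); -ENOENT is returned if no matching
 * connection exists. */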
119 int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid)
120 {
121         struct obd_import_conn *imp_conn;
122         struct obd_export *dlmexp;
123         int rc = -ENOENT;
124         ENTRY;
125
126         spin_lock(&imp->imp_lock);
127         if (list_empty(&imp->imp_conn_list)) {
128                 LASSERT(!imp->imp_conn_current);
129                 LASSERT(!imp->imp_connection);
130                 GOTO(out, rc);
131         }
132
133         list_for_each_entry(imp_conn, &imp->imp_conn_list, oic_item) {
134                 if (!obd_uuid_equals(uuid, &imp_conn->oic_uuid))
135                         continue;
136                 LASSERT(imp_conn->oic_conn);
137
138                 /* is current conn? */
139                 if (imp_conn == imp->imp_conn_current) {
140                         LASSERT(imp_conn->oic_conn == imp->imp_connection);
141
142                         if (imp->imp_state != LUSTRE_IMP_CLOSED &&
143                             imp->imp_state != LUSTRE_IMP_DISCON) {
144                                 CERROR("can't remove current connection\n");
145                                 GOTO(out, rc = -EBUSY);
146                         }
147
148                         ptlrpc_put_connection(imp->imp_connection);
149                         imp->imp_connection = NULL;
150
151                         dlmexp = class_conn2export(&imp->imp_dlm_handle);
152                         if (dlmexp && dlmexp->exp_connection) {
153                                 LASSERT(dlmexp->exp_connection ==
154                                         imp_conn->oic_conn);
155                                 ptlrpc_put_connection(dlmexp->exp_connection);
156                                 dlmexp->exp_connection = NULL;
157                         }
158                 }
159
160                 list_del(&imp_conn->oic_item);
161                 ptlrpc_put_connection(imp_conn->oic_conn);
162                 OBD_FREE(imp_conn, sizeof(*imp_conn));
163                 CDEBUG(D_HA, "imp %p@%s: remove connection %s\n",
164                        imp, imp->imp_obd->obd_name, uuid->uuid);
165                 rc = 0;
166                 break;
167         }
168 out:
169         spin_unlock(&imp->imp_lock);
170         if (rc == -ENOENT)
171                 CERROR("connection %s not found\n", uuid->uuid);
172         RETURN(rc);
173 }
174
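/* Common setup for client OBDs (OSC, MDC, management client, GKC).
 * lustre_cfg buffer 1 carries the target UUID and buffer 2 the server UUID;
 * an optional buffer 3 may mark the import "inactive" or name a management
 * client (registration below is currently compiled out).  Initializes the
 * client_obd state, creates the import and adds the initial connection. */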
175 int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
176 {
177         struct lustre_cfg* lcfg = buf;
178         struct client_obd *cli = &obddev->u.cli;
179         struct obd_import *imp;
180         struct obd_uuid server_uuid;
181         int rq_portal, rp_portal, connect_op;
182         char *name = obddev->obd_type->typ_name;
183         char *mgmt_name = NULL;
184         int rc;
185         ENTRY;
186
187         /* In a more perfect world, we would hang a ptlrpc_client off of
188          * obd_type and just use the values from there. */
189         if (!strcmp(name, OBD_OSC_DEVICENAME)) {
190                 rq_portal = OST_REQUEST_PORTAL;
191                 rp_portal = OSC_REPLY_PORTAL;
192                 connect_op = OST_CONNECT;
193         } else if (!strcmp(name, OBD_MDC_DEVICENAME)) {
194                 rq_portal = MDS_REQUEST_PORTAL;
195                 rp_portal = MDC_REPLY_PORTAL;
196                 connect_op = MDS_CONNECT;
197         } else if (!strcmp(name, OBD_MGMTCLI_DEVICENAME)) {
198                 rq_portal = MGMT_REQUEST_PORTAL;
199                 rp_portal = MGMT_REPLY_PORTAL;
200                 connect_op = MGMT_CONNECT;
201         } else if (!strcmp(name, LUSTRE_GKC_NAME)) {
202                 rq_portal = GKS_REQUEST_PORTAL;
203                 rp_portal = GKC_REPLY_PORTAL;
204                 connect_op = GKS_CONNECT;
205
206         } else {
207                 CERROR("unknown client OBD type \"%s\", can't setup\n",
208                        name);
209                 RETURN(-EINVAL);
210         }
211
212
213         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
214                 CERROR("requires a TARGET UUID\n");
215                 RETURN(-EINVAL);
216         }
217
218         if (LUSTRE_CFG_BUFLEN(lcfg, 1) > 37) {
219                 CERROR("target UUID must be less than 38 characters\n");
220                 RETURN(-EINVAL);
221         }
222
223         if (LUSTRE_CFG_BUFLEN(lcfg, 2) < 1) {
224                 CERROR("setup requires a SERVER UUID\n");
225                 RETURN(-EINVAL);
226         }
227
228         if (LUSTRE_CFG_BUFLEN(lcfg, 2) > 37) {
229                 CERROR("server UUID must be less than 38 characters\n");
230                 RETURN(-EINVAL);
231         }
232
233         sema_init(&cli->cl_sem, 1);
234         cli->cl_conn_count = 0;
235         memcpy(server_uuid.uuid,  lustre_cfg_buf(lcfg, 2),
236                min_t(unsigned int, LUSTRE_CFG_BUFLEN(lcfg, 2), 
237                sizeof(server_uuid)));
238
239         cli->cl_dirty = 0;
240         cli->cl_avail_grant = 0;
241         
242         /* FIXME: should limit this for the sum of all cl_dirty_max */
243         cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024;
244         if (cli->cl_dirty_max >> PAGE_SHIFT > num_physpages / 8)
245                 cli->cl_dirty_max = num_physpages << (PAGE_SHIFT - 3);
246
247         INIT_LIST_HEAD(&cli->cl_cache_waiters);
248         INIT_LIST_HEAD(&cli->cl_loi_ready_list);
249         INIT_LIST_HEAD(&cli->cl_loi_write_list);
250         INIT_LIST_HEAD(&cli->cl_loi_read_list);
251         spin_lock_init(&cli->cl_loi_list_lock);
252         cli->cl_r_in_flight = 0;
253         cli->cl_w_in_flight = 0;
254         spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
255         spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
256         spin_lock_init(&cli->cl_read_page_hist.oh_lock);
257         spin_lock_init(&cli->cl_write_page_hist.oh_lock);
258
259         cli->cl_dstr_in_flight = 0;
260         cli->cl_max_dstr_in_flight = OST_MAX_THREADS;
261         init_waitqueue_head(&cli->cl_wait_for_destroy_slot);
262
263         memset(&cli->cl_last_write_time, 0,
264                sizeof(cli->cl_last_write_time));
265         
266         cli->cl_cache_wait_num = 0;
267         cli->cl_cache_wait_sum = 0;
268         cli->cl_write_gap_sum = 0;
269         cli->cl_write_gaps = 0;
270         cli->cl_write_num = 0;
271         cli->cl_read_num = 0;
272
273         cli->cl_dirty_num = 0;
274         cli->cl_dirty_sum = 0;
275         cli->cl_dirty_av = 0;
276         cli->cl_sync_rpcs = 0;
277         cli->cl_dirty_dmax = 0;
278         cli->cl_dirty_dmin = 0;
279
280         if (num_physpages >> (20 - PAGE_SHIFT) <= 128) { /* <= 128 MB */
281                 cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 4;
282                 cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 4;
283 #if 0
284         } else if (num_physpages >> (20 - PAGE_SHIFT) <= 512) { /* <= 512 MB */
285                 cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 2;
286                 cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 2;
287 #endif
288         } else {
289                 cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
290                 cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
291         }
292
293         rc = ldlm_get_ref();
294         if (rc) {
295                 CERROR("ldlm_get_ref failed: %d\n", rc);
296                 GOTO(err, rc);
297         }
298
299         ptlrpc_init_client(rq_portal, rp_portal, name,
300                            &obddev->obd_ldlm_client);
301
302         imp = class_new_import();
303         if (imp == NULL) 
304                 GOTO(err_ldlm, rc = -ENOENT);
305         imp->imp_client = &obddev->obd_ldlm_client;
306         imp->imp_obd = obddev;
307         imp->imp_connect_op = connect_op;
308         imp->imp_generation = 0;
309         imp->imp_initial_recov = 1;
310         INIT_LIST_HEAD(&imp->imp_pinger_chain);
311         memcpy(imp->imp_target_uuid.uuid, lustre_cfg_buf(lcfg, 1),
312                LUSTRE_CFG_BUFLEN(lcfg, 1));
313         class_import_put(imp);
314
315         rc = client_import_add_conn(imp, &server_uuid, 1);
316         if (rc) {
317                 CERROR("can't add initial connection\n");
318                 GOTO(err_import, rc);
319         }
320
321         cli->cl_import = imp;
322         cli->cl_max_mds_easize = sizeof(struct lov_mds_md);
323         cli->cl_max_mds_cookiesize = sizeof(struct llog_cookie);
324         cli->cl_sandev = to_kdev_t(0);
325
326         if (LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
327                 if (!strcmp(lustre_cfg_string(lcfg, 3), "inactive")) {
328                         CDEBUG(D_HA, "marking %s %s->%s as inactive\n",
329                                name, obddev->obd_name,
330                                imp->imp_target_uuid.uuid);
331                         imp->imp_invalid = 1;
332
333                         if (LUSTRE_CFG_BUFLEN(lcfg, 4) > 0)
334                                 mgmt_name = lustre_cfg_string(lcfg, 4);
335                 } else {
336                         mgmt_name = lustre_cfg_string(lcfg, 3);
337                 }
338         }
339 #if 0
340         if (mgmt_name != NULL) {
341                 /* Register with management client if we need to. */
342                 CDEBUG(D_HA, "%s registering with %s for events about %s\n",
343                        obddev->obd_name, mgmt_name, server_uuid.uuid);
344
345                 mgmt_obd = class_name2obd(mgmt_name);
346                 if (!mgmt_obd) {
347                         CERROR("can't find mgmtcli %s to register\n",
348                                mgmt_name);
349                         GOTO(err_import, rc = -ENOSYS);
350                 }
351
352                 register_f = (mgmtcli_register_for_events_t)symbol_get("mgmtcli_register_for_events");
353                 if (!register_f) {
354                         CERROR("can't i_m_g mgmtcli_register_for_events\n");
355                         GOTO(err_import, rc = -ENOSYS);
356                 }
357
358                 rc = register_f(mgmt_obd, obddev, &imp->imp_target_uuid);
359                 symbol_put("mgmtcli_register_for_events");
360
361                 if (!rc)
362                         cli->cl_mgmtcli_obd = mgmt_obd;
363         }
364 #endif
365         RETURN(rc);
366
367 err_import:
368         class_destroy_import(imp);
369 err_ldlm:
370         ldlm_put_ref(0);
371 err:
372         RETURN(rc);
373
374 }
375
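/* Undo client_obd_setup(): deregister from the management client if needed,
 * destroy the import (dropping its security structure afterwards) and
 * release the ldlm reference. */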
376 int client_obd_cleanup(struct obd_device *obddev, int flags)
377 {
378         struct client_obd *cli = &obddev->u.cli;
379         ENTRY;
380
381         if (!cli->cl_import)
382                 RETURN(-EINVAL);
383
384         if (cli->cl_mgmtcli_obd) {
385                 mgmtcli_deregister_for_events_t dereg_f;
386
387                 dereg_f = (mgmtcli_deregister_for_events_t)symbol_get("mgmtcli_deregister_for_events");
388                 dereg_f(cli->cl_mgmtcli_obd, obddev);
389                 symbol_put("mgmtcli_deregister_for_events");
390         }
391
392         /* Drop the security structure only after the import has been
393          * destroyed, to avoid a "sleep in spinlock" issue.
394          */
395         class_import_get(cli->cl_import);
396         class_destroy_import(cli->cl_import);
397         ptlrpcs_import_drop_sec(cli->cl_import);
398         class_import_put(cli->cl_import);
399         cli->cl_import = NULL;
400
401         ldlm_put_ref(flags & OBD_OPT_FORCE);
402         RETURN(0);
403 }
404
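/* obd_connect() handler shared by client OBDs: on the first connection it
 * creates the client-side lock namespace, attaches a security context to
 * the import and issues the connect RPC via ptlrpc_connect_import(). */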
405 int client_connect_import(struct lustre_handle *dlm_handle,
406                           struct obd_device *obd,
407                           struct obd_uuid *cluuid,
408                           struct obd_connect_data *conn_data,
409                           unsigned long connect_flags)
410 {
411         struct client_obd *cli = &obd->u.cli;
412         struct obd_import *imp = cli->cl_import;
413         struct obd_export *exp;
414         int rc;
415         ENTRY;
416
417         down(&cli->cl_sem);
418         rc = class_connect(dlm_handle, obd, cluuid);
419         if (rc)
420                 GOTO(out_sem, rc);
421
422         cli->cl_conn_count++;
423         if (cli->cl_conn_count > 1)
424                 GOTO(out_sem, rc);
425         exp = class_conn2export(dlm_handle);
426
427         if (obd->obd_namespace != NULL)
428                 CERROR("already have namespace!\n");
429         obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
430                                                 LDLM_NAMESPACE_CLIENT);
431         if (obd->obd_namespace == NULL)
432                 GOTO(out_disco, rc = -ENOMEM);
433
434         rc = ptlrpcs_import_get_sec(imp);
435         if (rc != 0)
436                 GOTO(out_ldlm, rc);
437
438         imp->imp_dlm_handle = *dlm_handle;
439         rc = ptlrpc_init_import(imp);
440         if (rc != 0) 
441                 GOTO(out_ldlm, rc);
442
443         imp->imp_connect_flags = connect_flags;
444         if (conn_data)
445                 memcpy(&imp->imp_connect_data, conn_data, sizeof(*conn_data));
446
447         rc = ptlrpc_connect_import(imp, NULL);
448         if (rc != 0) {
449                 LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
450                 GOTO(out_ldlm, rc);
451         }
452         LASSERT(exp->exp_connection);
453         ptlrpc_pinger_add_import(imp);
454         EXIT;
455
456         if (rc) {
457 out_ldlm:
458                 ldlm_namespace_free(obd->obd_namespace, 0);
459                 obd->obd_namespace = NULL;
460 out_disco:
461                 cli->cl_conn_count--;
462                 class_disconnect(exp, 0);
463         } else {
464                 class_export_put(exp);
465         }
466 out_sem:
467         up(&cli->cl_sem);
468         return rc;
469 }
470
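/* obd_disconnect() handler shared by client OBDs: on the last connection it
 * removes the import from the pinger, tears down the lock namespace and
 * either invalidates the import (forced shutdown) or disconnects it. */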
471 int client_disconnect_export(struct obd_export *exp, unsigned long flags)
472 {
473         struct obd_device *obd = class_exp2obd(exp);
474         struct client_obd *cli;
475         struct obd_import *imp;
476         int rc = 0, err;
477         ENTRY;
478
479         if (!obd) {
480                 CERROR("invalid export for disconnect: exp %p cookie "LPX64"\n",
481                        exp, exp ? exp->exp_handle.h_cookie : -1);
482                 RETURN(-EINVAL);
483         }
484
            /* only dereference obd once we know it is valid */
            cli = &obd->u.cli;
            imp = cli->cl_import;

485         down(&cli->cl_sem);
486         if (!cli->cl_conn_count) {
487                 CERROR("disconnecting disconnected device (%s)\n",
488                        obd->obd_name);
489                 GOTO(out_sem, rc = -EINVAL);
490         }
491
492         cli->cl_conn_count--;
493         if (cli->cl_conn_count)
494                 GOTO(out_no_disconnect, rc = 0);
495
496         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
497          * delete it regardless.  (It's safe to delete an import that was
498          * never added.) */
499         (void)ptlrpc_pinger_del_import(imp);
500
501         if (obd->obd_namespace != NULL) {
502                 /* obd_no_recov == local only */
503                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
504                                        obd->obd_no_recov, NULL);
505                 ldlm_namespace_free(obd->obd_namespace, obd->obd_no_recov);
506                 obd->obd_namespace = NULL;
507         }
508
509         /* 
510          * Yeah, obd_no_recov also (mainly) means "forced shutdown".
511          */
512         if (obd->obd_no_recov)
513                 ptlrpc_invalidate_import(imp, 0);
514         else
515                 rc = ptlrpc_disconnect_import(imp);
516
517         EXIT;
518  out_no_disconnect:
519         err = class_disconnect(exp, 0);
520         if (!rc && err)
521                 rc = err;
522  out_sem:
523         up(&cli->cl_sem);
524         RETURN(rc);
525 }
526
527 /* --------------------------------------------------------------------------
528  * from old lib/target.c
529  * -------------------------------------------------------------------------- */
530
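/* Decide how to treat a connect request from a client we already have an
 * export for: a matching handle is a benign reconnect (returns EALREADY so
 * the caller can flag it), a mismatched handle is rejected with -EALREADY. */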
531 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
532                             struct obd_uuid *cluuid, int initial_conn)
533 {
534         if (exp->exp_connection && !initial_conn) {
535                 struct lustre_handle *hdl;
536                 hdl = &exp->exp_imp_reverse->imp_remote_handle;
537                 /* Might be a re-connect after a partition. */
538                 if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
539                         CERROR("%s reconnecting\n", cluuid->uuid);
540                         conn->cookie = exp->exp_handle.h_cookie;
541                         RETURN(EALREADY);
542                 } else {
543                         CERROR("%s reconnecting from %s, "
544                                "handle mismatch (ours "LPX64", theirs "
545                                LPX64")\n", cluuid->uuid,
546                                exp->exp_connection->c_remote_uuid.uuid,
547                                hdl->cookie, conn->cookie);
548                         memset(conn, 0, sizeof *conn);
549                         RETURN(-EALREADY);
550                 }
551         }
552
553         conn->cookie = exp->exp_handle.h_cookie;
554         CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
555                cluuid->uuid, exp);
556         CDEBUG(D_IOCTL,"connect: cookie "LPX64"\n", conn->cookie);
557         RETURN(0);
558 }
559
560 static inline int ptlrpc_peer_is_local(struct ptlrpc_peer *peer)
561 {
562         ptl_process_id_t myid;
563
564         PtlGetId(peer->peer_ni->pni_ni_h, &myid);
565         return (memcmp(&peer->peer_id, &myid, sizeof(myid)) == 0);
566 }
567
568 /* Check whether the given security flavor is in the deny list.
569  * Return values:
570  *      0           not found, allow
571  *      -EPERM      found, refuse
572  */
573
574 static int check_deny_list(struct list_head *head, __u32 flavor)
575 {
576         deny_sec_t *p_deny_sec = NULL;
577         deny_sec_t *n_deny_sec = NULL;
578
579         list_for_each_entry_safe(p_deny_sec, n_deny_sec, head, list) {
580                 if (p_deny_sec->flavor == flavor)
581                         return -EPERM;
582         }
583         return 0;
584 }
585
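/* Refuse requests whose security flavor appears on the MDS/OST deny list. */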
586 int target_check_deny_sec(struct obd_device *target, struct ptlrpc_request *req)
587 {
588         __u32 flavor;
589         int rc = 0;
590
591         flavor = req->rq_req_secflvr;
592
593         if (!strcmp(target->obd_type->typ_name, OBD_MDS_DEVICENAME)) {
594                 spin_lock(&target->u.mds.mds_denylist_lock);
595                 rc = check_deny_list(&target->u.mds.mds_denylist, flavor);
596                 spin_unlock(&target->u.mds.mds_denylist_lock);
597         } else if (!strcmp(target->obd_type->typ_name, OBD_FILTER_DEVICENAME)) {
598                 spin_lock(&target->u.filter.fo_denylist_lock);
599                 rc = check_deny_list(&target->u.filter.fo_denylist, flavor);
600                 spin_unlock(&target->u.filter.fo_denylist_lock);
601         }
602
603         return rc;
604 }
605
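/* Server-side connect handler: look up the target by UUID, find or create an
 * export for the client, sort out reconnects and recovery accounting, and set
 * up the reverse import that the server uses for callbacks to this client. */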
606 int target_handle_connect(struct ptlrpc_request *req)
607 {
608         unsigned long connect_flags = 0, *cfp;
609         struct obd_device *target;
610         struct obd_export *export = NULL;
611         struct obd_import *revimp;
612         struct lustre_handle conn;
613         struct obd_uuid tgtuuid;
614         struct obd_uuid cluuid;
615         struct obd_uuid remote_uuid;
616         struct list_head *p;
617         struct obd_connect_data *conn_data;
618         int conn_data_size = sizeof(*conn_data);
619         char *str, *tmp;
620         int rc = 0;
621         unsigned long flags;
622         int initial_conn = 0;
623         char peer_str[PTL_NALFMT_SIZE];
624         const int offset = 1;
625         ENTRY;
626
627         OBD_RACE(OBD_FAIL_TGT_CONN_RACE); 
628
629         LASSERT_REQSWAB (req, offset + 0);
630         str = lustre_msg_string(req->rq_reqmsg, offset + 0,
631                                 sizeof(tgtuuid) - 1);
632         if (str == NULL) {
633                 CERROR("bad target UUID for connect\n");
634                 GOTO(out, rc = -EINVAL);
635         }
636
637         obd_str2uuid (&tgtuuid, str);
638         target = class_uuid2obd(&tgtuuid);
639         if (!target)
640                 target = class_name2obd(str);
641         
642         if (!target || target->obd_stopping || !target->obd_set_up) {
643                 CERROR("UUID '%s' is not available for connect from %s\n",
644                        str, req->rq_peerstr);
645                 GOTO(out, rc = -ENODEV);
646         }
647
648         /* check the secure deny list of mds/ost */
649         rc = target_check_deny_sec(target, req);
650         if (rc != 0)
651                 GOTO(out, rc);
652
653         LASSERT_REQSWAB (req, offset + 1);
654         str = lustre_msg_string(req->rq_reqmsg, offset + 1, sizeof(cluuid) - 1);
655         if (str == NULL) {
656                 CERROR("bad client UUID for connect\n");
657                 GOTO(out, rc = -EINVAL);
658         }
659
660         obd_str2uuid (&cluuid, str);
661
662         /* XXX extract a nettype and format accordingly */
663         switch (sizeof(ptl_nid_t)) {
664                 /* NB the casts only avoid compiler warnings */
665         case 8:
666                 snprintf((char *)remote_uuid.uuid, sizeof(remote_uuid),
667                          "NET_"LPX64"_UUID", (__u64)req->rq_peer.peer_id.nid);
668                 break;
669         case 4:
670                 snprintf((char *)remote_uuid.uuid, sizeof(remote_uuid),
671                          "NET_%x_UUID", (__u32)req->rq_peer.peer_id.nid);
672                 break;
673         default:
674                 LBUG();
675         }
676
677         tmp = lustre_msg_buf(req->rq_reqmsg, offset + 2, sizeof(conn));
678         if (tmp == NULL)
679                 GOTO(out, rc = -EPROTO);
680
681         memcpy(&conn, tmp, sizeof conn);
682
683         cfp = lustre_msg_buf(req->rq_reqmsg, offset + 3, sizeof(unsigned long));
684         LASSERT(cfp != NULL);
685         connect_flags = *cfp;
686
687         conn_data = lustre_swab_reqbuf(req, offset + 4, sizeof(*conn_data),
688                                        lustre_swab_connect);
689         if (!conn_data)
690                 GOTO(out, rc = -EPROTO);
691
692         rc = lustre_pack_reply(req, 1, &conn_data_size, NULL);
693         if (rc)
694                 GOTO(out, rc);
695         
696         if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_INITIAL)
697                 initial_conn = 1;
698         
699         /* lctl gets a backstage, all-access pass. */
700         if (obd_uuid_equals(&cluuid, &target->obd_uuid))
701                 goto dont_check_exports;
702
703         spin_lock(&target->obd_dev_lock);
704         list_for_each(p, &target->obd_exports) {
705                 export = list_entry(p, struct obd_export, exp_obd_chain);
706                 if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
707                         spin_unlock(&target->obd_dev_lock);
708                         LASSERT(export->exp_obd == target);
709
710                         rc = target_handle_reconnect(&conn, export, &cluuid,
711                                                      initial_conn);
712                         break;
713                 }
714                 export = NULL;
715         }
716         /* If we found an export, we already unlocked. */
717         if (!export) {
718                 spin_unlock(&target->obd_dev_lock);
719         } else if (req->rq_export == NULL && 
720                    atomic_read(&export->exp_rpc_count) > 0) {
721                 CWARN("%s: refuse connection from %s/%s to 0x%p/%d\n",
722                       target->obd_name, cluuid.uuid,
723                       ptlrpc_peernid2str(&req->rq_peer, peer_str),
724                       export, atomic_read(&export->exp_refcount));
725                 GOTO(out, rc = -EBUSY);
726         } else if (req->rq_export != NULL &&
727                    atomic_read(&export->exp_rpc_count) > 1) {
728                 CWARN("%s: refuse reconnection from %s@%s to 0x%p/%d\n",
729                       target->obd_name, cluuid.uuid,
730                       ptlrpc_peernid2str(&req->rq_peer, peer_str),
731                       export, atomic_read(&export->exp_rpc_count));
732                 GOTO(out, rc = -EBUSY);
733         } else if (req->rq_reqmsg->conn_cnt == 1 && !initial_conn) {
734                 CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n",
735                        cluuid.uuid);
736                 GOTO(out, rc = -EALREADY);
737         }
738
739         /* Tell the client if we're in recovery. */
740         /* If this is the first client, start the recovery timer */
741         CWARN("%s: connection from %s@%s/%lu %st"LPU64"\n", target->obd_name,
742               cluuid.uuid, ptlrpc_peernid2str(&req->rq_peer, peer_str), *cfp,
743               target->obd_recovering ? "recovering/" : "", conn_data->transno);
744
745         if (target->obd_recovering) {
746                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
747                 target_start_recovery_timer(target);
748         }
749
750 #if 0
751         /* Tell the client if we support replayable requests */
752         if (target->obd_replayable)
753                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
754 #endif
755
756         if (export == NULL) {
757                 if (target->obd_recovering) {
758                         CERROR("%s denying connection for new client %s@%s: "
759                                "%d clients in recovery for %lds\n", target->obd_name, 
760                                cluuid.uuid,
761                                ptlrpc_peernid2str(&req->rq_peer, peer_str),
762                                target->obd_recoverable_clients,
763                                (target->obd_recovery_timer.expires-jiffies)/HZ);
764                         rc = -EBUSY;
765                 } else {
766  dont_check_exports:
767                         rc = obd_connect(&conn, target, &cluuid, conn_data,
768                                          connect_flags);
769                 }
770         }
771
772         /* Return only the parts of obd_connect_data that we understand, so the
773          * client knows that we don't understand the rest. */
774         conn_data->ocd_connect_flags &= OBD_CONNECT_SUPPORTED;
775         memcpy(lustre_msg_buf(req->rq_repmsg, 0, sizeof(*conn_data)), conn_data,
776                sizeof(*conn_data));
777
778         /* Tell the client if we support replayable requests */
779         if (target->obd_replayable)
780                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
781
782         /* If all else goes well, this is our RPC return code. */
783         req->rq_status = 0;
784
785         if (rc && rc != EALREADY)
786                 GOTO(out, rc);
787
788         req->rq_repmsg->handle = conn;
789
790         /* If the client and the server are the same node, we will already
791          * have an export that really points to the client's DLM export,
792          * because we have a shared handles table.
793          *
794          * XXX this will go away when shaver stops sending the "connect" handle
795          * in the real "remote handle" field of the request --phik 24 Apr 2003
796          */
797         if (req->rq_export != NULL)
798                 class_export_put(req->rq_export);
799
800         /* ownership of this export ref transfers to the request */
801         export = req->rq_export = class_conn2export(&conn);
802         LASSERT(export != NULL);
803
804         spin_lock_irqsave(&export->exp_lock, flags);
805         if (initial_conn) {
806                 req->rq_repmsg->conn_cnt = export->exp_conn_cnt + 1;
807         } else if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
808                 CERROR("%s@%s: already connected at a higher conn_cnt: %d > %d\n",
809                        cluuid.uuid, ptlrpc_peernid2str(&req->rq_peer, peer_str),
810                        export->exp_conn_cnt, 
811                        req->rq_reqmsg->conn_cnt);
812                 spin_unlock_irqrestore(&export->exp_lock, flags);
813                 GOTO(out, rc = -EALREADY);
814         } 
815         export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
816         spin_unlock_irqrestore(&export->exp_lock, flags);
817
818         /* request from liblustre? */
819         if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT)
820                 export->exp_libclient = 1;
821
822         if (!(lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_ASYNC) &&
823             ptlrpc_peer_is_local(&req->rq_peer)) {
824                 CWARN("%s: exp %p set sync\n", target->obd_name, export);
825                 export->exp_sync = 1;
826         } else {
827                 CDEBUG(D_HA, "%s: exp %p set async\n",target->obd_name,export);
828                 export->exp_sync = 0;
829         }
830
831         if (export->exp_connection != NULL)
832                 ptlrpc_put_connection(export->exp_connection);
833         export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
834                                                        &remote_uuid);
835
836         if (rc == EALREADY) {
837                 /* We indicate the reconnection in a flag, not an error code. */
838                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
839                 GOTO(out, rc = 0);
840         }
841
842         spin_lock_bh(&target->obd_processing_task_lock);
843         if (target->obd_recovering && export->exp_connected == 0) {
844                 __u64 t = conn_data->transno;
845                 export->exp_connected = 1;
846                 if ((lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_TRANSNO)
847                                 && t < target->obd_next_recovery_transno)
848                         target->obd_next_recovery_transno = t;
849                 target->obd_connected_clients++;
850                 if (target->obd_connected_clients == target->obd_max_recoverable_clients)
851                         wake_up(&target->obd_next_transno_waitq);
852         }
853         spin_unlock_bh(&target->obd_processing_task_lock);
854
855         memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, offset + 2, sizeof(conn)),
856                sizeof(conn));
857
858         if (export->exp_imp_reverse != NULL) {
859                 /* same logic as client_obd_cleanup */
860                 class_import_get(export->exp_imp_reverse);
861                 class_destroy_import(export->exp_imp_reverse);
862                 ptlrpcs_import_drop_sec(export->exp_imp_reverse);
863                 class_import_put(export->exp_imp_reverse);
864         }
865
866         /* For the remaining steps we return -ENOTCONN on error,
867          * so that the client re-initializes the connection.
868          */
869         revimp = export->exp_imp_reverse = class_new_import();
870         if (!revimp) {
871                 CERROR("failed to allocate new reverse import\n");
872                 GOTO(out, rc = -ENOTCONN);
873         }
874
875         revimp->imp_connection = ptlrpc_connection_addref(export->exp_connection);
876         revimp->imp_client = &export->exp_obd->obd_ldlm_client;
877         revimp->imp_remote_handle = conn;
878         revimp->imp_obd = target;
879         revimp->imp_dlm_fake = 1;
880         revimp->imp_state = LUSTRE_IMP_FULL;
881
882         rc = ptlrpcs_import_get_sec(revimp);
883         if (rc) {
884                 CERROR("reverse import cannot get sec: %d\n", rc);
885                 class_destroy_import(revimp);
886                 export->exp_imp_reverse = NULL;
887                 GOTO(out, rc = -ENOTCONN);
888         }
889
890         class_import_put(revimp);
891
892         rc = obd_connect_post(export, initial_conn, connect_flags);
893 out:
894         if (rc)
895                 req->rq_status = rc;
896         RETURN(rc);
897 }
898
899 int target_handle_disconnect(struct ptlrpc_request *req)
900 {
901         struct obd_export *exp;
902         int rc;
903         ENTRY;
904
905         rc = lustre_pack_reply(req, 0, NULL, NULL);
906         if (rc)
907                 RETURN(rc);
908
909         /* keep the rq_export around so we can send the reply */
910         exp = class_export_get(req->rq_export);
911         req->rq_status = obd_disconnect(exp, 0);
912         RETURN(0);
913 }
914
915 void target_destroy_export(struct obd_export *exp)
916 {
917         /* exports created from last_rcvd data, and "fake"
918            exports created by lctl don't have an import */
919         if (exp->exp_imp_reverse != NULL) {
920                 ptlrpcs_import_drop_sec(exp->exp_imp_reverse);
921                 class_destroy_import(exp->exp_imp_reverse);
922         }
923
924         /* We cancel locks at disconnect time, but this will catch any locks
925          * granted in a race with recovery-induced disconnect. */
926         if (exp->exp_obd->obd_namespace != NULL)
927                 ldlm_cancel_locks_for_export(exp);
928 }
929
930 /*
931  * Recovery functions
932  */
933
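/* Clone a request (including its message buffer) so it can be queued for
 * replay; the clone takes over the reply state and security data of the
 * original request. */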
934 struct ptlrpc_request *
935 ptlrpc_clone_req( struct ptlrpc_request *orig_req) 
936 {
937         struct ptlrpc_request *copy_req;
938         struct lustre_msg *copy_reqmsg;
939
940         OBD_ALLOC(copy_req, sizeof *copy_req);
941         if (!copy_req)
942                 return NULL;
943         OBD_ALLOC(copy_reqmsg, orig_req->rq_reqlen);
944         if (!copy_reqmsg){
945                 OBD_FREE(copy_req, sizeof *copy_req);
946                 return NULL;
947         }
948
949         memcpy(copy_req, orig_req, sizeof *copy_req);
950         memcpy(copy_reqmsg, orig_req->rq_reqmsg, orig_req->rq_reqlen);
951         /* the copied req takes over the reply state and security data */
952         orig_req->rq_reply_state = NULL;
953         orig_req->rq_svcsec_data = NULL;
954
955         copy_req->rq_reqmsg = copy_reqmsg;
956         class_export_get(copy_req->rq_export);
957         INIT_LIST_HEAD(&copy_req->rq_list);
958
959         return copy_req;
960 }
961
962 void ptlrpc_free_clone( struct ptlrpc_request *req) 
963 {
964         if (req->rq_svcsec)
965                 svcsec_cleanup_req(req);
966
967         class_export_put(req->rq_export);
968         list_del(&req->rq_list);
969         OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
970         OBD_FREE(req, sizeof *req);
971 }
972
973 static void target_release_saved_req(struct ptlrpc_request *req)
974 {
975         if (req->rq_svcsec)
976                 svcsec_cleanup_req(req);
977
978         class_export_put(req->rq_export);
979         OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
980         OBD_FREE(req, sizeof *req);
981 }
982
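/* Called once all replay stages are done: reprocess the lock namespace, let
 * the obd type clean up orphans via its postrecov method and record the
 * recovery end time. */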
983 static void target_finish_recovery(struct obd_device *obd)
984 {
985         int rc;
986
987         ldlm_reprocess_all_ns(obd->obd_namespace);
988
989         /* when recovery is finished, clean up orphans on MDS and OST */
990         if (OBT(obd) && OBP(obd, postrecov)) {
991                 rc = OBP(obd, postrecov)(obd);
992                 if (rc >= 0)
993                         CWARN("%s: all clients recovered, %d MDS "
994                               "orphans deleted\n", obd->obd_name, rc);
995                 else
996                         CERROR("postrecov failed %d\n", rc);
997         }
998
999         obd->obd_recovery_end = LTIME_S(CURRENT_TIME);
1000         return;
1001 }
1002
1003 static void abort_req_replay_queue(struct obd_device *obd)
1004 {
1005         struct ptlrpc_request *req;
1006         struct list_head *tmp, *n;
1007         int rc;
1008
1009         list_for_each_safe(tmp, n, &obd->obd_req_replay_queue) {
1010                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1011                 list_del(&req->rq_list);
1012                 DEBUG_REQ(D_ERROR, req, "aborted:");
1013                 req->rq_status = -ENOTCONN;
1014                 req->rq_type = PTL_RPC_MSG_ERR;
1015                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1016                 if (rc == 0) {
1017                         ptlrpc_reply(req);
1018                 } else {
1019                         DEBUG_REQ(D_ERROR, req,
1020                                   "packing failed for abort-reply; skipping");
1021                 }
1022                 target_release_saved_req(req);
1023         }
1024 }
1025
1026 static void abort_lock_replay_queue(struct obd_device *obd)
1027 {
1028         struct ptlrpc_request *req;
1029         struct list_head *tmp, *n;
1030         int rc;
1031
1032         list_for_each_safe(tmp, n, &obd->obd_lock_replay_queue) {
1033                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1034                 list_del(&req->rq_list);
1035                 DEBUG_REQ(D_ERROR, req, "aborted:");
1036                 req->rq_status = -ENOTCONN;
1037                 req->rq_type = PTL_RPC_MSG_ERR;
1038                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1039                 if (rc == 0) {
1040                         ptlrpc_reply(req);
1041                 } else {
1042                         DEBUG_REQ(D_ERROR, req,
1043                                   "packing failed for abort-reply; skipping");
1044                 }
1045                 target_release_saved_req(req);
1046         }
1047 }
1048
1049 /* Called from a cleanup function if the device is being cleaned up
1050    forcefully.  The exports should all have been disconnected already,
1051    the only thing left to do is
1052      - clear the recovery flags
1053      - cancel the timer
1054      - free queued requests and replies, but don't send replies
1055    Because the obd_stopping flag is set, no new requests should be received.
1056
1057 */
1058 void target_cleanup_recovery(struct obd_device *obd)
1059 {
1060         struct list_head *tmp, *n;
1061         struct ptlrpc_request *req;
1062
1063         spin_lock_bh(&obd->obd_processing_task_lock);
1064         if (!obd->obd_recovering) {
1065                 spin_unlock_bh(&obd->obd_processing_task_lock);
1066                 EXIT;
1067                 return;
1068         }
1069         obd->obd_recovering = obd->obd_abort_recovery = 0;
1070         target_cancel_recovery_timer(obd);
1071         spin_unlock_bh(&obd->obd_processing_task_lock);
1072
1073         list_for_each_safe(tmp, n, &obd->obd_req_replay_queue) {
1074                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1075                 list_del(&req->rq_list);
1076                 LASSERT (req->rq_reply_state == 0);
1077                 target_release_saved_req(req);
1078         }
1079         list_for_each_safe(tmp, n, &obd->obd_lock_replay_queue) {
1080                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1081                 list_del(&req->rq_list);
1082                 LASSERT (req->rq_reply_state == 0);
1083                 target_release_saved_req(req);
1084         }
1085         list_for_each_safe(tmp, n, &obd->obd_final_req_queue) {
1086                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1087                 list_del(&req->rq_list);
1088                 LASSERT (req->rq_reply_state == 0);
1089                 target_release_saved_req(req);
1090         }
1091 }
1092
1093 #if 0
1094 static void target_abort_recovery(void *data)
1095 {
1096         struct obd_device *obd = data;
1097
1098         LASSERT(!obd->obd_recovering);
1099
1100         class_disconnect_stale_exports(obd, 0);
1101
1102         CERROR("%s: recovery period over; disconnecting unfinished clients.\n",
1103                obd->obd_name);
1104
1105         abort_recovery_queue(obd);
1106         target_finish_recovery(obd);
1107         ptlrpc_run_recovery_over_upcall(obd);
1108 }
1109 #endif
1110
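/* Recovery timer callback: flag recovery as aborted and wake the recovery
 * thread so it can evict clients that failed to reconnect or replay in time. */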
1111 static void target_recovery_expired(unsigned long castmeharder)
1112 {
1113         struct obd_device *obd = (struct obd_device *)castmeharder;
1114         spin_lock_bh(&obd->obd_processing_task_lock);
1115         if (obd->obd_recovering)
1116                 obd->obd_abort_recovery = 1;
1117
1118         wake_up(&obd->obd_next_transno_waitq);
1119         spin_unlock_bh(&obd->obd_processing_task_lock);
1120 }
1121
1122
1123 /* obd_processing_task_lock should be held */
1124 void target_cancel_recovery_timer(struct obd_device *obd)
1125 {
1126         CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name);
1127         del_timer(&obd->obd_recovery_timer);
1128 }
1129
1130 #ifdef __KERNEL__
1131 static void reset_recovery_timer(struct obd_device *obd)
1132 {
1133         spin_lock_bh(&obd->obd_processing_task_lock);
1134         if (!obd->obd_recovering) {
1135                 spin_unlock_bh(&obd->obd_processing_task_lock);
1136                 return;
1137         }                
1138         CDEBUG(D_HA, "timer will expire in %u seconds\n",
1139                OBD_RECOVERY_TIMEOUT / HZ);
1140         mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
1141         spin_unlock_bh(&obd->obd_processing_task_lock);
1142 }
1143 #endif
1144
1145 /* Only start it the first time called */
1146 void target_start_recovery_timer(struct obd_device *obd)
1147 {
1148         spin_lock_bh(&obd->obd_processing_task_lock);
1149         if (!obd->obd_recovering || timer_pending(&obd->obd_recovery_timer)) {
1150                 spin_unlock_bh(&obd->obd_processing_task_lock);
1151                 return;
1152         }
1153         CWARN("%s: starting recovery timer (%us)\n", obd->obd_name,
1154                OBD_RECOVERY_TIMEOUT / HZ);
1155         obd->obd_recovery_timer.function = target_recovery_expired;
1156         obd->obd_recovery_timer.data = (unsigned long)obd;
1157         mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
1158         spin_unlock_bh(&obd->obd_processing_task_lock);
1159 }
1160
1161 #ifdef __KERNEL__
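/* Wake-up condition for request replay: wake when recovery is aborted, when
 * all clients have finished request replay, when the next expected transno is
 * queued, or when enough requests are queued that a transno gap can either be
 * skipped or recovery must be aborted. */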
1162 static int check_for_next_transno(struct obd_device *obd)
1163 {
1164         struct ptlrpc_request *req = NULL;
1165         int wake_up = 0, connected, completed, queue_len, max;
1166         __u64 next_transno, req_transno;
1167
1168         spin_lock_bh(&obd->obd_processing_task_lock);
1169         if (!list_empty(&obd->obd_req_replay_queue)) {
1170                 req = list_entry(obd->obd_req_replay_queue.next,
1171                                  struct ptlrpc_request, rq_list);
1172                 req_transno = req->rq_reqmsg->transno;
1173         } else {
1174                 req_transno = 0;
1175         }
1176
1177         max = obd->obd_max_recoverable_clients;
1178         connected = obd->obd_connected_clients;
1179         completed = max - obd->obd_recoverable_clients;
1180         queue_len = obd->obd_requests_queued_for_recovery;
1181         next_transno = obd->obd_next_recovery_transno;
1182
1183         CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, "
1184                "req_transno: "LPU64", next_transno: "LPU64"\n",
1185                max, connected, completed, queue_len, req_transno, next_transno);
1186         if (obd->obd_abort_recovery) {
1187                 CDEBUG(D_HA, "waking for aborted recovery\n");
1188                 wake_up = 1;
1189         } else if (atomic_read(&obd->obd_req_replay_clients) == 0) {
1190                 CDEBUG(D_HA, "waking for completed recovery\n");
1191                 wake_up = 1;
1192         } else if (req_transno == next_transno) {
1193                 CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
1194                 wake_up = 1;
1195         } else if (queue_len + completed == max) {
1196                 LASSERT(req->rq_reqmsg->transno >= next_transno);
1197                 CDEBUG(req_transno > obd->obd_last_committed ? D_ERROR : D_HA,
1198                        "waking for skipped transno (skip: "LPD64
1199                        ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
1200                        next_transno, queue_len, completed, connected, req_transno);
1201                 obd->obd_next_recovery_transno = req_transno;
1202                 wake_up = 1;
1203         } else if (queue_len == atomic_read(&obd->obd_req_replay_clients)) {
1204                 /* some clients haven't connected in time, but we can try
1205                  * to replay requests that depend on already-committed ones;
1206                  * also, we can replay the first non-committed transaction */
1207                 LASSERT(req_transno != 0);
1208                 if (req_transno == obd->obd_last_committed + 1) {
1209                         obd->obd_next_recovery_transno = req_transno;
1210                 } else if (req_transno > obd->obd_last_committed) {
1211                         /* can't continue recovery: have no needed transno */
1212                         obd->obd_abort_recovery = 1;
1213                         CDEBUG(D_ERROR, "aborting recovery, missing clients. max: %d, "
1214                                "connected: %d, completed: %d, queue_len: %d, "
1215                                "req_transno: "LPU64", next_transno: "LPU64"\n",
1216                                max, connected, completed, queue_len,
1217                                req_transno, next_transno);
1218                 }
1219                 wake_up = 1;
1220         }
1221         spin_unlock_bh(&obd->obd_processing_task_lock);
1222         
1223         return wake_up;
1224 }
1225
1226 static struct ptlrpc_request *
1227 target_next_replay_req(struct obd_device *obd)
1228 {
1229         struct l_wait_info lwi = { 0 };
1230         struct ptlrpc_request *req;
1231
1232         CDEBUG(D_HA, "Waiting for transno "LPD64"\n",
1233                obd->obd_next_recovery_transno);
1234         l_wait_event(obd->obd_next_transno_waitq,
1235                      check_for_next_transno(obd), &lwi);
1236         
1237         spin_lock_bh(&obd->obd_processing_task_lock);
1238         if (obd->obd_abort_recovery) {
1239                 req = NULL;
1240         } else if (!list_empty(&obd->obd_req_replay_queue)) {
1241                 req = list_entry(obd->obd_req_replay_queue.next,
1242                                  struct ptlrpc_request, rq_list);
1243                 list_del_init(&req->rq_list);
1244                 obd->obd_requests_queued_for_recovery--;
1245         } else {
1246                 req = NULL;
1247         }
1248         spin_unlock_bh(&obd->obd_processing_task_lock);
1249         return req;
1250 }
1251
1252 static int check_for_next_lock(struct obd_device *obd)
1253 {
1254         struct ptlrpc_request *req = NULL;
1255         int wake_up = 0;
1256
1257         spin_lock_bh(&obd->obd_processing_task_lock);
1258         if (!list_empty(&obd->obd_lock_replay_queue)) {
1259                 req = list_entry(obd->obd_lock_replay_queue.next,
1260                                  struct ptlrpc_request, rq_list);
1261                 CDEBUG(D_HA, "waking for next lock\n");
1262                 wake_up = 1;
1263         } else if (atomic_read(&obd->obd_lock_replay_clients) == 0) {
1264                 CDEBUG(D_HA, "waking for completed lock replay\n");
1265                 wake_up = 1;
1266         } else if (obd->obd_abort_recovery) {
1267                 CDEBUG(D_HA, "waking for aborted recovery\n");
1268                 wake_up = 1;
1269         }
1270         spin_unlock_bh(&obd->obd_processing_task_lock);
1271         
1272         return wake_up;
1273 }
1274
1275 static struct ptlrpc_request *
1276 target_next_replay_lock(struct obd_device *obd)
1277 {
1278         struct l_wait_info lwi = { 0 };
1279         struct ptlrpc_request *req;
1280
1281         CDEBUG(D_HA, "Waiting for lock\n");
1282         l_wait_event(obd->obd_next_transno_waitq,
1283                      check_for_next_lock(obd), &lwi);
1284         
1285         spin_lock_bh(&obd->obd_processing_task_lock);
1286         if (obd->obd_abort_recovery) {
1287                 req = NULL;
1288         } else if (!list_empty(&obd->obd_lock_replay_queue)) {
1289                 req = list_entry(obd->obd_lock_replay_queue.next,
1290                                  struct ptlrpc_request, rq_list);
1291                 list_del_init(&req->rq_list);
1292         } else {
1293                 req = NULL;
1294         }
1295         spin_unlock_bh(&obd->obd_processing_task_lock);
1296         return req;
1297 }
1298
1299 static struct ptlrpc_request *
1300 target_next_final_ping(struct obd_device *obd)
1301 {
1302         struct ptlrpc_request *req;
1303
1304         spin_lock_bh(&obd->obd_processing_task_lock);
1305         if (!list_empty(&obd->obd_final_req_queue)) {
1306                 req = list_entry(obd->obd_final_req_queue.next,
1307                                  struct ptlrpc_request, rq_list);
1308                 list_del_init(&req->rq_list);
1309         } else {
1310                 req = NULL;
1311         }
1312         spin_unlock_bh(&obd->obd_processing_task_lock);
1313         return req;
1314 }
1315
1316 static int req_replay_done(struct obd_export *exp)
1317 {
1318         if (exp->exp_req_replay_needed)
1319                 return 0;
1320         return 1;
1321 }
1322
1323 static int lock_replay_done(struct obd_export *exp)
1324 {
1325         if (exp->exp_lock_replay_needed)
1326                 return 0;
1327         return 1;
1328 }
1329
1330 static int connect_done(struct obd_export *exp)
1331 {
1332         if (exp->exp_connected)
1333                 return 1;
1334         return 0;
1335 }
1336
1337 static int check_for_clients(struct obd_device *obd)
1338 {
1339         if (obd->obd_abort_recovery)
1340                 return 1;
1341         LASSERT(obd->obd_connected_clients <= obd->obd_max_recoverable_clients);
1342         if (obd->obd_connected_clients == obd->obd_max_recoverable_clients)
1343                 return 1;
1344         return 0;
1345 }
1346
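/* Main recovery thread: wait for the known clients to reconnect, then drive
 * the three stages below -- request replay in transno order, lock replay, and
 * finally the queued "recovery complete" pings -- evicting clients that miss
 * a stage's deadline. */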
1347 static int target_recovery_thread(void *arg)
1348 {
1349         struct obd_device *obd = arg;
1350         struct ptlrpc_request *req;
1351         struct target_recovery_data *trd = &obd->obd_recovery_data;
1352         char peer_str[PTL_NALFMT_SIZE];
1353         struct l_wait_info lwi = { 0 };
1354         unsigned long delta;
1355         unsigned long flags;
1356         ENTRY;
1357
1358         kportal_daemonize("tgt-recov");
1359
1360         SIGNAL_MASK_LOCK(current, flags);
1361         sigfillset(&current->blocked);
1362         RECALC_SIGPENDING;
1363         SIGNAL_MASK_UNLOCK(current, flags);
1364
1365         CERROR("%s: started recovery thread pid %d\n", obd->obd_name, 
1366                current->pid);
1367         trd->trd_processing_task = current->pid;
1368
1369         obd->obd_recovering = 1;
1370         complete(&trd->trd_starting);
1371
1372         /* first of all, we have to know the first transno to replay */
1373         obd->obd_abort_recovery = 0;
1374         l_wait_event(obd->obd_next_transno_waitq,
1375                      check_for_clients(obd), &lwi);
1376         
1377         spin_lock_bh(&obd->obd_processing_task_lock);
1378         target_cancel_recovery_timer(obd);
1379         spin_unlock_bh(&obd->obd_processing_task_lock);
1380
1381         /* If some clients haven't connected in time, evict them */
1382         if (obd->obd_abort_recovery) {
1383                 int stale;
1384                 CDEBUG(D_ERROR, "some clients haven't connected in time (%d/%d), "
1385                        "evicting them ...\n", obd->obd_connected_clients,
1386                        obd->obd_max_recoverable_clients);
1387                 obd->obd_abort_recovery = 0;
1388                 stale = class_disconnect_stale_exports(obd, connect_done, 0);
1389                 atomic_sub(stale, &obd->obd_req_replay_clients);
1390                 atomic_sub(stale, &obd->obd_lock_replay_clients);
1391         }
1392
1393         /* next stage: replay requests */
1394         delta = jiffies;
1395         obd->obd_req_replaying = 1;
1396         CDEBUG(D_ERROR, "1: request replay stage - %d clients from t"LPU64"\n",
1397               atomic_read(&obd->obd_req_replay_clients),
1398               obd->obd_next_recovery_transno);
1399         while ((req = target_next_replay_req(obd))) {
1400                 LASSERT(trd->trd_processing_task == current->pid);
1401                 DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s", 
1402                           req->rq_reqmsg->transno, 
1403                           ptlrpc_peernid2str(&req->rq_peer, peer_str));
1404                 (void)trd->trd_recovery_handler(req);
1405                 obd->obd_replayed_requests++;
1406                 reset_recovery_timer(obd);
1407                 /* bug 1580: decide how to properly sync() in recovery */
1408                 //mds_fsync_super(mds->mds_sb);
1409                 ptlrpc_free_clone(req);
1410                 spin_lock_bh(&obd->obd_processing_task_lock);
1411                 obd->obd_next_recovery_transno++;
1412                 spin_unlock_bh(&obd->obd_processing_task_lock);
1413         }
1414
1415         spin_lock_bh(&obd->obd_processing_task_lock);
1416         target_cancel_recovery_timer(obd);
1417         spin_unlock_bh(&obd->obd_processing_task_lock);
1418
1419         /* If some clients haven't replayed requests in time, evict them */
1420         if (obd->obd_abort_recovery) {
1421                 int stale;
1422                 CDEBUG(D_ERROR, "req replay timed out, aborting ...\n");
1423                 obd->obd_abort_recovery = 0;
1424                 stale = class_disconnect_stale_exports(obd, req_replay_done, 0);
1425                 atomic_sub(stale, &obd->obd_lock_replay_clients);
1426                 abort_req_replay_queue(obd);
1427                 /* XXX for debugging tests 11 and 17 */
1428                 /* LBUG(); */
1429         }
1430
1431         /* The second stage: replay locks */
1432         CDEBUG(D_ERROR, "2: lock replay stage - %d clients\n",
1433               atomic_read(&obd->obd_lock_replay_clients));
1434         while ((req = target_next_replay_lock(obd))) {
1435                 LASSERT(trd->trd_processing_task == current->pid);
1436                 DEBUG_REQ(D_HA, req, "processing lock from %s: ", 
1437                           ptlrpc_peernid2str(&req->rq_peer, peer_str));
1438                 (void)trd->trd_recovery_handler(req);
1439                 reset_recovery_timer(obd);
1440                 ptlrpc_free_clone(req);
1441                 obd->obd_replayed_locks++;
1442         }
1443         
1444         spin_lock_bh(&obd->obd_processing_task_lock);
1445         target_cancel_recovery_timer(obd);
1446         spin_unlock_bh(&obd->obd_processing_task_lock);
1447
1448         /* If some clients haven't replayed their locks in time, evict them */
1449         if (obd->obd_abort_recovery) {
1450                 int stale;
1451                 CERROR("lock replay timed out, aborting ...\n");
1452                 obd->obd_abort_recovery = 0;
1453                 stale = class_disconnect_stale_exports(obd, lock_replay_done, 0);
1454                 abort_lock_replay_queue(obd);
1455         }
1456
1457         /* Drop the recovering flag so that, from now on, all new
1458          * requests go straight to the regular handler (mds_handle()) */
1459         spin_lock_bh(&obd->obd_processing_task_lock);
1460         obd->obd_recovering = 0;
1461         spin_unlock_bh(&obd->obd_processing_task_lock);
1462
1463         /* The third stage: reply to final pings */
1464         CDEBUG(D_ERROR, "3: final stage - process recovery completion pings\n");
1465         while ((req = target_next_final_ping(obd))) {
1466                 LASSERT(trd->trd_processing_task == current->pid);
1467                 DEBUG_REQ(D_HA, req, "processing final ping from %s: ", 
1468                           ptlrpc_peernid2str(&req->rq_peer, peer_str));
1469                 (void)trd->trd_recovery_handler(req);
1470                 ptlrpc_free_clone(req);
1471         }
1472        
1473         delta = (jiffies - delta) / HZ;
1474         CDEBUG(D_ERROR,"4: recovery completed in %lus - %d/%d reqs/locks\n",
1475               delta, obd->obd_replayed_requests, obd->obd_replayed_locks);
1476         if (delta > obd_timeout * 2) {
1477                 CWARN("recovery took too long - check the debug logs\n");
1478                 portals_debug_dumplog();
1479         }
1480         target_finish_recovery(obd);
1481
1482         trd->trd_processing_task = 0;
1483         complete(&trd->trd_finishing);
1484         return 0;
1485 }
1486
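/*
 * Spawn the recovery thread for @obd and wait until it has marked the
 * device as recovering.  Returns 0 on success or -ECHILD if the kernel
 * thread could not be created.
 *
 * Illustrative call site (a sketch only; the handler argument is simply
 * whatever request handler the target uses, e.g. mds_handle()):
 *
 *      rc = target_start_recovery_thread(obd, mds_handle);
 *      if (rc)
 *              CERROR("%s: cannot start recovery thread: %d\n",
 *                     obd->obd_name, rc);
 */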
1487 int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler)
1488 {
1489         int rc = 0;
1490         struct target_recovery_data *trd = &obd->obd_recovery_data;
1491
1492         memset(trd, 0, sizeof(*trd));
1493         init_completion(&trd->trd_starting);
1494         init_completion(&trd->trd_finishing);
1495         trd->trd_recovery_handler = handler;
1496
1497         if (kernel_thread(target_recovery_thread, obd, 0) > 0) {
1498                 wait_for_completion(&trd->trd_starting);
1499                 LASSERT(obd->obd_recovering != 0);
1500         } else
1501                 rc = -ECHILD;
1502
1503         return rc;
1504 }
1505
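/*
 * Abort a recovery in progress: raise obd_abort_recovery, wake the
 * recovery thread and wait for it to complete.  Does nothing if no
 * recovery thread is running.
 */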
1506 void target_stop_recovery_thread(struct obd_device *obd)
1507 {
1508         spin_lock_bh(&obd->obd_processing_task_lock);
1509         if (obd->obd_recovery_data.trd_processing_task > 0) {
1510                 struct target_recovery_data *trd = &obd->obd_recovery_data;
1511                 CERROR("%s: aborting recovery\n", obd->obd_name);
1512                 obd->obd_abort_recovery = 1;
1513                 wake_up(&obd->obd_next_transno_waitq);
1514                 spin_unlock_bh(&obd->obd_processing_task_lock);
1515                 wait_for_completion(&trd->trd_finishing);
1516         } else {
1517                 spin_unlock_bh(&obd->obd_processing_task_lock);
1518         }
1519 }
1520 #endif
1521
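/*
 * Update per-export recovery state from the flags a recovering client set
 * on its request: MSG_REQ_REPLAY_DONE means the client has finished
 * request replay, MSG_LOCK_REPLAY_DONE means it has also finished lock
 * replay.  Each transition decrements the matching client counter and
 * wakes the recovery thread so it can notice when a stage is complete.
 */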
1522 int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req)
1523 {
1524         struct obd_export *exp = req->rq_export;
1525         LASSERT(exp != NULL);
1526         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
1527                 /* the client declares it is ready to replay locks */
1528                 spin_lock_bh(&obd->obd_processing_task_lock);
1529                 if (exp->exp_req_replay_needed) {
1530                         LASSERT(atomic_read(&obd->obd_req_replay_clients) > 0);
1531                         exp->exp_req_replay_needed = 0;
1532                         atomic_dec(&obd->obd_req_replay_clients);
1533                         obd->obd_recoverable_clients--;
1534                         if (atomic_read(&obd->obd_req_replay_clients) == 0)
1535                                 CDEBUG(D_HA, "all clients have replayed reqs\n");
1536                         wake_up(&obd->obd_next_transno_waitq);
1537                 }
1538                 spin_unlock_bh(&obd->obd_processing_task_lock);
1539         }
1540         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) {
1541                 /* the client declares it is ready to complete recovery,
1542                  * so we put the request on the final queue */
1543                 spin_lock_bh(&obd->obd_processing_task_lock);
1544                 if (exp->exp_lock_replay_needed) {
1545                         LASSERT(atomic_read(&obd->obd_lock_replay_clients) > 0);
1546                         exp->exp_lock_replay_needed = 0;
1547                         atomic_dec(&obd->obd_lock_replay_clients);
1548                         if (atomic_read(&obd->obd_lock_replay_clients) == 0)
1549                                 CDEBUG(D_HA, "all clients have replayed locks\n");
1550                         wake_up(&obd->obd_next_transno_waitq);
1551                 }
1552                 spin_unlock_bh(&obd->obd_processing_task_lock);
1553         }
1554
1555         return 0;
1556 }
1557
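/*
 * Decide what to do with a request that arrives while the target is
 * recovering.  Return values:
 *
 *      1       - not queued; the caller should process the request now
 *      0       - the request was cloned and queued (or dropped as a
 *                duplicate of an already-queued replay); caller is done
 *      -ENOMEM - cloning the request failed
 *
 * Replayed requests are kept on obd_req_replay_queue sorted by transno;
 * lock-replay and final-ping requests go to their own queues.  A service
 * handler would typically use it roughly like this (sketch only; real
 * callers such as mds_handle() differ in detail):
 *
 *      if (obd->obd_recovering) {
 *              rc = target_queue_recovery_request(req, obd);
 *              if (rc != 1)
 *                      return rc < 0 ? rc : 0;
 *      }
 *      ... rc == 1 (or not recovering): handle the request normally ...
 */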
1558 int target_queue_recovery_request(struct ptlrpc_request *req,
1559                                   struct obd_device *obd)
1560 {
1561         struct list_head *tmp;
1562         int inserted = 0;
1563         __u64 transno = req->rq_reqmsg->transno;
1564
1565         if (obd->obd_recovery_data.trd_processing_task == current->pid) {
1566                 /* Processing the queue right now, don't re-add. */
1567                 return 1;
1568         }
1569
1570         target_process_req_flags(obd, req);
1571
1572         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) {
1573                 /* the client declares it is ready to complete recovery,
1574                  * so we put the request on the final queue */
1575                 req = ptlrpc_clone_req(req);
1576                 if (req == NULL)
1577                         return -ENOMEM;
1578                 DEBUG_REQ(D_HA, req, "queue final req");
1579                 spin_lock_bh(&obd->obd_processing_task_lock);
1580                 list_add_tail(&req->rq_list, &obd->obd_final_req_queue);
1581                 spin_unlock_bh(&obd->obd_processing_task_lock);
1582                 return 0;
1583         }
1584         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
1585                 /* the client declares it is ready to replay locks */
1586                 req = ptlrpc_clone_req(req);
1587                 if (req == NULL)
1588                         return -ENOMEM;
1589                 DEBUG_REQ(D_HA, req, "queue lock replay req");
1590                 spin_lock_bh(&obd->obd_processing_task_lock);
1591                 list_add_tail(&req->rq_list, &obd->obd_lock_replay_queue);
1592                 spin_unlock_bh(&obd->obd_processing_task_lock);
1593                 wake_up(&obd->obd_next_transno_waitq);
1594                 return 0;
1595         }
1596
1597
1598         /* CAVEAT EMPTOR: The incoming request message has been swabbed
1599          * (i.e. buflens etc are in my own byte order), but type-dependent
1600          * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */
1601
1602         if (!transno) {
1603                 INIT_LIST_HEAD(&req->rq_list);
1604                 DEBUG_REQ(D_HA, req, "not queueing");
1605                 return 1;
1606         }
1607
1608
1609         /* If we're processing the queue, we don't want to queue this
1610          * message.
1611          *
1612          * Also, if this request has a transno less than the one we're waiting
1613          * for, we should process it now.  It could (and currently always will)
1614          * be an open request for a descriptor that was opened some time ago.
1615          *
1616          * Also, a resent, replayed request that has already been
1617          * handled will pass through here and be processed immediately.
1618          */
1619         spin_lock_bh(&obd->obd_processing_task_lock);
1620         if (transno < obd->obd_next_recovery_transno && obd->obd_req_replaying) {
1621                 /* Processing the queue right now, don't re-add. */
1622                 LASSERT(list_empty(&req->rq_list));
1623                 spin_unlock_bh(&obd->obd_processing_task_lock);
1624                 return 1;
1625         }
1626         spin_unlock_bh(&obd->obd_processing_task_lock);
1627
1628         /* A resent, replayed request that is still on the queue; just drop
1629          * it.  The copy already on the queue will be handled instead. */
1630         if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY))
1631             == (MSG_RESENT | MSG_REPLAY)) {
1632                 DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
1633                 return 0;
1634         }
1635
1636         req = ptlrpc_clone_req(req);
1637         if (req == NULL)
1638                 return -ENOMEM;
1639
1640         spin_lock_bh(&obd->obd_processing_task_lock);
1641
1642         /* XXX O(n^2) */
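        /* Keep obd_req_replay_queue sorted by transno: walk the queue and
         * insert this request just before the first entry with a larger
         * transno, or at the tail if there is none. */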
1643         list_for_each(tmp, &obd->obd_req_replay_queue) {
1644                 struct ptlrpc_request *reqiter =
1645                         list_entry(tmp, struct ptlrpc_request, rq_list);
1646
1647                 if (reqiter->rq_reqmsg->transno > transno) {
1648                         list_add_tail(&req->rq_list, &reqiter->rq_list);
1649                         inserted = 1;
1650                         break;
1651                 }
1652         }
1653
1654         if (!inserted)
1655                 list_add_tail(&req->rq_list, &obd->obd_req_replay_queue);
1656
1657         obd->obd_requests_queued_for_recovery++;
1658         wake_up(&obd->obd_next_transno_waitq);
1659         spin_unlock_bh(&obd->obd_processing_task_lock);
1660         return 0;
1661 }
1662
1663 struct obd_device * target_req2obd(struct ptlrpc_request *req)
1664 {
1665         return req->rq_export->exp_obd;
1666 }
1667
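/*
 * Send the reply for @req, or deliberately drop it when the @fail_id
 * failure injection point is armed (simulating a lost reply).  A dropped
 * "easy" reply state is freed on the spot; a "difficult" one cannot be,
 * so srv_outstanding_replies is bumped and the state is left for the
 * reply handling code.  On the normal path a non-zero @rc is turned into
 * an error reply via ptlrpc_error().
 */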
1668 int
1669 target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id)
1670 {
1671         if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
1672                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
1673                 DEBUG_REQ(D_ERROR, req, "dropping reply");
1674                 /* NB this does _not_ send with the ACK disabled, which
1675                  * would simulate sending OK but timing out for the ACK */
1676                 if (req->rq_reply_state != NULL) {
1677                         if (!req->rq_reply_state->rs_difficult) {
1678                                 lustre_free_reply_state (req->rq_reply_state);
1679                                 req->rq_reply_state = NULL;
1680                         } else {
1681                                 struct ptlrpc_service *svc =
1682                                         req->rq_rqbd->rqbd_srv_ni->sni_service;
1683                                 atomic_inc(&svc->srv_outstanding_replies);
1684                         }
1685                 }
1686                 return (-ECOMM);
1687         }
1688
1689         if (rc) {
1690                 req->rq_status = rc;
1691                 return (ptlrpc_error(req));
1692         } else {
1693                 DEBUG_REQ(D_NET, req, "sending reply");
1694         }
1695         
1696         return (ptlrpc_send_reply(req, 1));
1697 }
1698
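/*
 * Queue or send the reply for @req.  An "easy" reply (no reply state, or
 * one that is not marked difficult) is sent immediately and cleaned up by
 * the network callback.  A difficult reply must stick around until its
 * transno commits and the client has seen it, so before sending it is
 * linked onto the export's outstanding-replies list and, if not yet
 * committed, onto obd_uncommitted_replies; afterwards it is either handed
 * to the service reply queue for completion or parked on the srv_ni's
 * active-replies list until the commit/ack notifiers take it off.
 */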
1699 void 
1700 target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
1701 {
1702         int                        netrc;
1703         unsigned long              flags;
1704         struct ptlrpc_reply_state *rs;
1705         struct obd_device         *obd;
1706         struct obd_export         *exp;
1707         struct ptlrpc_srv_ni      *sni;
1708         struct ptlrpc_service     *svc;
1709
1710         sni = req->rq_rqbd->rqbd_srv_ni;
1711         svc = sni->sni_service;
1712         
1713         rs = req->rq_reply_state;
1714         if (rs == NULL || !rs->rs_difficult) {
1715                 /* The easy case; no notifiers and reply_out_callback()
1716                  * cleans up (i.e. we can't look inside rs after a
1717                  * successful send) */
1718                 netrc = target_send_reply_msg (req, rc, fail_id);
1719
1720                 LASSERT (netrc == 0 || req->rq_reply_state == NULL);
1721                 return;
1722         }
1723
1724         /* must be an export if locks saved */
1725         LASSERT (req->rq_export != NULL);
1726         /* req/reply consistent */
1727         LASSERT (rs->rs_srv_ni == sni);
1728
1729         /* "fresh" reply */
1730         LASSERT (!rs->rs_scheduled);
1731         LASSERT (!rs->rs_scheduled_ever);
1732         LASSERT (!rs->rs_handled);
1733         LASSERT (!rs->rs_on_net);
1734         LASSERT (rs->rs_export == NULL);
1735         LASSERT (list_empty(&rs->rs_obd_list));
1736         LASSERT (list_empty(&rs->rs_exp_list));
1737
1738         exp = class_export_get (req->rq_export);
1739         obd = exp->exp_obd;
1740
1741         /* disable reply scheduling onto srv_reply_queue while I'm setting up */
1742         rs->rs_scheduled = 1;
1743         rs->rs_on_net    = 1;
1744         rs->rs_xid       = req->rq_xid;
1745         rs->rs_transno   = req->rq_transno;
1746         rs->rs_export    = exp;
1747         
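        /* Note the lock hand-over below: the IRQ state saved when taking
         * obd_uncommitted_replies_lock is only restored when exp_lock is
         * released, so interrupts stay disabled across both critical
         * sections. */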
1748         spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags);
1749
1750         if (rs->rs_transno > obd->obd_last_committed) {
1751                 /* not yet committed */
1752                 list_add_tail (&rs->rs_obd_list, 
1753                                &obd->obd_uncommitted_replies);
1754         }
1755
1756         spin_unlock (&obd->obd_uncommitted_replies_lock);
1757         spin_lock (&exp->exp_lock);
1758
1759         list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies);
1760
1761         spin_unlock_irqrestore (&exp->exp_lock, flags);
1762
1763         netrc = target_send_reply_msg (req, rc, fail_id);
1764
1765         spin_lock_irqsave (&svc->srv_lock, flags);
1766
1767         svc->srv_n_difficult_replies++;
1768
1769         if (netrc != 0) /* error sending: reply is off the net */
1770                 rs->rs_on_net = 0;
1771
1772         if (!rs->rs_on_net ||                   /* some notifier */
1773             list_empty(&rs->rs_exp_list) ||     /* completed already */
1774             list_empty(&rs->rs_obd_list)) {
1775                 list_add_tail (&rs->rs_list, &svc->srv_reply_queue);
1776                 wake_up (&svc->srv_waitq);
1777         } else {
1778                 list_add (&rs->rs_list, &sni->sni_active_replies);
1779                 rs->rs_scheduled = 0;           /* allow notifier to schedule */
1780         }
1781
1782         spin_unlock_irqrestore (&svc->srv_lock, flags);
1783 }
1784
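/* A ping carries no reply body; just pack an empty reply. */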
1785 int target_handle_ping(struct ptlrpc_request *req)
1786 {
1787         return lustre_pack_reply(req, 0, NULL, NULL);
1788 }