Whamcloud - gitweb
dd4ae4741781ea81d24fbbe81435dfc1d76efaaa
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LDLM
26
27 #ifdef __KERNEL__
28 # include <linux/module.h>
29 #else
30 # include <liblustre.h>
31 #endif
32 #include <linux/obd.h>
33 #include <linux/obd_ost.h> /* for LUSTRE_OSC_NAME */
34 #include <linux/lustre_mds.h> /* for LUSTRE_MDC_NAME */
35 #include <linux/lustre_mgmt.h>
36 #include <linux/lustre_dlm.h>
37 #include <linux/lustre_net.h>
38
39 int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
40 {
41         struct ptlrpc_connection *conn;
42         struct lustre_cfg* lcfg = buf;
43         struct client_obd *cli = &obddev->u.cli;
44         struct obd_import *imp;
45         struct obd_uuid server_uuid;
46         int rq_portal, rp_portal, connect_op;
47         char *name = obddev->obd_type->typ_name;
48         char *mgmt_name = NULL;
49         int rc;
50         struct obd_device *mgmt_obd;
51         mgmtcli_register_for_events_t register_f;
52         ENTRY;
53
54         /* In a more perfect world, we would hang a ptlrpc_client off of
55          * obd_type and just use the values from there. */
56         if (!strcmp(name, LUSTRE_OSC_NAME)) {
57                 rq_portal = OST_REQUEST_PORTAL;
58                 rp_portal = OSC_REPLY_PORTAL;
59                 connect_op = OST_CONNECT;
60         } else if (!strcmp(name, LUSTRE_MDC_NAME)) {
61                 rq_portal = MDS_REQUEST_PORTAL;
62                 rp_portal = MDC_REPLY_PORTAL;
63                 connect_op = MDS_CONNECT;
64         } else if (!strcmp(name, LUSTRE_MGMTCLI_NAME)) {
65                 rq_portal = MGMT_REQUEST_PORTAL;
66                 rp_portal = MGMT_REPLY_PORTAL;
67                 connect_op = MGMT_CONNECT;
68         } else {
69                 CERROR("unknown client OBD type \"%s\", can't setup\n",
70                        name);
71                 RETURN(-EINVAL);
72         }
73
74         if (lcfg->lcfg_inllen1 < 1) {
75                 CERROR("requires a TARGET UUID\n");
76                 RETURN(-EINVAL);
77         }
78
79         if (lcfg->lcfg_inllen1 > 37) {
80                 CERROR("client UUID must be less than 38 characters\n");
81                 RETURN(-EINVAL);
82         }
83
84         if (lcfg->lcfg_inllen2 < 1) {
85                 CERROR("setup requires a SERVER UUID\n");
86                 RETURN(-EINVAL);
87         }
88
89         if (lcfg->lcfg_inllen2 > 37) {
90                 CERROR("target UUID must be less than 38 characters\n");
91                 RETURN(-EINVAL);
92         }
93
94         sema_init(&cli->cl_sem, 1);
95         cli->cl_conn_count = 0;
96         memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2,
97                min_t(unsigned int, lcfg->lcfg_inllen2, sizeof(server_uuid)));
98
99         cli->cl_dirty = 0;
100         cli->cl_avail_grant = 0;
101         /* FIXME: should limit this for the sum of all cl_dirty_max */
102         cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024;
103         if (cli->cl_dirty_max >> PAGE_SHIFT > num_physpages / 8)
104                 cli->cl_dirty_max = num_physpages << (PAGE_SHIFT - 3);
105         INIT_LIST_HEAD(&cli->cl_cache_waiters);
106         INIT_LIST_HEAD(&cli->cl_loi_ready_list);
107         INIT_LIST_HEAD(&cli->cl_loi_write_list);
108         INIT_LIST_HEAD(&cli->cl_loi_read_list);
109         spin_lock_init(&cli->cl_loi_list_lock);
110         cli->cl_r_in_flight = 0;
111         cli->cl_w_in_flight = 0;
112         spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
113         spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
114         spin_lock_init(&cli->cl_read_page_hist.oh_lock);
115         spin_lock_init(&cli->cl_write_page_hist.oh_lock);
116         if (num_physpages >> (20 - PAGE_SHIFT) <= 128) { /* <= 128 MB */
117                 cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 4;
118                 cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 4;
119         } else if (num_physpages >> (20 - PAGE_SHIFT) <= 512) { /* <= 512 MB */
120                 cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 2;
121                 cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 2;
122         } else {
123                 cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
124                 cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
125         }
126
127         rc = ldlm_get_ref();
128         if (rc) {
129                 CERROR("ldlm_get_ref failed: %d\n", rc);
130                 GOTO(err, rc);
131         }
132
133         conn = ptlrpc_uuid_to_connection(&server_uuid);
134         if (conn == NULL)
135                 GOTO(err_ldlm, rc = -ENOENT);
136
137         ptlrpc_init_client(rq_portal, rp_portal, name,
138                            &obddev->obd_ldlm_client);
139
140         imp = class_new_import();
141         if (imp == NULL) {
142                 ptlrpc_put_connection(conn);
143                 GOTO(err_ldlm, rc = -ENOENT);
144         }
145         imp->imp_connection = conn;
146         imp->imp_client = &obddev->obd_ldlm_client;
147         imp->imp_obd = obddev;
148         imp->imp_connect_op = connect_op;
149         imp->imp_generation = 0;
150         imp->imp_initial_recov = 1;
151         INIT_LIST_HEAD(&imp->imp_pinger_chain);
152         memcpy(imp->imp_target_uuid.uuid, lcfg->lcfg_inlbuf1,
153               lcfg->lcfg_inllen1);
154         class_import_put(imp);
155
156         cli->cl_import = imp;
157         cli->cl_max_mds_easize = sizeof(struct lov_mds_md);
158         cli->cl_max_mds_cookiesize = sizeof(struct llog_cookie);
159         cli->cl_sandev = to_kdev_t(0);
160
161         if (lcfg->lcfg_inllen3 != 0) {
162                 if (!strcmp(lcfg->lcfg_inlbuf3, "inactive")) {
163                         CDEBUG(D_HA, "marking %s %s->%s as inactive\n",
164                                name, obddev->obd_name,
165                                imp->imp_target_uuid.uuid);
166                         imp->imp_invalid = 1;
167
168                         if (lcfg->lcfg_inllen4 != 0)
169                                 mgmt_name = lcfg->lcfg_inlbuf4;
170                 } else {
171                         mgmt_name = lcfg->lcfg_inlbuf3;
172                 }
173         }
174
175         if (mgmt_name != NULL) {
176                 /* Register with management client if we need to. */
177                 CDEBUG(D_HA, "%s registering with %s for events about %s\n",
178                        obddev->obd_name, mgmt_name, server_uuid.uuid);
179
180                 mgmt_obd = class_name2obd(mgmt_name);
181                 if (!mgmt_obd) {
182                         CERROR("can't find mgmtcli %s to register\n",
183                                mgmt_name);
184                         GOTO(err_import, rc = -ENOSYS);
185                 }
186
187                 register_f = inter_module_get("mgmtcli_register_for_events");
188                 if (!register_f) {
189                         CERROR("can't i_m_g mgmtcli_register_for_events\n");
190                         GOTO(err_import, rc = -ENOSYS);
191                 }
192
193                 rc = register_f(mgmt_obd, obddev, &imp->imp_target_uuid);
194                 inter_module_put("mgmtcli_register_for_events");
195
196                 if (!rc)
197                         cli->cl_mgmtcli_obd = mgmt_obd;
198         }
199
200         RETURN(rc);
201
202 err_import:
203         class_destroy_import(imp);
204 err_ldlm:
205         ldlm_put_ref(0);
206 err:
207         RETURN(rc);
208
209 }
210
211 int client_obd_cleanup(struct obd_device *obddev, int flags)
212 {
213         struct client_obd *cli = &obddev->u.cli;
214
215         if (!cli->cl_import)
216                 RETURN(-EINVAL);
217         if (cli->cl_mgmtcli_obd) {
218                 mgmtcli_deregister_for_events_t dereg_f;
219
220                 dereg_f = inter_module_get("mgmtcli_deregister_for_events");
221                 dereg_f(cli->cl_mgmtcli_obd, obddev);
222                 inter_module_put("mgmtcli_deregister_for_events");
223         }
224         class_destroy_import(cli->cl_import);
225         cli->cl_import = NULL;
226
227         ldlm_put_ref(flags & OBD_OPT_FORCE);
228
229         RETURN(0);
230 }
231
232 int client_connect_import(struct lustre_handle *dlm_handle,
233                           struct obd_device *obd,
234                           struct obd_uuid *cluuid)
235 {
236         struct client_obd *cli = &obd->u.cli;
237         struct obd_import *imp = cli->cl_import;
238         struct obd_export *exp;
239         int rc;
240         ENTRY;
241
242         down(&cli->cl_sem);
243         rc = class_connect(dlm_handle, obd, cluuid);
244         if (rc)
245                 GOTO(out_sem, rc);
246
247         cli->cl_conn_count++;
248         if (cli->cl_conn_count > 1)
249                 GOTO(out_sem, rc);
250         exp = class_conn2export(dlm_handle);
251
252         if (obd->obd_namespace != NULL)
253                 CERROR("already have namespace!\n");
254         obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
255                                                 LDLM_NAMESPACE_CLIENT);
256         if (obd->obd_namespace == NULL)
257                 GOTO(out_disco, rc = -ENOMEM);
258
259         imp->imp_dlm_handle = *dlm_handle;
260         rc = ptlrpc_init_import(imp);
261         if (rc != 0) 
262                 GOTO(out_ldlm, rc);
263
264         exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection);
265         rc = ptlrpc_connect_import(imp, NULL);
266         if (rc != 0) {
267                 LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
268                 GOTO(out_ldlm, rc);
269         }
270
271         ptlrpc_pinger_add_import(imp);
272         EXIT;
273
274         if (rc) {
275 out_ldlm:
276                 ldlm_namespace_free(obd->obd_namespace, 0);
277                 obd->obd_namespace = NULL;
278 out_disco:
279                 cli->cl_conn_count--;
280                 class_disconnect(exp, 0);
281         } else {
282                 class_export_put(exp);
283         }
284 out_sem:
285         up(&cli->cl_sem);
286         return rc;
287 }
288
289 int client_disconnect_export(struct obd_export *exp, int failover)
290 {
291         struct obd_device *obd = class_exp2obd(exp);
292         struct client_obd *cli = &obd->u.cli;
293         struct obd_import *imp = cli->cl_import;
294         int rc = 0, err;
295         ENTRY;
296
297         if (!obd) {
298                 CERROR("invalid export for disconnect: exp %p cookie "LPX64"\n",
299                        exp, exp ? exp->exp_handle.h_cookie : -1);
300                 RETURN(-EINVAL);
301         }
302
303         down(&cli->cl_sem);
304         if (!cli->cl_conn_count) {
305                 CERROR("disconnecting disconnected device (%s)\n",
306                        obd->obd_name);
307                 GOTO(out_sem, rc = -EINVAL);
308         }
309
310         cli->cl_conn_count--;
311         if (cli->cl_conn_count)
312                 GOTO(out_no_disconnect, rc = 0);
313
314         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
315          * delete it regardless.  (It's safe to delete an import that was
316          * never added.) */
317         (void)ptlrpc_pinger_del_import(imp);
318
319         if (obd->obd_namespace != NULL) {
320                 /* obd_no_recov == local only */
321                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
322                                        obd->obd_no_recov, NULL);
323                 ldlm_namespace_free(obd->obd_namespace, obd->obd_no_recov);
324                 obd->obd_namespace = NULL;
325         }
326
327         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
328         if (obd->obd_no_recov)
329                 ptlrpc_invalidate_import(imp);
330         else
331                 rc = ptlrpc_disconnect_import(imp);
332
333         EXIT;
334  out_no_disconnect:
335         err = class_disconnect(exp, 0);
336         if (!rc && err)
337                 rc = err;
338  out_sem:
339         up(&cli->cl_sem);
340         RETURN(rc);
341 }
342
343 /* --------------------------------------------------------------------------
344  * from old lib/target.c
345  * -------------------------------------------------------------------------- */
346
347 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
348                             struct obd_uuid *cluuid)
349 {
350         if (exp->exp_connection) {
351                 struct lustre_handle *hdl;
352                 hdl = &exp->exp_imp_reverse->imp_remote_handle;
353                 /* Might be a re-connect after a partition. */
354                 if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
355                         CERROR("%s reconnecting\n", cluuid->uuid);
356                         conn->cookie = exp->exp_handle.h_cookie;
357                         RETURN(EALREADY);
358                 } else {
359                         CERROR("%s reconnecting from %s, "
360                                "handle mismatch (ours "LPX64", theirs "
361                                LPX64")\n", cluuid->uuid,
362                                exp->exp_connection->c_remote_uuid.uuid,
363                                hdl->cookie, conn->cookie);
364                         memset(conn, 0, sizeof *conn);
365                         RETURN(-EALREADY);
366                 }
367         }
368
369         conn->cookie = exp->exp_handle.h_cookie;
370         CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
371                cluuid->uuid, exp);
372         CDEBUG(D_IOCTL,"connect: cookie "LPX64"\n", conn->cookie);
373         RETURN(0);
374 }
375
376 int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
377 {
378         struct obd_device *target;
379         struct obd_export *export = NULL;
380         struct obd_import *revimp;
381         struct lustre_handle conn;
382         struct obd_uuid tgtuuid;
383         struct obd_uuid cluuid;
384         struct obd_uuid remote_uuid;
385         struct list_head *p;
386         char *str, *tmp;
387         int rc = 0, abort_recovery;
388         unsigned long flags;
389         ENTRY;
390
391         OBD_RACE(OBD_FAIL_TGT_CONN_RACE); 
392
393         LASSERT_REQSWAB (req, 0);
394         str = lustre_msg_string(req->rq_reqmsg, 0, sizeof(tgtuuid) - 1);
395         if (str == NULL) {
396                 CERROR("bad target UUID for connect\n");
397                 GOTO(out, rc = -EINVAL);
398         }
399
400         obd_str2uuid (&tgtuuid, str);
401         target = class_uuid2obd(&tgtuuid);
402         if (!target) {
403                 target = class_name2obd(str);
404         }
405         
406         if (!target || target->obd_stopping || !target->obd_set_up) {
407                 CERROR("UUID '%s' is not available for connect\n", str);
408                 GOTO(out, rc = -ENODEV);
409         }
410
411         LASSERT_REQSWAB (req, 1);
412         str = lustre_msg_string(req->rq_reqmsg, 1, sizeof(cluuid) - 1);
413         if (str == NULL) {
414                 CERROR("bad client UUID for connect\n");
415                 GOTO(out, rc = -EINVAL);
416         }
417
418         obd_str2uuid (&cluuid, str);
419
420         /* XXX extract a nettype and format accordingly */
421         switch (sizeof(ptl_nid_t)) {
422                 /* NB the casts only avoid compiler warnings */
423         case 8:
424                 snprintf(remote_uuid.uuid, sizeof remote_uuid,
425                          "NET_"LPX64"_UUID", (__u64)req->rq_peer.peer_id.nid);
426                 break;
427         case 4:
428                 snprintf(remote_uuid.uuid, sizeof remote_uuid,
429                          "NET_%x_UUID", (__u32)req->rq_peer.peer_id.nid);
430                 break;
431         default:
432                 LBUG();
433         }
434
435         spin_lock_bh(&target->obd_processing_task_lock);
436         abort_recovery = target->obd_abort_recovery;
437         spin_unlock_bh(&target->obd_processing_task_lock);
438         if (abort_recovery)
439                 target_abort_recovery(target);
440
441         tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn);
442         if (tmp == NULL)
443                 GOTO(out, rc = -EPROTO);
444
445         memcpy(&conn, tmp, sizeof conn);
446
447         rc = lustre_pack_reply(req, 0, NULL, NULL);
448         if (rc)
449                 GOTO(out, rc);
450
451         /* lctl gets a backstage, all-access pass. */
452         if (obd_uuid_equals(&cluuid, &target->obd_uuid))
453                 goto dont_check_exports;
454
455         spin_lock(&target->obd_dev_lock);
456         list_for_each(p, &target->obd_exports) {
457                 export = list_entry(p, struct obd_export, exp_obd_chain);
458                 if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
459                         spin_unlock(&target->obd_dev_lock);
460                         LASSERT(export->exp_obd == target);
461
462                         rc = target_handle_reconnect(&conn, export, &cluuid);
463                         break;
464                 }
465                 export = NULL;
466         }
467         /* If we found an export, we already unlocked. */
468         if (!export) {
469                 spin_unlock(&target->obd_dev_lock);
470         } else if (req->rq_reqmsg->conn_cnt == 1) {
471                 CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n",
472                        cluuid.uuid);
473                 GOTO(out, rc = -EALREADY);
474         }
475
476         /* Tell the client if we're in recovery. */
477         /* If this is the first client, start the recovery timer */
478         if (target->obd_recovering) {
479                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
480                 target_start_recovery_timer(target, handler);
481         }
482
483         /* Tell the client if we support replayable requests */
484         if (target->obd_replayable)
485                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
486
487         if (export == NULL) {
488                 if (target->obd_recovering) {
489                         CERROR("denying connection for new client %s: "
490                                "%d clients in recovery for %lds\n", cluuid.uuid,
491                                target->obd_recoverable_clients,
492                                (target->obd_recovery_timer.expires-jiffies)/HZ);
493                         rc = -EBUSY;
494                 } else {
495  dont_check_exports:
496                         rc = obd_connect(&conn, target, &cluuid);
497                 }
498         }
499
500
501         /* If all else goes well, this is our RPC return code. */
502         req->rq_status = 0;
503
504         if (rc && rc != EALREADY)
505                 GOTO(out, rc);
506
507         req->rq_repmsg->handle = conn;
508
509         /* If the client and the server are the same node, we will already
510          * have an export that really points to the client's DLM export,
511          * because we have a shared handles table.
512          *
513          * XXX this will go away when shaver stops sending the "connect" handle
514          * in the real "remote handle" field of the request --phik 24 Apr 2003
515          */
516         if (req->rq_export != NULL)
517                 class_export_put(req->rq_export);
518
519         /* ownership of this export ref transfers to the request */
520         export = req->rq_export = class_conn2export(&conn);
521         LASSERT(export != NULL);
522
523         spin_lock_irqsave(&export->exp_lock, flags);
524         if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
525                 CERROR("%s: already connected at a higher conn_cnt: %d > %d\n",
526                        cluuid.uuid, export->exp_conn_cnt, 
527                        req->rq_reqmsg->conn_cnt);
528                 spin_unlock_irqrestore(&export->exp_lock, flags);
529                 GOTO(out, rc = -EALREADY);
530         }
531         export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
532         spin_unlock_irqrestore(&export->exp_lock, flags);
533
534         /* request from liblustre? */
535         if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT)
536                 export->exp_libclient = 1;
537
538         if (export->exp_connection != NULL)
539                 ptlrpc_put_connection(export->exp_connection);
540         export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
541                                                        &remote_uuid);
542
543         if (rc == EALREADY) {
544                 /* We indicate the reconnection in a flag, not an error code. */
545                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
546                 GOTO(out, rc = 0);
547         }
548
549         if (target->obd_recovering)
550                 target->obd_connected_clients++;
551
552         memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
553                sizeof conn);
554
555         if (export->exp_imp_reverse != NULL)
556                 class_destroy_import(export->exp_imp_reverse);
557         revimp = export->exp_imp_reverse = class_new_import();
558         revimp->imp_connection = ptlrpc_connection_addref(export->exp_connection);
559         revimp->imp_client = &export->exp_obd->obd_ldlm_client;
560         revimp->imp_remote_handle = conn;
561         revimp->imp_obd = target;
562         revimp->imp_dlm_fake = 1;
563         revimp->imp_state = LUSTRE_IMP_FULL;
564         class_import_put(revimp);
565 out:
566         if (rc)
567                 req->rq_status = rc;
568         RETURN(rc);
569 }
570
571 int target_handle_disconnect(struct ptlrpc_request *req)
572 {
573         struct obd_export *exp;
574         int rc;
575         ENTRY;
576
577         rc = lustre_pack_reply(req, 0, NULL, NULL);
578         if (rc)
579                 RETURN(rc);
580
581         /* keep the rq_export around so we can send the reply */
582         exp = class_export_get(req->rq_export);
583         req->rq_status = obd_disconnect(exp, 0);
584         RETURN(0);
585 }
586
587 void target_destroy_export(struct obd_export *exp)
588 {
589         /* exports created from last_rcvd data, and "fake"
590            exports created by lctl don't have an import */
591         if (exp->exp_imp_reverse != NULL)
592                 class_destroy_import(exp->exp_imp_reverse);
593
594         /* We cancel locks at disconnect time, but this will catch any locks
595          * granted in a race with recovery-induced disconnect. */
596         if (exp->exp_obd->obd_namespace != NULL)
597                 ldlm_cancel_locks_for_export(exp);
598 }
599
600 /*
601  * Recovery functions
602  */
603
604
605 static void target_release_saved_req(struct ptlrpc_request *req)
606 {
607         if (req->rq_reply_state != NULL) {
608                 ptlrpc_rs_decref(req->rq_reply_state);
609                 /* req->rq_reply_state = NULL; */
610         }
611
612         class_export_put(req->rq_export);
613         OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
614         OBD_FREE(req, sizeof *req);
615 }
616
617 static void target_finish_recovery(struct obd_device *obd)
618 {
619         struct list_head *tmp, *n;
620         int rc;
621
622         CWARN("%s: sending delayed replies to recovered clients\n",
623               obd->obd_name);
624
625         ldlm_reprocess_all_ns(obd->obd_namespace);
626
627         /* when recovery finished, cleanup orphans on mds and ost */
628         if (OBT(obd) && OBP(obd, postrecov)) {
629                 rc = OBP(obd, postrecov)(obd);
630                 if (rc >= 0)
631                         CWARN("%s: all clients recovered, %d MDS "
632                               "orphans deleted\n", obd->obd_name, rc);
633                 else
634                         CERROR("postrecov failed %d\n", rc);
635         }
636
637         list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
638                 struct ptlrpc_request *req;
639                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
640                 list_del(&req->rq_list);
641                 DEBUG_REQ(D_WARNING, req, "delayed:");
642                 ptlrpc_reply(req);
643                 target_release_saved_req(req);
644         }
645         obd->obd_recovery_end = CURRENT_SECONDS;
646         return;
647 }
648
649 static void abort_recovery_queue(struct obd_device *obd)
650 {
651         struct ptlrpc_request *req;
652         struct list_head *tmp, *n;
653         int rc;
654
655         list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
656                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
657                 list_del(&req->rq_list);
658                 DEBUG_REQ(D_ERROR, req, "aborted:");
659                 req->rq_status = -ENOTCONN;
660                 req->rq_type = PTL_RPC_MSG_ERR;
661                 rc = lustre_pack_reply(req, 0, NULL, NULL);
662                 if (rc == 0) {
663                         ptlrpc_reply(req);
664                 } else {
665                         DEBUG_REQ(D_ERROR, req,
666                                   "packing failed for abort-reply; skipping");
667                 }
668                 target_release_saved_req(req);
669         }
670 }
671
672 /* Called from a cleanup function if the device is being cleaned up 
673    forcefully.  The exports should all have been disconnected already, 
674    the only thing left to do is 
675      - clear the recovery flags
676      - cancel the timer
677      - free queued requests and replies, but don't send replies
678    Because the obd_stopping flag is set, no new requests should be received.
679      
680 */
681 void target_cleanup_recovery(struct obd_device *obd)
682 {
683         struct list_head *tmp, *n;
684         struct ptlrpc_request *req;
685
686         spin_lock_bh(&obd->obd_processing_task_lock);
687         if (!obd->obd_recovering) {
688                 spin_unlock_bh(&obd->obd_processing_task_lock);
689                 EXIT;
690                 return;
691         }
692         obd->obd_recovering = obd->obd_abort_recovery = 0;
693         target_cancel_recovery_timer(obd);
694         spin_unlock_bh(&obd->obd_processing_task_lock);
695
696
697         list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
698                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
699                 list_del(&req->rq_list);
700                 target_release_saved_req(req);
701         }
702
703         list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
704                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
705                 list_del(&req->rq_list);
706                 target_release_saved_req(req);
707         }
708 }
709
710 void target_abort_recovery(void *data)
711 {
712         struct obd_device *obd = data;
713
714         spin_lock_bh(&obd->obd_processing_task_lock);
715         if (!obd->obd_recovering) {
716                 spin_unlock_bh(&obd->obd_processing_task_lock);
717                 EXIT;
718                 return;
719         }
720         obd->obd_recovering = obd->obd_abort_recovery = 0;
721         target_cancel_recovery_timer(obd);
722         spin_unlock_bh(&obd->obd_processing_task_lock);
723
724         CERROR("%s: recovery period over; disconnecting unfinished clients.\n",
725                obd->obd_name);
726         class_disconnect_stale_exports(obd, 0);
727         abort_recovery_queue(obd);
728
729         target_finish_recovery(obd);
730
731         ptlrpc_run_recovery_over_upcall(obd);
732 }
733
734 static void target_recovery_expired(unsigned long castmeharder)
735 {
736         struct obd_device *obd = (struct obd_device *)castmeharder;
737         CERROR("recovery timed out, aborting\n");
738         spin_lock_bh(&obd->obd_processing_task_lock);
739         if (obd->obd_recovering)
740                 obd->obd_abort_recovery = 1;
741         wake_up(&obd->obd_next_transno_waitq);
742         spin_unlock_bh(&obd->obd_processing_task_lock);
743 }
744
745
746 /* obd_processing_task_lock should be held */
747 void target_cancel_recovery_timer(struct obd_device *obd)
748 {
749         CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name);
750         del_timer(&obd->obd_recovery_timer);
751 }
752
753 static void reset_recovery_timer(struct obd_device *obd)
754 {
755         spin_lock_bh(&obd->obd_processing_task_lock);
756         if (!obd->obd_recovering) {
757                 spin_unlock_bh(&obd->obd_processing_task_lock);
758                 return;
759         }                
760         CDEBUG(D_HA, "timer will expire in %u seconds\n",
761                OBD_RECOVERY_TIMEOUT / HZ);
762         mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
763         spin_unlock_bh(&obd->obd_processing_task_lock);
764 }
765
766
767 /* Only start it the first time called */
768 void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
769 {
770         spin_lock_bh(&obd->obd_processing_task_lock);
771         if (obd->obd_recovery_handler) {
772                 spin_unlock_bh(&obd->obd_processing_task_lock);
773                 return;
774         }
775         CWARN("%s: starting recovery timer (%us)\n", obd->obd_name,
776                OBD_RECOVERY_TIMEOUT / HZ);
777         obd->obd_recovery_handler = handler;
778         obd->obd_recovery_timer.function = target_recovery_expired;
779         obd->obd_recovery_timer.data = (unsigned long)obd;
780         spin_unlock_bh(&obd->obd_processing_task_lock);
781
782         reset_recovery_timer(obd);
783 }
784
785 static int check_for_next_transno(struct obd_device *obd)
786 {
787         struct ptlrpc_request *req;
788         int wake_up = 0, connected, completed, queue_len, max;
789         __u64 next_transno, req_transno;
790
791         spin_lock_bh(&obd->obd_processing_task_lock);
792         req = list_entry(obd->obd_recovery_queue.next,
793                          struct ptlrpc_request, rq_list);
794         max = obd->obd_max_recoverable_clients;
795         req_transno = req->rq_reqmsg->transno;
796         connected = obd->obd_connected_clients;
797         completed = max - obd->obd_recoverable_clients;
798         queue_len = obd->obd_requests_queued_for_recovery;
799         next_transno = obd->obd_next_recovery_transno;
800
801         CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, "
802                "req_transno: "LPU64", next_transno: "LPU64"\n",
803                max, connected, completed, queue_len, req_transno, next_transno);
804         if (obd->obd_abort_recovery) {
805                 CDEBUG(D_HA, "waking for aborted recovery\n");
806                 wake_up = 1;
807         } else if (!obd->obd_recovering) {
808                 CDEBUG(D_HA, "waking for completed recovery (?)\n");
809                 wake_up = 1;
810         } else if (req_transno == next_transno) {
811                 CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
812                 wake_up = 1;
813         } else if (queue_len + completed == max) {
814                 CDEBUG(D_ERROR,
815                        "waking for skipped transno (skip: "LPD64
816                        ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
817                        next_transno, queue_len, completed, max, req_transno);
818                 obd->obd_next_recovery_transno = req_transno;
819                 wake_up = 1;
820         }
821         spin_unlock_bh(&obd->obd_processing_task_lock);
822         LASSERT(req->rq_reqmsg->transno >= next_transno);
823         return wake_up;
824 }
825
826 static void process_recovery_queue(struct obd_device *obd)
827 {
828         struct ptlrpc_request *req;
829         int abort_recovery = 0;
830         struct l_wait_info lwi = { 0 };
831         ENTRY;
832
833         for (;;) {
834                 spin_lock_bh(&obd->obd_processing_task_lock);
835                 LASSERT(obd->obd_processing_task == current->pid);
836                 req = list_entry(obd->obd_recovery_queue.next,
837                                  struct ptlrpc_request, rq_list);
838
839                 if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
840                         spin_unlock_bh(&obd->obd_processing_task_lock);
841                         CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
842                                LPD64")\n",
843                                obd->obd_next_recovery_transno,
844                                req->rq_reqmsg->transno);
845                         l_wait_event(obd->obd_next_transno_waitq,
846                                      check_for_next_transno(obd), &lwi);
847                         spin_lock_bh(&obd->obd_processing_task_lock);
848                         abort_recovery = obd->obd_abort_recovery;
849                         spin_unlock_bh(&obd->obd_processing_task_lock);
850                         if (abort_recovery) {
851                                 target_abort_recovery(obd);
852                                 return;
853                         }
854                         continue;
855                 }
856                 list_del_init(&req->rq_list);
857                 obd->obd_requests_queued_for_recovery--;
858                 spin_unlock_bh(&obd->obd_processing_task_lock);
859
860                 DEBUG_REQ(D_HA, req, "processing: ");
861                 (void)obd->obd_recovery_handler(req);
862                 obd->obd_replayed_requests++;
863                 reset_recovery_timer(obd);
864                 /* bug 1580: decide how to properly sync() in recovery */
865                 //mds_fsync_super(mds->mds_sb);
866                 class_export_put(req->rq_export);
867                 if (req->rq_reply_state != NULL) {
868                         ptlrpc_rs_decref(req->rq_reply_state);
869                         /* req->rq_reply_state = NULL; */
870                 }
871                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
872                 OBD_FREE(req, sizeof *req);
873                 spin_lock_bh(&obd->obd_processing_task_lock);
874                 obd->obd_next_recovery_transno++;
875                 if (list_empty(&obd->obd_recovery_queue)) {
876                         obd->obd_processing_task = 0;
877                         spin_unlock_bh(&obd->obd_processing_task_lock);
878                         break;
879                 }
880                 spin_unlock_bh(&obd->obd_processing_task_lock);
881         }
882         EXIT;
883 }
884
/*
 * Decide what to do with request @req arriving for @obd during recovery.
 *
 * Returns 1 when the request was NOT queued and is left for the caller:
 *   - it has no transno;
 *   - the current task is the queue processor; or
 *   - its transno is below obd->obd_next_recovery_transno.
 * Returns 0 when the request was dropped (flagged both MSG_RESENT and
 * MSG_REPLAY) or when a private copy of it was inserted into
 * obd->obd_recovery_queue, which is kept sorted by ascending transno.
 *
 * If no task is currently processing the queue, the calling task becomes
 * the processor and runs process_recovery_queue() before returning 0.
 * Allocation failure is fatal (LBUG).
 */
int target_queue_recovery_request(struct ptlrpc_request *req,
                                  struct obd_device *obd)
{
        struct list_head *tmp;
        int inserted = 0;
        __u64 transno = req->rq_reqmsg->transno;
        struct ptlrpc_request *saved_req;
        struct lustre_msg *reqmsg;

        /* CAVEAT EMPTOR: The incoming request message has been swabbed
         * (i.e. buflens etc are in my own byte order), but type-dependent
         * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */

        if (!transno) {
                INIT_LIST_HEAD(&req->rq_list);
                DEBUG_REQ(D_HA, req, "not queueing");
                return 1;
        }

        /* XXX If I were a real man, these LBUGs would be sane cleanups. */
        /* XXX just like the request-dup code in queue_final_reply */
        /* Both copies are allocated up front, before the lock is taken;
         * the bail-out paths below free them again. */
        OBD_ALLOC(saved_req, sizeof *saved_req);
        if (!saved_req)
                LBUG();
        OBD_ALLOC(reqmsg, req->rq_reqlen);
        if (!reqmsg)
                LBUG();

        spin_lock_bh(&obd->obd_processing_task_lock);

        /* If we're processing the queue, we don't want to queue this
         * message.
         *
         * Also, if this request has a transno less than the one we're waiting
         * for, we should process it now.  It could (and currently always will)
         * be an open request for a descriptor that was opened some time ago.
         *
         * Also, a resent, replayed request that has already been
         * handled will pass through here and be processed immediately.
         */
        if (obd->obd_processing_task == current->pid ||
            transno < obd->obd_next_recovery_transno) {
                /* Processing the queue right now, don't re-add. */
                LASSERT(list_empty(&req->rq_list));
                spin_unlock_bh(&obd->obd_processing_task_lock);
                OBD_FREE(reqmsg, req->rq_reqlen);
                OBD_FREE(saved_req, sizeof *saved_req);
                return 1;
        }

        /* A resent, replayed request that is still on the queue; just drop it.
           The queued request will handle this. */
        /* NOTE(review): only the message flags are tested here; the queue
         * itself is not searched for a matching request. */
        if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) ==
            (MSG_RESENT | MSG_REPLAY)) {
                DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
                spin_unlock_bh(&obd->obd_processing_task_lock);
                OBD_FREE(reqmsg, req->rq_reqlen);
                OBD_FREE(saved_req, sizeof *saved_req);
                return 0;
        }

        /* From here on `req' refers to the private copy, which owns its own
         * message buffer and holds an export reference. */
        memcpy(saved_req, req, sizeof *req);
        memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
        req = saved_req;
        req->rq_reqmsg = reqmsg;
        class_export_get(req->rq_export);
        INIT_LIST_HEAD(&req->rq_list);

        /* XXX O(n^2) */
        /* Insert before the first queued request with a larger transno. */
        list_for_each(tmp, &obd->obd_recovery_queue) {
                struct ptlrpc_request *reqiter =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                if (reqiter->rq_reqmsg->transno > transno) {
                        list_add_tail(&req->rq_list, &reqiter->rq_list);
                        inserted = 1;
                        break;
                }
        }

        /* No larger transno found: append at the end of the queue. */
        if (!inserted) {
                list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
        }

        obd->obd_requests_queued_for_recovery++;

        if (obd->obd_processing_task != 0) {
                /* Someone else is processing this queue, we'll leave it to
                 * them.
                 */
                wake_up(&obd->obd_next_transno_waitq);
                spin_unlock_bh(&obd->obd_processing_task_lock);
                return 0;
        }

        /* Nobody is processing, and we know there's (at least) one to process
         * now, so we'll do the honours.
         */
        obd->obd_processing_task = current->pid;
        spin_unlock_bh(&obd->obd_processing_task_lock);

        process_recovery_queue(obd);
        return 0;
}
989
990 struct obd_device * target_req2obd(struct ptlrpc_request *req)
991 {
992         return req->rq_export->exp_obd;
993 }
994
/*
 * Queue the final ("replay over") reply of @req and account for its export
 * having finished replay.
 *
 * @rc is the handler's result for the request; it must be zero exactly
 * when a reply state already exists (asserted).  For a non-zero @rc an
 * empty reply is packed and the request is marked PTL_RPC_MSG_ERR.
 *
 * A copy of the request and of its message is placed on
 * obd->obd_delayed_reply_queue, holding an extra reply-state reference and
 * an export reference.  The first such request from each export decrements
 * obd->obd_recoverable_clients; when that count reaches zero, recovery is
 * marked finished, the recovery timer is cancelled, and
 * target_finish_recovery() and the recovery-over upcall are run.
 * Otherwise the queue processor is woken.
 *
 * Always returns 1.  Allocation failure is fatal (LBUG).
 */
int target_queue_final_reply(struct ptlrpc_request *req, int rc)
{
        struct obd_device *obd = target_req2obd(req);
        struct ptlrpc_request *saved_req;
        struct lustre_msg *reqmsg;
        int recovery_done = 0;

        LASSERT ((rc == 0) == (req->rq_reply_state != NULL));

        if (rc) {
                /* Just like ptlrpc_error, but without the sending. */
                rc = lustre_pack_reply(req, 0, NULL, NULL);
                LASSERT(rc == 0); /* XXX handle this */
                req->rq_type = PTL_RPC_MSG_ERR;
        }

        LASSERT (!req->rq_reply_state->rs_difficult);
        LASSERT(list_empty(&req->rq_list));
        /* XXX a bit like the request-dup code in queue_recovery_request */
        OBD_ALLOC(saved_req, sizeof *saved_req);
        if (!saved_req)
                LBUG();
        OBD_ALLOC(reqmsg, req->rq_reqlen);
        if (!reqmsg)
                LBUG();
        memcpy(saved_req, req, sizeof *saved_req);
        memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
        ptlrpc_rs_addref(req->rq_reply_state);  /* +1 ref for saved reply */
        /* From here on `req' is the saved copy with its own message buffer. */
        req = saved_req;
        req->rq_reqmsg = reqmsg;
        class_export_get(req->rq_export);
        /* NOTE(review): this insertion runs before obd_processing_task_lock
         * is taken -- confirm what serializes obd_delayed_reply_queue. */
        list_add(&req->rq_list, &obd->obd_delayed_reply_queue);

        spin_lock_bh(&obd->obd_processing_task_lock);
        /* only count the first "replay over" request from each
           export */
        if (req->rq_export->exp_replay_needed) {
                --obd->obd_recoverable_clients;
                req->rq_export->exp_replay_needed = 0;
        }
        recovery_done = (obd->obd_recoverable_clients == 0);
        spin_unlock_bh(&obd->obd_processing_task_lock);

        if (recovery_done) {
                /* The lock was dropped above and is re-taken here to clear
                 * the recovery flags and cancel the timer. */
                spin_lock_bh(&obd->obd_processing_task_lock);
                obd->obd_recovering = obd->obd_abort_recovery = 0;
                target_cancel_recovery_timer(obd);
                spin_unlock_bh(&obd->obd_processing_task_lock);

                target_finish_recovery(obd);
                ptlrpc_run_recovery_over_upcall(obd);
        } else {
                /* obd_recoverable_clients is read here without the lock,
                 * for the log message only. */
                CWARN("%s: %d recoverable clients remain\n",
                       obd->obd_name, obd->obd_recoverable_clients);
                wake_up(&obd->obd_next_transno_waitq);
        }

        return 1;
}
1054
1055 int
1056 target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id)
1057 {
1058         if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
1059                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
1060                 DEBUG_REQ(D_ERROR, req, "dropping reply");
1061                 return (-ECOMM);
1062         }
1063
1064         if (rc) {
1065                 DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
1066                 req->rq_status = rc;
1067                 return (ptlrpc_error(req));
1068         } else {
1069                 DEBUG_REQ(D_NET, req, "sending reply");
1070         }
1071         
1072         return (ptlrpc_send_reply(req, 1));
1073 }
1074
/*
 * Send the reply for @req, with extra bookkeeping for "difficult" replies.
 *
 * @rc and @fail_id are passed straight through to target_send_reply_msg().
 *
 * When the request has no reply state, or its reply state is not marked
 * rs_difficult, the reply is simply sent.  Otherwise the reply state is:
 *   - tied to a new reference on the request's export;
 *   - linked onto obd->obd_uncommitted_replies, if its transno is newer
 *     than obd->obd_last_committed;
 *   - linked onto exp->exp_outstanding_replies;
 *   - sent; and
 *   - either placed on svc->srv_reply_queue (waking the service) or on
 *     sni->sni_active_replies, depending on whether it still needs to wait
 *     for the network and both of the lists above.
 */
void 
target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
{
        int                        netrc;
        unsigned long              flags;
        struct ptlrpc_reply_state *rs;
        struct obd_device         *obd;
        struct obd_export         *exp;
        struct ptlrpc_srv_ni      *sni;
        struct ptlrpc_service     *svc;

        sni = req->rq_rqbd->rqbd_srv_ni;
        svc = sni->sni_service;
        
        rs = req->rq_reply_state;
        if (rs == NULL || !rs->rs_difficult) {
                /* no notifiers */
                target_send_reply_msg (req, rc, fail_id);
                return;
        }

        /* must be an export if locks saved */
        LASSERT (req->rq_export != NULL);
        /* req/reply consistent */
        LASSERT (rs->rs_srv_ni == sni);

        /* "fresh" reply */
        LASSERT (!rs->rs_scheduled);
        LASSERT (!rs->rs_scheduled_ever);
        LASSERT (!rs->rs_handled);
        LASSERT (!rs->rs_on_net);
        LASSERT (rs->rs_export == NULL);
        LASSERT (list_empty(&rs->rs_obd_list));
        LASSERT (list_empty(&rs->rs_exp_list));

        /* this export reference is stored in rs->rs_export below */
        exp = class_export_get (req->rq_export);
        obd = exp->exp_obd;

        /* disable reply scheduling onto srv_reply_queue while I'm setting up */
        rs->rs_scheduled = 1;
        rs->rs_on_net    = 1;
        rs->rs_xid       = req->rq_xid;
        rs->rs_transno   = req->rq_transno;
        rs->rs_export    = exp;
        
        /* Interrupt state is saved once in `flags' and stays in effect
         * across both locks: the first lock is dropped with a plain
         * spin_unlock and the second is released with irqrestore. */
        spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags);

        if (rs->rs_transno > obd->obd_last_committed) {
                /* not committed already */ 
                list_add_tail (&rs->rs_obd_list, 
                               &obd->obd_uncommitted_replies);
        }

        spin_unlock (&obd->obd_uncommitted_replies_lock);
        spin_lock (&exp->exp_lock);

        list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies);

        spin_unlock_irqrestore (&exp->exp_lock, flags);

        netrc = target_send_reply_msg (req, rc, fail_id);

        spin_lock_irqsave (&svc->srv_lock, flags);

        svc->srv_n_difficult_replies++;

        if (netrc != 0) {
                /* error sending: reply is off the net.  Also we need +1
                 * reply ref until ptlrpc_server_handle_reply() is done
                 * with the reply state (if the send was successful, there
                 * would have been +1 ref for the net, which
                 * reply_out_callback leaves alone) */
                rs->rs_on_net = 0;
                ptlrpc_rs_addref(rs);
                atomic_inc (&svc->srv_outstanding_replies);
        }

        /* An already-committed transno was never put on rs_obd_list above,
         * so its list is empty and the reply is queued right away. */
        if (!rs->rs_on_net ||                   /* some notifier */
            list_empty(&rs->rs_exp_list) ||     /* completed already */
            list_empty(&rs->rs_obd_list)) {
                list_add_tail (&rs->rs_list, &svc->srv_reply_queue);
                wake_up (&svc->srv_waitq);
        } else {
                list_add (&rs->rs_list, &sni->sni_active_replies);
                rs->rs_scheduled = 0;           /* allow notifier to schedule */
        }

        spin_unlock_irqrestore (&svc->srv_lock, flags);
}
1164
1165 int target_handle_ping(struct ptlrpc_request *req)
1166 {
1167         return lustre_pack_reply(req, 0, NULL, NULL);
1168 }
1169
1170 void target_committed_to_req(struct ptlrpc_request *req)
1171 {
1172         struct obd_device *obd = req->rq_export->exp_obd;
1173
1174         if (!obd->obd_no_transno)
1175                 req->rq_repmsg->last_committed = obd->obd_last_committed;
1176         else
1177                 DEBUG_REQ(D_IOCTL, req,
1178                           "not sending last_committed update");
1179
1180         CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1181                obd->obd_last_committed, req->rq_xid);
1182 }
1183 EXPORT_SYMBOL(target_committed_to_req);