Whamcloud - gitweb
mgs_handler mgs_fs_setup
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LDLM
26
27 #ifdef __KERNEL__
28 # include <linux/module.h>
29 #else
30 # include <liblustre.h>
31 #endif
32 #include <linux/obd.h>
33 #include <linux/obd_ost.h> /* for LUSTRE_OSC_NAME */
34 #include <linux/lustre_mds.h> /* for LUSTRE_MDC_NAME */
35 #include <linux/lustre_mgmt.h>
36 #include <linux/lustre_dlm.h>
37 #include <linux/lustre_net.h>
38
39 /* @priority: if non-zero, move the selected to the list head
40  * @create: if zero, only search in existed connections
41  */
42 static int import_set_conn(struct obd_import *imp, struct obd_uuid *uuid,
43                            int priority, int create)
44 {
45         struct ptlrpc_connection *ptlrpc_conn;
46         struct obd_import_conn *imp_conn = NULL, *item;
47         int rc = 0;
48         ENTRY;
49
50         if (!create && !priority) {
51                 CDEBUG(D_HA, "Nothing to do\n");
52                 RETURN(-EINVAL);
53         }
54
55         ptlrpc_conn = ptlrpc_uuid_to_connection(uuid);
56         if (!ptlrpc_conn) {
57                 CDEBUG(D_HA, "can't find connection %s\n", uuid->uuid);
58                 RETURN (-ENOENT);
59         }
60
61         if (create) {
62                 OBD_ALLOC(imp_conn, sizeof(*imp_conn));
63                 if (!imp_conn) {
64                         GOTO(out_put, rc = -ENOMEM);
65                 }
66         }
67
68         spin_lock(&imp->imp_lock);
69         list_for_each_entry(item, &imp->imp_conn_list, oic_item) {
70                 if (obd_uuid_equals(uuid, &item->oic_uuid)) {
71                         if (priority) {
72                                 list_del(&item->oic_item);
73                                 list_add(&item->oic_item, &imp->imp_conn_list);
74                         }
75                         CDEBUG(D_HA, "imp %p@%s: found existing conn %s%s\n",
76                                imp, imp->imp_obd->obd_name, uuid->uuid,
77                                (priority ? ", moved to head" : ""));
78                         spin_unlock(&imp->imp_lock);
79                         GOTO(out_free, rc = 0);
80                 }
81         }
82         /* not found */
83         if (create) {
84                 imp_conn->oic_conn = ptlrpc_conn;
85                 imp_conn->oic_uuid = *uuid;
86                 if (priority)
87                         list_add(&imp_conn->oic_item, &imp->imp_conn_list);
88                 else
89                         list_add_tail(&imp_conn->oic_item, &imp->imp_conn_list);
90                 CDEBUG(D_HA, "imp %p@%s: add connection %s at %s\n",
91                        imp, imp->imp_obd->obd_name, uuid->uuid,
92                        (priority ? "head" : "tail"));
93         } else {
94                 spin_unlock(&imp->imp_lock);
95                 GOTO(out_free, rc = -ENOENT);
96                 
97         }
98
99         spin_unlock(&imp->imp_lock);
100         RETURN(0);
101 out_free:
102         if (imp_conn)
103                 OBD_FREE(imp_conn, sizeof(*imp_conn));
104 out_put:
105         ptlrpc_put_connection(ptlrpc_conn);
106         RETURN(rc);
107 }
108
/*
 * Move the already-registered connection @uuid to the head of @imp's
 * connection list.  No entry is created; see import_set_conn() for the
 * return values.
 */
int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid)
{
        int rc;

        rc = import_set_conn(imp, uuid, 1 /* priority */, 0 /* create */);
        return rc;
}
113
/*
 * Register connection @uuid on @imp, creating the list entry if it is not
 * present yet.  A non-zero @priority places the entry at the head of the
 * list.  See import_set_conn() for the return values.
 */
int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
                           int priority)
{
        int rc;

        rc = import_set_conn(imp, uuid, priority, 1 /* create */);
        return rc;
}
119
120 int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid)
121 {
122         struct obd_import_conn *imp_conn;
123         struct obd_import_conn *cur_conn;
124         struct obd_export *dlmexp;
125         int rc = -ENOENT;
126         ENTRY;
127
128         spin_lock(&imp->imp_lock);
129         if (list_empty(&imp->imp_conn_list)) {
130                 LASSERT(!imp->imp_connection);
131                 GOTO(out, rc);
132         }
133
134         list_for_each_entry(imp_conn, &imp->imp_conn_list, oic_item) {
135                 if (!obd_uuid_equals(uuid, &imp_conn->oic_uuid))
136                         continue;
137                 LASSERT(imp_conn->oic_conn);
138
139                 cur_conn = list_entry(imp->imp_conn_list.next,
140                                       struct obd_import_conn,
141                                       oic_item);
142
143                 /* is current conn? */
144                 if (imp_conn == cur_conn) {
145                         LASSERT(imp_conn->oic_conn == imp->imp_connection);
146
147                         if (imp->imp_state != LUSTRE_IMP_CLOSED &&
148                             imp->imp_state != LUSTRE_IMP_DISCON) {
149                                 CERROR("can't remove current connection\n");
150                                 GOTO(out, rc = -EBUSY);
151                         }
152
153                         ptlrpc_put_connection(imp->imp_connection);
154                         imp->imp_connection = NULL;
155
156                         dlmexp = class_conn2export(&imp->imp_dlm_handle);
157                         if (dlmexp && dlmexp->exp_connection) {
158                                 LASSERT(dlmexp->exp_connection ==
159                                         imp_conn->oic_conn);
160                                 ptlrpc_put_connection(dlmexp->exp_connection);
161                                 dlmexp->exp_connection = NULL;
162                         }
163                 }
164
165                 list_del(&imp_conn->oic_item);
166                 ptlrpc_put_connection(imp_conn->oic_conn);
167                 OBD_FREE(imp_conn, sizeof(*imp_conn));
168                 CDEBUG(D_HA, "imp %p@%s: remove connection %s\n",
169                        imp, imp->imp_obd->obd_name, uuid->uuid);
170                 rc = 0;
171                 break;
172         }
173 out:
174         spin_unlock(&imp->imp_lock);
175         if (rc == -ENOENT)
176                 CERROR("connection %s not found\n", uuid->uuid);
177         RETURN(rc);
178 }
179
180 /*mgc_obd_setup for mount-conf*/
181 int mgc_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
182 {
183         struct lustre_cfg* lcfg = buf;
184         struct mgc_obd *mgc = &obddev->u.mgc;
185         struct obd_import *imp;
186         struct obd_uuid server_uuid;
187         int rq_portal, rp_portal, connect_op;
188         char *name = obddev->obd_type->typ_name;
189         int rc;
190         ENTRY;
191
192         if (strcmp(name, LUSTRE_MGC_NAME) == 0) {
193                 rq_portal = MGS_REQUEST_PORTAL;
194                 rp_portal = MGC_REPLY_PORTAL;
195                 connect_op = MGS_CONNECT;
196         } else {
197                 CERROR("wrong client OBD type \"%s\", can't setup\n",
198                        name);
199                 RETURN(-EINVAL);
200         }
201
202         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
203                 CERROR("requires a TARGET UUID\n");
204                 RETURN(-EINVAL);
205         }
206
207         if (LUSTRE_CFG_BUFLEN(lcfg, 1) > 37) {
208                 CERROR("client UUID must be less than 38 characters\n");
209                 RETURN(-EINVAL);
210         }
211
212         if (LUSTRE_CFG_BUFLEN(lcfg, 2) < 1) {
213                 CERROR("setup requires a SERVER UUID\n");
214                 RETURN(-EINVAL);
215         }
216
217         if (LUSTRE_CFG_BUFLEN(lcfg, 2) > 37) {
218                 CERROR("target UUID must be less than 38 characters\n");
219                 RETURN(-EINVAL);
220         }
221
222         sema_init(&mgc->cl_sem, 1);
223         mgc->cl_conn_count = 0;
224         memcpy(server_uuid.uuid, lustre_cfg_buf(lcfg, 2),
225                min_t(unsigned int, LUSTRE_CFG_BUFLEN(lcfg, 2),
226                      sizeof(server_uuid)));
227
228         rc = ldlm_get_ref();
229         if (rc) {
230                 CERROR("ldlm_get_ref failed: %d\n", rc);
231                 GOTO(err, rc);
232         }
233
234         ptlrpc_init_client(rq_portal, rp_portal, name,
235                            &obddev->obd_ldlm_client);
236
237         imp = class_new_import();
238         if (imp == NULL)
239                 GOTO(err_ldlm, rc = -ENOENT);
240         imp->imp_client = &obddev->obd_ldlm_client;
241         imp->imp_obd = obddev;
242         imp->imp_connect_op = connect_op;
243         imp->imp_generation = 0;
244         imp->imp_initial_recov = 1;
245         INIT_LIST_HEAD(&imp->imp_pinger_chain);
246         memcpy(imp->imp_target_uuid.uuid, lustre_cfg_buf(lcfg, 1),
247                LUSTRE_CFG_BUFLEN(lcfg, 1));
248         class_import_put(imp);
249
250         rc = client_import_add_conn(imp, &server_uuid, 1);
251         if (rc) {
252                 CERROR("can't add initial connection\n");
253                 GOTO(err_import, rc);
254         }
255
256         mgc->cl_import = imp;
257
258         RETURN(rc);
259
260 err_import:
261         class_destroy_import(imp);
262 err_ldlm:
263         ldlm_put_ref(0);
264 err:
265         RETURN(rc);
266 }
267
268 /*mgc_obd_cleaup for mount-conf*/
269 int mgc_obd_cleanup(struct obd_device *obddev)
270 {
271         struct mgc_obd *mgc = &obddev->u.mgc;
272
273         if (!mgc->cl_import)
274                 RETURN(-EINVAL);
275
276         class_destroy_import(mgc->cl_import);
277         mgc->cl_import = NULL;
278
279         ldlm_put_ref(obddev->obd_force);
280
281         RETURN(0);
282 }
283
284 /* mgc_connect_import for mount-conf*/
285 int mgc_connect_import(struct lustre_handle *dlm_handle,
286                        struct obd_device *obd, struct obd_uuid *cluuid,
287                        struct obd_connect_data *data)
288 {
289         struct mgc_obd *cli = &obd->u.mgc;
290         struct obd_import *imp = mgc->cl_import;
291         struct obd_export *exp;
292         int rc;
293         ENTRY;
294
295         down(&mgc->cl_sem);
296         rc = class_connect(dlm_handle, obd, cluuid);
297         if (rc)
298                 GOTO(out_sem, rc);
299
300         mgc->cl_conn_count++;
301         if (mgc->cl_conn_count > 1)
302                 GOTO(out_sem, rc);
303         exp = class_conn2export(dlm_handle);
304
305         imp->imp_dlm_handle = *dlm_handle;
306         rc = ptlrpc_init_import(imp);
307         if (rc != 0) 
308                 GOTO(out_disco, rc);
309
310         if (data)
311                 memcpy(&imp->imp_connect_data, data, sizeof(*data));
312         rc = ptlrpc_connect_import(imp, NULL);
313         if (rc != 0) {
314                 LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
315                 GOTO(out_disco, rc);
316         }
317         LASSERT(exp->exp_connection);
318
319         ptlrpc_pinger_add_import(imp);
320         EXIT;
321
322         if (rc) {
323 out_disco:
324                 mgc->cl_conn_count--;
325                 class_disconnect(exp);
326         } else {
327                 class_export_put(exp);
328         }
329 out_sem:
330         up(&mgc->cl_sem);
331         return rc;
332 }
333
334 /* mgc_disconnect_export for mount-conf*/
335 int mgc_disconnect_export(struct obd_export *exp)
336 {
337         struct obd_device *obd = class_exp2obd(exp);
338         struct mgc_obd *mgc = &obd->u.mgc;
339         struct obd_import *imp = mgc->cl_import;
340         int rc = 0, err;
341         ENTRY;
342
343         if (!obd) {
344                 CERROR("invalid export for disconnect: exp %p cookie "LPX64"\n",
345                        exp, exp ? exp->exp_handle.h_cookie : -1);
346                 RETURN(-EINVAL);
347         }
348
349         down(&mgc->cl_sem);
350         if (!mgc->cl_conn_count) {
351                 CERROR("disconnecting disconnected device (%s)\n",
352                        obd->obd_name);
353                 GOTO(out_sem, rc = -EINVAL);
354         }
355
356         mgc->cl_conn_count--;
357         if (mgc->cl_conn_count)
358                 GOTO(out_no_disconnect, rc = 0);
359
360         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
361          * delete it regardless.  (It's safe to delete an import that was
362          * never added.) */
363         (void)ptlrpc_pinger_del_import(imp);
364
365         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
366         if (obd->obd_no_recov)
367                 ptlrpc_invalidate_import(imp);
368         else
369                 rc = ptlrpc_disconnect_import(imp);
370
371         EXIT;
372  out_no_disconnect:
373         err = class_disconnect(exp);
374         if (!rc && err)
375                 rc = err;
376  out_sem:
377         up(&mgc->cl_sem);
378         RETURN(rc);
379 }
380
381 int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
382 {
383         struct lustre_cfg* lcfg = buf;
384         struct client_obd *cli = &obddev->u.cli;
385         struct obd_import *imp;
386         struct obd_uuid server_uuid;
387         int rq_portal, rp_portal, connect_op;
388         char *name = obddev->obd_type->typ_name;
389         char *mgmt_name = NULL;
390         int rc;
391         struct obd_device *mgmt_obd;
392         mgmtcli_register_for_events_t register_f;
393         ENTRY;
394
395         /* In a more perfect world, we would hang a ptlrpc_client off of
396          * obd_type and just use the values from there. */
397         if (!strcmp(name, LUSTRE_OSC_NAME)) {
398                 rq_portal = OST_REQUEST_PORTAL;
399                 rp_portal = OSC_REPLY_PORTAL;
400                 connect_op = OST_CONNECT;
401         } else if (!strcmp(name, LUSTRE_MDC_NAME)) {
402                 rq_portal = MDS_REQUEST_PORTAL;
403                 rp_portal = MDC_REPLY_PORTAL;
404                 connect_op = MDS_CONNECT;
405         } else if (!strcmp(name, LUSTRE_MGMTCLI_NAME)) {
406                 rq_portal = MGMT_REQUEST_PORTAL;
407                 rp_portal = MGMT_REPLY_PORTAL;
408                 connect_op = MGMT_CONNECT;
409         } else {
410                 CERROR("unknown client OBD type \"%s\", can't setup\n",
411                        name);
412                 RETURN(-EINVAL);
413         }
414
415         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
416                 CERROR("requires a TARGET UUID\n");
417                 RETURN(-EINVAL);
418         }
419
420         if (LUSTRE_CFG_BUFLEN(lcfg, 1) > 37) {
421                 CERROR("client UUID must be less than 38 characters\n");
422                 RETURN(-EINVAL);
423         }
424
425         if (LUSTRE_CFG_BUFLEN(lcfg, 2) < 1) {
426                 CERROR("setup requires a SERVER UUID\n");
427                 RETURN(-EINVAL);
428         }
429
430         if (LUSTRE_CFG_BUFLEN(lcfg, 2) > 37) {
431                 CERROR("target UUID must be less than 38 characters\n");
432                 RETURN(-EINVAL);
433         }
434
435         sema_init(&cli->cl_sem, 1);
436         cli->cl_conn_count = 0;
437         memcpy(server_uuid.uuid, lustre_cfg_buf(lcfg, 2),
438                min_t(unsigned int, LUSTRE_CFG_BUFLEN(lcfg, 2),
439                      sizeof(server_uuid)));
440
441         cli->cl_dirty = 0;
442         cli->cl_avail_grant = 0;
443         /* FIXME: should limit this for the sum of all cl_dirty_max */
444         cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024;
445         if (cli->cl_dirty_max >> PAGE_SHIFT > num_physpages / 8)
446                 cli->cl_dirty_max = num_physpages << (PAGE_SHIFT - 3);
447         INIT_LIST_HEAD(&cli->cl_cache_waiters);
448         INIT_LIST_HEAD(&cli->cl_loi_ready_list);
449         INIT_LIST_HEAD(&cli->cl_loi_write_list);
450         INIT_LIST_HEAD(&cli->cl_loi_read_list);
451         spin_lock_init(&cli->cl_loi_list_lock);
452         cli->cl_r_in_flight = 0;
453         cli->cl_w_in_flight = 0;
454         spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
455         spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
456         spin_lock_init(&cli->cl_read_page_hist.oh_lock);
457         spin_lock_init(&cli->cl_write_page_hist.oh_lock);
458         spin_lock_init(&cli->cl_read_offset_hist.oh_lock);
459         spin_lock_init(&cli->cl_write_offset_hist.oh_lock);
460         if (num_physpages >> (20 - PAGE_SHIFT) <= 128) { /* <= 128 MB */
461                 cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 4;
462                 cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 4;
463         } else if (num_physpages >> (20 - PAGE_SHIFT) <= 512) { /* <= 512 MB */
464                 cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 2;
465                 cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 2;
466         } else {
467                 cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
468                 cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
469         }
470
471         rc = ldlm_get_ref();
472         if (rc) {
473                 CERROR("ldlm_get_ref failed: %d\n", rc);
474                 GOTO(err, rc);
475         }
476
477         ptlrpc_init_client(rq_portal, rp_portal, name,
478                            &obddev->obd_ldlm_client);
479
480         imp = class_new_import();
481         if (imp == NULL)
482                 GOTO(err_ldlm, rc = -ENOENT);
483         imp->imp_client = &obddev->obd_ldlm_client;
484         imp->imp_obd = obddev;
485         imp->imp_connect_op = connect_op;
486         imp->imp_generation = 0;
487         imp->imp_initial_recov = 1;
488         INIT_LIST_HEAD(&imp->imp_pinger_chain);
489         memcpy(imp->imp_target_uuid.uuid, lustre_cfg_buf(lcfg, 1),
490                LUSTRE_CFG_BUFLEN(lcfg, 1));
491         class_import_put(imp);
492
493         rc = client_import_add_conn(imp, &server_uuid, 1);
494         if (rc) {
495                 CERROR("can't add initial connection\n");
496                 GOTO(err_import, rc);
497         }
498
499         cli->cl_import = imp;
500         /* cli->cl_max_mds_{easize,cookiesize} updated by mdc_init_ea_size() */
501         cli->cl_max_mds_easize = sizeof(struct lov_mds_md);
502         cli->cl_max_mds_cookiesize = sizeof(struct llog_cookie);
503         cli->cl_sandev = to_kdev_t(0);
504
505         if (LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
506                 if (!strcmp(lustre_cfg_string(lcfg, 3), "inactive")) {
507                         CDEBUG(D_HA, "marking %s %s->%s as inactive\n",
508                                name, obddev->obd_name,
509                                imp->imp_target_uuid.uuid);
510                         imp->imp_invalid = 1;
511
512                         if (LUSTRE_CFG_BUFLEN(lcfg, 4) > 0)
513                                 mgmt_name = lustre_cfg_string(lcfg, 4);
514                 } else {
515                         mgmt_name = lustre_cfg_string(lcfg, 3);
516                 }
517         }
518
519         if (mgmt_name != NULL) {
520                 /* Register with management client if we need to. */
521                 CDEBUG(D_HA, "%s registering with %s for events about %s\n",
522                        obddev->obd_name, mgmt_name, server_uuid.uuid);
523
524                 mgmt_obd = class_name2obd(mgmt_name);
525                 if (!mgmt_obd) {
526                         CERROR("can't find mgmtcli %s to register\n",
527                                mgmt_name);
528                         GOTO(err_import, rc = -ENOSYS);
529                 }
530
531                 register_f = inter_module_get("mgmtcli_register_for_events");
532                 if (!register_f) {
533                         CERROR("can't i_m_g mgmtcli_register_for_events\n");
534                         GOTO(err_import, rc = -ENOSYS);
535                 }
536
537                 rc = register_f(mgmt_obd, obddev, &imp->imp_target_uuid);
538                 inter_module_put("mgmtcli_register_for_events");
539
540                 if (!rc)
541                         cli->cl_mgmtcli_obd = mgmt_obd;
542         }
543
544         spin_lock_init(&cli->cl_qchk_lock);
545         cli->cl_qchk_stat = CL_NO_QUOTACHECK;
546
547         RETURN(rc);
548
549 err_import:
550         class_destroy_import(imp);
551 err_ldlm:
552         ldlm_put_ref(0);
553 err:
554         RETURN(rc);
555
556 }
557
558 int client_obd_cleanup(struct obd_device *obddev)
559 {
560         struct client_obd *cli = &obddev->u.cli;
561
562         if (!cli->cl_import)
563                 RETURN(-EINVAL);
564         if (cli->cl_mgmtcli_obd) {
565                 mgmtcli_deregister_for_events_t dereg_f;
566
567                 dereg_f = inter_module_get("mgmtcli_deregister_for_events");
568                 dereg_f(cli->cl_mgmtcli_obd, obddev);
569                 inter_module_put("mgmtcli_deregister_for_events");
570         }
571         class_destroy_import(cli->cl_import);
572         cli->cl_import = NULL;
573
574         ldlm_put_ref(obddev->obd_force);
575
576         RETURN(0);
577 }
578
579 int client_connect_import(struct lustre_handle *dlm_handle,
580                           struct obd_device *obd, struct obd_uuid *cluuid,
581                           struct obd_connect_data *data)
582 {
583         struct client_obd *cli = &obd->u.cli;
584         struct obd_import *imp = cli->cl_import;
585         struct obd_export *exp;
586         int rc;
587         ENTRY;
588
589         down(&cli->cl_sem);
590         rc = class_connect(dlm_handle, obd, cluuid);
591         if (rc)
592                 GOTO(out_sem, rc);
593
594         cli->cl_conn_count++;
595         if (cli->cl_conn_count > 1)
596                 GOTO(out_sem, rc);
597         exp = class_conn2export(dlm_handle);
598
599         if (obd->obd_namespace != NULL)
600                 CERROR("already have namespace!\n");
601         obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
602                                                 LDLM_NAMESPACE_CLIENT);
603         if (obd->obd_namespace == NULL)
604                 GOTO(out_disco, rc = -ENOMEM);
605
606         imp->imp_dlm_handle = *dlm_handle;
607         rc = ptlrpc_init_import(imp);
608         if (rc != 0) 
609                 GOTO(out_ldlm, rc);
610
611         if (data)
612                 memcpy(&imp->imp_connect_data, data, sizeof(*data));
613         rc = ptlrpc_connect_import(imp, NULL);
614         if (rc != 0) {
615                 LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
616                 GOTO(out_ldlm, rc);
617         }
618         LASSERT(exp->exp_connection);
619
620         ptlrpc_pinger_add_import(imp);
621         EXIT;
622
623         if (rc) {
624 out_ldlm:
625                 ldlm_namespace_free(obd->obd_namespace, 0);
626                 obd->obd_namespace = NULL;
627 out_disco:
628                 cli->cl_conn_count--;
629                 class_disconnect(exp);
630         } else {
631                 class_export_put(exp);
632         }
633 out_sem:
634         up(&cli->cl_sem);
635         return rc;
636 }
637
638 int client_disconnect_export(struct obd_export *exp)
639 {
640         struct obd_device *obd = class_exp2obd(exp);
641         struct client_obd *cli = &obd->u.cli;
642         struct obd_import *imp = cli->cl_import;
643         int rc = 0, err;
644         ENTRY;
645
646         if (!obd) {
647                 CERROR("invalid export for disconnect: exp %p cookie "LPX64"\n",
648                        exp, exp ? exp->exp_handle.h_cookie : -1);
649                 RETURN(-EINVAL);
650         }
651
652         down(&cli->cl_sem);
653         if (!cli->cl_conn_count) {
654                 CERROR("disconnecting disconnected device (%s)\n",
655                        obd->obd_name);
656                 GOTO(out_sem, rc = -EINVAL);
657         }
658
659         cli->cl_conn_count--;
660         if (cli->cl_conn_count)
661                 GOTO(out_no_disconnect, rc = 0);
662
663         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
664          * delete it regardless.  (It's safe to delete an import that was
665          * never added.) */
666         (void)ptlrpc_pinger_del_import(imp);
667
668         if (obd->obd_namespace != NULL) {
669                 /* obd_no_recov == local only */
670                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
671                                        obd->obd_no_recov, NULL);
672                 ldlm_namespace_free(obd->obd_namespace, obd->obd_no_recov);
673                 obd->obd_namespace = NULL;
674         }
675
676         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
677         if (obd->obd_no_recov)
678                 ptlrpc_invalidate_import(imp);
679         else
680                 rc = ptlrpc_disconnect_import(imp);
681
682         EXIT;
683  out_no_disconnect:
684         err = class_disconnect(exp);
685         if (!rc && err)
686                 rc = err;
687  out_sem:
688         up(&cli->cl_sem);
689         RETURN(rc);
690 }
691
692 /* --------------------------------------------------------------------------
693  * from old lib/target.c
694  * -------------------------------------------------------------------------- */
695
696 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
697                             struct obd_uuid *cluuid)
698 {
699         if (exp->exp_connection) {
700                 struct lustre_handle *hdl;
701                 hdl = &exp->exp_imp_reverse->imp_remote_handle;
702                 /* Might be a re-connect after a partition. */
703                 if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
704                         CERROR("%s reconnecting\n", cluuid->uuid);
705                         conn->cookie = exp->exp_handle.h_cookie;
706                         /* target_handle_connect() treats EALREADY and
707                          * -EALREADY differently */
708                         RETURN(EALREADY);
709                 } else {
710                         CERROR("%s reconnecting from %s, "
711                                "handle mismatch (ours "LPX64", theirs "
712                                LPX64")\n", cluuid->uuid,
713                                exp->exp_connection->c_remote_uuid.uuid,
714                                hdl->cookie, conn->cookie);
715                         memset(conn, 0, sizeof *conn);
716                         /* target_handle_connect() treats EALREADY and
717                          * -EALREADY differently */
718                         RETURN(-EALREADY);
719                 }
720         }
721
722         conn->cookie = exp->exp_handle.h_cookie;
723         CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
724                cluuid->uuid, exp);
725         CDEBUG(D_IOCTL,"connect: cookie "LPX64"\n", conn->cookie);
726         RETURN(0);
727 }
728
729 int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
730 {
731         struct obd_device *target;
732         struct obd_export *export = NULL;
733         struct obd_import *revimp;
734         struct lustre_handle conn;
735         struct obd_uuid tgtuuid;
736         struct obd_uuid cluuid;
737         struct obd_uuid remote_uuid;
738         struct list_head *p;
739         char *str, *tmp;
740         int rc = 0, abort_recovery;
741         unsigned long flags;
742         struct obd_connect_data *data;
743         int size = sizeof(*data);
744         ENTRY;
745
746         OBD_RACE(OBD_FAIL_TGT_CONN_RACE); 
747
748         LASSERT_REQSWAB (req, 0);
749         str = lustre_msg_string(req->rq_reqmsg, 0, sizeof(tgtuuid) - 1);
750         if (str == NULL) {
751                 DEBUG_REQ(D_ERROR, req, "bad target UUID for connect\n");
752                 GOTO(out, rc = -EINVAL);
753         }
754
755         obd_str2uuid (&tgtuuid, str);
756         target = class_uuid2obd(&tgtuuid);
757         if (!target) {
758                 target = class_name2obd(str);
759         }
760
761         if (!target || target->obd_stopping || !target->obd_set_up) {
762                 DEBUG_REQ(D_ERROR, req, "UUID '%s' is not available "
763                        " for connect (%s)\n", str,
764                        !target ? "no target" : 
765                        (target->obd_stopping ? "stopping" : "not set up"));
766                 GOTO(out, rc = -ENODEV);
767         }
768
769         LASSERT_REQSWAB (req, 1);
770         str = lustre_msg_string(req->rq_reqmsg, 1, sizeof(cluuid) - 1);
771         if (str == NULL) {
772                 DEBUG_REQ(D_ERROR, req, "bad client UUID for connect\n");
773                 GOTO(out, rc = -EINVAL);
774         }
775
776         obd_str2uuid (&cluuid, str);
777
778         /* XXX extract a nettype and format accordingly */
779         switch (sizeof(ptl_nid_t)) {
780                 /* NB the casts only avoid compiler warnings */
781         case 8:
782                 snprintf(remote_uuid.uuid, sizeof remote_uuid,
783                          "NET_"LPX64"_UUID", (__u64)req->rq_peer.nid);
784                 break;
785         case 4:
786                 snprintf(remote_uuid.uuid, sizeof remote_uuid,
787                          "NET_%x_UUID", (__u32)req->rq_peer.nid);
788                 break;
789         default:
790                 LBUG();
791         }
792
793         spin_lock_bh(&target->obd_processing_task_lock);
794         abort_recovery = target->obd_abort_recovery;
795         spin_unlock_bh(&target->obd_processing_task_lock);
796         if (abort_recovery)
797                 target_abort_recovery(target);
798
799         tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn);
800         if (tmp == NULL)
801                 GOTO(out, rc = -EPROTO);
802
803         memcpy(&conn, tmp, sizeof conn);
804
805         data = lustre_swab_reqbuf(req, 3, sizeof(*data), lustre_swab_connect);
806         rc = lustre_pack_reply(req, 1, &size, NULL);
807         if (rc)
808                 GOTO(out, rc);
809
810         /* lctl gets a backstage, all-access pass. */
811         if (obd_uuid_equals(&cluuid, &target->obd_uuid))
812                 goto dont_check_exports;
813
814         spin_lock(&target->obd_dev_lock);
815         list_for_each(p, &target->obd_exports) {
816                 export = list_entry(p, struct obd_export, exp_obd_chain);
817                 if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
818                         spin_unlock(&target->obd_dev_lock);
819                         LASSERT(export->exp_obd == target);
820
821                         rc = target_handle_reconnect(&conn, export, &cluuid);
822                         break;
823                 }
824                 export = NULL;
825         }
826         /* If we found an export, we already unlocked. */
827         if (!export) {
828                 spin_unlock(&target->obd_dev_lock);
829         } else if (req->rq_reqmsg->conn_cnt == 1) {
830                 CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n",
831                        cluuid.uuid);
832                 GOTO(out, rc = -EALREADY);
833         }
834
835         /* Tell the client if we're in recovery. */
836         /* If this is the first client, start the recovery timer */
837         if (target->obd_recovering) {
838                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
839                 target_start_recovery_timer(target, handler);
840         }
841
842         /* Tell the client if we support replayable requests */
843         if (target->obd_replayable)
844                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
845
846         if (export == NULL) {
847                 if (target->obd_recovering) {
848                         CERROR("%s: denying connection for new client %s: "
849                                "%d clients in recovery for %lds\n",
850                                target->obd_name, cluuid.uuid,
851                                target->obd_recoverable_clients,
852                                (target->obd_recovery_timer.expires-jiffies)/HZ);
853                         rc = -EBUSY;
854                 } else {
855  dont_check_exports:
856                         rc = obd_connect(&conn, target, &cluuid, data);
857                 }
858         }
859
860         /* Return only the parts of obd_connect_data that we understand, so the
861          * client knows that we don't understand the rest. */
862         if (data)
863                 memcpy(lustre_msg_buf(req->rq_repmsg, 0, sizeof(*data)), data,
864                        sizeof(*data));
865
866         /* If all else goes well, this is our RPC return code. */
867         req->rq_status = 0;
868
869         /* we want to handle EALREADY but *not* -EALREADY from
870          * target_handle_reconnect() */
871         if (rc && rc != EALREADY)
872                 GOTO(out, rc);
873
874         req->rq_repmsg->handle = conn;
875
876         /* If the client and the server are the same node, we will already
877          * have an export that really points to the client's DLM export,
878          * because we have a shared handles table.
879          *
880          * XXX this will go away when shaver stops sending the "connect" handle
881          * in the real "remote handle" field of the request --phik 24 Apr 2003
882          */
883         if (req->rq_export != NULL)
884                 class_export_put(req->rq_export);
885
886         /* ownership of this export ref transfers to the request */
887         export = req->rq_export = class_conn2export(&conn);
888         LASSERT(export != NULL);
889
890         spin_lock_irqsave(&export->exp_lock, flags);
891         if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
892                 CERROR("%s: already connected at a higher conn_cnt: %d > %d\n",
893                        cluuid.uuid, export->exp_conn_cnt, 
894                        req->rq_reqmsg->conn_cnt);
895                 spin_unlock_irqrestore(&export->exp_lock, flags);
896                 GOTO(out, rc = -EALREADY);
897         }
898         export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
899         spin_unlock_irqrestore(&export->exp_lock, flags);
900
901         /* request from liblustre? */
902         if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT)
903                 export->exp_libclient = 1;
904
905         if (export->exp_connection != NULL)
906                 ptlrpc_put_connection(export->exp_connection);
907         export->exp_connection = ptlrpc_get_connection(req->rq_peer,
908                                                        &remote_uuid);
909
910         if (rc == EALREADY) {
911                 /* We indicate the reconnection in a flag, not an error code. */
912                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
913                 GOTO(out, rc = 0);
914         }
915
916         if (target->obd_recovering)
917                 target->obd_connected_clients++;
918
919         memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
920                sizeof conn);
921
922         if (export->exp_imp_reverse != NULL)
923                 class_destroy_import(export->exp_imp_reverse);
924         revimp = export->exp_imp_reverse = class_new_import();
925         revimp->imp_connection = ptlrpc_connection_addref(export->exp_connection);
926         revimp->imp_client = &export->exp_obd->obd_ldlm_client;
927         revimp->imp_remote_handle = conn;
928         revimp->imp_obd = target;
929         revimp->imp_dlm_fake = 1;
930         revimp->imp_state = LUSTRE_IMP_FULL;
931         class_import_put(revimp);
932 out:
933         if (rc)
934                 req->rq_status = rc;
935         RETURN(rc);
936 }
937
938 int target_handle_disconnect(struct ptlrpc_request *req)
939 {
940         struct obd_export *exp;
941         int rc;
942         ENTRY;
943
944         rc = lustre_pack_reply(req, 0, NULL, NULL);
945         if (rc)
946                 RETURN(rc);
947
948         /* keep the rq_export around so we can send the reply */
949         exp = class_export_get(req->rq_export);
950         req->rq_status = obd_disconnect(exp);
951         RETURN(0);
952 }
953
954 void target_destroy_export(struct obd_export *exp)
955 {
956         /* exports created from last_rcvd data, and "fake"
957            exports created by lctl don't have an import */
958         if (exp->exp_imp_reverse != NULL)
959                 class_destroy_import(exp->exp_imp_reverse);
960
961         /* We cancel locks at disconnect time, but this will catch any locks
962          * granted in a race with recovery-induced disconnect. */
963         if (exp->exp_obd->obd_namespace != NULL)
964                 ldlm_cancel_locks_for_export(exp);
965 }
966
967 /*
968  * Recovery functions
969  */
970
971
972 static void target_release_saved_req(struct ptlrpc_request *req)
973 {
974         if (req->rq_reply_state != NULL) {
975                 ptlrpc_rs_decref(req->rq_reply_state);
976                 /* req->rq_reply_state = NULL; */
977         }
978
979         class_export_put(req->rq_export);
980         OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
981         OBD_FREE(req, sizeof *req);
982 }
983
984 static void target_finish_recovery(struct obd_device *obd)
985 {
986         struct list_head *tmp, *n;
987         int rc;
988
989         CWARN("%s: sending delayed replies to recovered clients\n",
990               obd->obd_name);
991
992         ldlm_reprocess_all_ns(obd->obd_namespace);
993
994         /* when recovery finished, cleanup orphans on mds and ost */
995         if (OBT(obd) && OBP(obd, postrecov)) {
996                 rc = OBP(obd, postrecov)(obd);
997                 if (rc >= 0)
998                         CWARN("%s: all clients recovered, %d MDS "
999                               "orphans deleted\n", obd->obd_name, rc);
1000                 else
1001                         CERROR("postrecov failed %d\n", rc);
1002         }
1003
1004         list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
1005                 struct ptlrpc_request *req;
1006                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1007                 list_del(&req->rq_list);
1008                 DEBUG_REQ(D_WARNING, req, "delayed:");
1009                 ptlrpc_reply(req);
1010                 target_release_saved_req(req);
1011         }
1012         obd->obd_recovery_end = CURRENT_SECONDS;
1013         return;
1014 }
1015
1016 static void abort_recovery_queue(struct obd_device *obd)
1017 {
1018         struct ptlrpc_request *req;
1019         struct list_head *tmp, *n;
1020         int rc;
1021
1022         list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
1023                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1024                 list_del(&req->rq_list);
1025                 DEBUG_REQ(D_ERROR, req, "aborted:");
1026                 req->rq_status = -ENOTCONN;
1027                 req->rq_type = PTL_RPC_MSG_ERR;
1028                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1029                 if (rc == 0) {
1030                         ptlrpc_reply(req);
1031                 } else {
1032                         DEBUG_REQ(D_ERROR, req,
1033                                   "packing failed for abort-reply; skipping");
1034                 }
1035                 target_release_saved_req(req);
1036         }
1037 }
1038
1039 /* Called from a cleanup function if the device is being cleaned up 
1040    forcefully.  The exports should all have been disconnected already, 
1041    the only thing left to do is 
1042      - clear the recovery flags
1043      - cancel the timer
1044      - free queued requests and replies, but don't send replies
1045    Because the obd_stopping flag is set, no new requests should be received.
1046      
1047 */
1048 void target_cleanup_recovery(struct obd_device *obd)
1049 {
1050         struct list_head *tmp, *n;
1051         struct ptlrpc_request *req;
1052         ENTRY;
1053
1054         LASSERT(obd->obd_stopping);
1055
1056         spin_lock_bh(&obd->obd_processing_task_lock);
1057         if (!obd->obd_recovering) {
1058                 spin_unlock_bh(&obd->obd_processing_task_lock);
1059                 EXIT;
1060                 return;
1061         }
1062         obd->obd_recovering = obd->obd_abort_recovery = 0;
1063         target_cancel_recovery_timer(obd);
1064         spin_unlock_bh(&obd->obd_processing_task_lock);
1065
1066         list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
1067                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1068                 list_del(&req->rq_list);
1069                 target_release_saved_req(req);
1070         }
1071
1072         list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
1073                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1074                 list_del(&req->rq_list);
1075                 target_release_saved_req(req);
1076         }
1077         EXIT;
1078 }
1079
1080 void target_abort_recovery(void *data)
1081 {
1082         struct obd_device *obd = data;
1083
1084         spin_lock_bh(&obd->obd_processing_task_lock);
1085         if (!obd->obd_recovering) {
1086                 spin_unlock_bh(&obd->obd_processing_task_lock);
1087                 EXIT;
1088                 return;
1089         }
1090         obd->obd_recovering = obd->obd_abort_recovery = 0;
1091         obd->obd_recoverable_clients = 0;
1092         target_cancel_recovery_timer(obd);
1093         spin_unlock_bh(&obd->obd_processing_task_lock);
1094
1095         CERROR("%s: recovery period over; disconnecting unfinished clients.\n",
1096                obd->obd_name);
1097         class_disconnect_stale_exports(obd);
1098         abort_recovery_queue(obd);
1099
1100         target_finish_recovery(obd);
1101
1102         ptlrpc_run_recovery_over_upcall(obd);
1103 }
1104
1105 static void target_recovery_expired(unsigned long castmeharder)
1106 {
1107         struct obd_device *obd = (struct obd_device *)castmeharder;
1108         CERROR("%s: recovery timed out, aborting\n", obd->obd_name);
1109         spin_lock_bh(&obd->obd_processing_task_lock);
1110         if (obd->obd_recovering)
1111                 obd->obd_abort_recovery = 1;
1112         wake_up(&obd->obd_next_transno_waitq);
1113         spin_unlock_bh(&obd->obd_processing_task_lock);
1114 }
1115
1116
1117 /* obd_processing_task_lock should be held */
1118 void target_cancel_recovery_timer(struct obd_device *obd)
1119 {
1120         CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name);
1121         del_timer(&obd->obd_recovery_timer);
1122 }
1123
1124 static void reset_recovery_timer(struct obd_device *obd)
1125 {
1126         spin_lock_bh(&obd->obd_processing_task_lock);
1127         if (!obd->obd_recovering) {
1128                 spin_unlock_bh(&obd->obd_processing_task_lock);
1129                 return;
1130         }
1131         CDEBUG(D_HA, "%s: timer will expire in %u seconds\n", obd->obd_name,
1132                (int)(OBD_RECOVERY_TIMEOUT / HZ));
1133         mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
1134         spin_unlock_bh(&obd->obd_processing_task_lock);
1135 }
1136
1137
1138 /* Only start it the first time called */
1139 void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
1140 {
1141         spin_lock_bh(&obd->obd_processing_task_lock);
1142         if (obd->obd_recovery_handler) {
1143                 spin_unlock_bh(&obd->obd_processing_task_lock);
1144                 return;
1145         }
1146         CWARN("%s: starting recovery timer (%us)\n", obd->obd_name,
1147               (int)(OBD_RECOVERY_TIMEOUT / HZ));
1148         obd->obd_recovery_handler = handler;
1149         obd->obd_recovery_timer.function = target_recovery_expired;
1150         obd->obd_recovery_timer.data = (unsigned long)obd;
1151         spin_unlock_bh(&obd->obd_processing_task_lock);
1152
1153         reset_recovery_timer(obd);
1154 }
1155
1156 static int check_for_next_transno(struct obd_device *obd)
1157 {
1158         struct ptlrpc_request *req;
1159         int wake_up = 0, connected, completed, queue_len, max;
1160         __u64 next_transno, req_transno;
1161
1162         spin_lock_bh(&obd->obd_processing_task_lock);
1163         req = list_entry(obd->obd_recovery_queue.next,
1164                          struct ptlrpc_request, rq_list);
1165         max = obd->obd_max_recoverable_clients;
1166         req_transno = req->rq_reqmsg->transno;
1167         connected = obd->obd_connected_clients;
1168         completed = max - obd->obd_recoverable_clients;
1169         queue_len = obd->obd_requests_queued_for_recovery;
1170         next_transno = obd->obd_next_recovery_transno;
1171
1172         CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, "
1173                "req_transno: "LPU64", next_transno: "LPU64"\n",
1174                max, connected, completed, queue_len, req_transno, next_transno);
1175         if (obd->obd_abort_recovery) {
1176                 CDEBUG(D_HA, "waking for aborted recovery\n");
1177                 wake_up = 1;
1178         } else if (!obd->obd_recovering) {
1179                 CDEBUG(D_HA, "waking for completed recovery (?)\n");
1180                 wake_up = 1;
1181         } else if (req_transno == next_transno) {
1182                 CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
1183                 wake_up = 1;
1184         } else if (queue_len + completed == max) {
1185                 CDEBUG(D_ERROR,
1186                        "waking for skipped transno (skip: "LPD64
1187                        ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
1188                        next_transno, queue_len, completed, max, req_transno);
1189                 obd->obd_next_recovery_transno = req_transno;
1190                 wake_up = 1;
1191         }
1192         spin_unlock_bh(&obd->obd_processing_task_lock);
1193         LASSERT(req->rq_reqmsg->transno >= next_transno);
1194         return wake_up;
1195 }
1196
1197 static void process_recovery_queue(struct obd_device *obd)
1198 {
1199         struct ptlrpc_request *req;
1200         int abort_recovery = 0;
1201         struct l_wait_info lwi = { 0 };
1202         ENTRY;
1203
1204         for (;;) {
1205                 spin_lock_bh(&obd->obd_processing_task_lock);
1206                 LASSERT(obd->obd_processing_task == current->pid);
1207                 req = list_entry(obd->obd_recovery_queue.next,
1208                                  struct ptlrpc_request, rq_list);
1209
1210                 if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
1211                         spin_unlock_bh(&obd->obd_processing_task_lock);
1212                         CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
1213                                LPD64")\n",
1214                                obd->obd_next_recovery_transno,
1215                                req->rq_reqmsg->transno);
1216                         l_wait_event(obd->obd_next_transno_waitq,
1217                                      check_for_next_transno(obd), &lwi);
1218                         spin_lock_bh(&obd->obd_processing_task_lock);
1219                         abort_recovery = obd->obd_abort_recovery;
1220                         spin_unlock_bh(&obd->obd_processing_task_lock);
1221                         if (abort_recovery) {
1222                                 target_abort_recovery(obd);
1223                                 return;
1224                         }
1225                         continue;
1226                 }
1227                 list_del_init(&req->rq_list);
1228                 obd->obd_requests_queued_for_recovery--;
1229                 spin_unlock_bh(&obd->obd_processing_task_lock);
1230
1231                 DEBUG_REQ(D_HA, req, "processing: ");
1232                 (void)obd->obd_recovery_handler(req);
1233                 obd->obd_replayed_requests++;
1234                 reset_recovery_timer(obd);
1235                 /* bug 1580: decide how to properly sync() in recovery */
1236                 //mds_fsync_super(mds->mds_sb);
1237                 class_export_put(req->rq_export);
1238                 if (req->rq_reply_state != NULL) {
1239                         ptlrpc_rs_decref(req->rq_reply_state);
1240                         /* req->rq_reply_state = NULL; */
1241                 }
1242                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
1243                 OBD_FREE(req, sizeof *req);
1244                 spin_lock_bh(&obd->obd_processing_task_lock);
1245                 obd->obd_next_recovery_transno++;
1246                 if (list_empty(&obd->obd_recovery_queue)) {
1247                         obd->obd_processing_task = 0;
1248                         spin_unlock_bh(&obd->obd_processing_task_lock);
1249                         break;
1250                 }
1251                 spin_unlock_bh(&obd->obd_processing_task_lock);
1252         }
1253         EXIT;
1254 }
1255
1256 int target_queue_recovery_request(struct ptlrpc_request *req,
1257                                   struct obd_device *obd)
1258 {
1259         struct list_head *tmp;
1260         int inserted = 0;
1261         __u64 transno = req->rq_reqmsg->transno;
1262         struct ptlrpc_request *saved_req;
1263         struct lustre_msg *reqmsg;
1264
1265         /* CAVEAT EMPTOR: The incoming request message has been swabbed
1266          * (i.e. buflens etc are in my own byte order), but type-dependent
1267          * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */
1268
1269         if (!transno) {
1270                 INIT_LIST_HEAD(&req->rq_list);
1271                 DEBUG_REQ(D_HA, req, "not queueing");
1272                 return 1;
1273         }
1274
1275         /* XXX If I were a real man, these LBUGs would be sane cleanups. */
1276         /* XXX just like the request-dup code in queue_final_reply */
1277         OBD_ALLOC(saved_req, sizeof *saved_req);
1278         if (!saved_req)
1279                 LBUG();
1280         OBD_ALLOC(reqmsg, req->rq_reqlen);
1281         if (!reqmsg)
1282                 LBUG();
1283
1284         spin_lock_bh(&obd->obd_processing_task_lock);
1285
1286         /* If we're processing the queue, we want don't want to queue this
1287          * message.
1288          *
1289          * Also, if this request has a transno less than the one we're waiting
1290          * for, we should process it now.  It could (and currently always will)
1291          * be an open request for a descriptor that was opened some time ago.
1292          *
1293          * Also, a resent, replayed request that has already been
1294          * handled will pass through here and be processed immediately.
1295          */
1296         if (obd->obd_processing_task == current->pid ||
1297             transno < obd->obd_next_recovery_transno) {
1298                 /* Processing the queue right now, don't re-add. */
1299                 LASSERT(list_empty(&req->rq_list));
1300                 spin_unlock_bh(&obd->obd_processing_task_lock);
1301                 OBD_FREE(reqmsg, req->rq_reqlen);
1302                 OBD_FREE(saved_req, sizeof *saved_req);
1303                 return 1;
1304         }
1305
1306         /* A resent, replayed request that is still on the queue; just drop it.
1307            The queued request will handle this. */
1308         if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT|MSG_REPLAY)) ==
1309             (MSG_RESENT | MSG_REPLAY)) {
1310                 DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
1311                 spin_unlock_bh(&obd->obd_processing_task_lock);
1312                 OBD_FREE(reqmsg, req->rq_reqlen);
1313                 OBD_FREE(saved_req, sizeof *saved_req);
1314                 return 0;
1315         }
1316
1317         memcpy(saved_req, req, sizeof *req);
1318         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
1319         req = saved_req;
1320         req->rq_reqmsg = reqmsg;
1321         class_export_get(req->rq_export);
1322         INIT_LIST_HEAD(&req->rq_list);
1323
1324         /* XXX O(n^2) */
1325         list_for_each(tmp, &obd->obd_recovery_queue) {
1326                 struct ptlrpc_request *reqiter =
1327                         list_entry(tmp, struct ptlrpc_request, rq_list);
1328
1329                 if (reqiter->rq_reqmsg->transno > transno) {
1330                         list_add_tail(&req->rq_list, &reqiter->rq_list);
1331                         inserted = 1;
1332                         break;
1333                 }
1334         }
1335
1336         if (!inserted) {
1337                 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
1338         }
1339
1340         obd->obd_requests_queued_for_recovery++;
1341
1342         if (obd->obd_processing_task != 0) {
1343                 /* Someone else is processing this queue, we'll leave it to
1344                  * them.
1345                  */
1346                 wake_up(&obd->obd_next_transno_waitq);
1347                 spin_unlock_bh(&obd->obd_processing_task_lock);
1348                 return 0;
1349         }
1350
1351         /* Nobody is processing, and we know there's (at least) one to process
1352          * now, so we'll do the honours.
1353          */
1354         obd->obd_processing_task = current->pid;
1355         spin_unlock_bh(&obd->obd_processing_task_lock);
1356
1357         process_recovery_queue(obd);
1358         return 0;
1359 }
1360
1361 struct obd_device * target_req2obd(struct ptlrpc_request *req)
1362 {
1363         return req->rq_export->exp_obd;
1364 }
1365
1366 int target_queue_final_reply(struct ptlrpc_request *req, int rc)
1367 {
1368         struct obd_device *obd = target_req2obd(req);
1369         struct ptlrpc_request *saved_req;
1370         struct lustre_msg *reqmsg;
1371         int recovery_done = 0;
1372
1373         LASSERT ((rc == 0) == (req->rq_reply_state != NULL));
1374
1375         if (rc) {
1376                 /* Just like ptlrpc_error, but without the sending. */
1377                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1378                 LASSERT(rc == 0); /* XXX handle this */
1379                 req->rq_type = PTL_RPC_MSG_ERR;
1380         }
1381
1382         LASSERT (!req->rq_reply_state->rs_difficult);
1383         LASSERT(list_empty(&req->rq_list));
1384         /* XXX a bit like the request-dup code in queue_recovery_request */
1385         OBD_ALLOC(saved_req, sizeof *saved_req);
1386         if (!saved_req)
1387                 LBUG();
1388         OBD_ALLOC(reqmsg, req->rq_reqlen);
1389         if (!reqmsg)
1390                 LBUG();
1391         memcpy(saved_req, req, sizeof *saved_req);
1392         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
1393         
1394         /* Don't race cleanup */
1395         spin_lock_bh(&obd->obd_processing_task_lock);
1396         if (obd->obd_stopping) {
1397                 spin_unlock_bh(&obd->obd_processing_task_lock);
1398                 OBD_FREE(reqmsg, req->rq_reqlen);
1399                 OBD_FREE(saved_req, sizeof *req);
1400                 req->rq_status = -ENOTCONN;
1401                 /* rv is ignored anyhow */
1402                 return -ENOTCONN;
1403         }
1404         ptlrpc_rs_addref(req->rq_reply_state);  /* +1 ref for saved reply */
1405         req = saved_req;
1406         req->rq_reqmsg = reqmsg;
1407         class_export_get(req->rq_export);
1408         list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
1409
1410         /* only count the first "replay over" request from each
1411            export */
1412         if (req->rq_export->exp_replay_needed) {
1413                 --obd->obd_recoverable_clients;
1414                 req->rq_export->exp_replay_needed = 0;
1415         }
1416         recovery_done = (obd->obd_recoverable_clients == 0);
1417         spin_unlock_bh(&obd->obd_processing_task_lock);
1418
1419         OBD_RACE(OBD_FAIL_LDLM_RECOV_CLIENTS);
1420         if (recovery_done) {
1421                 spin_lock_bh(&obd->obd_processing_task_lock);
1422                 obd->obd_recovering = obd->obd_abort_recovery = 0;
1423                 target_cancel_recovery_timer(obd);
1424                 spin_unlock_bh(&obd->obd_processing_task_lock);
1425
1426                 target_finish_recovery(obd);
1427                 ptlrpc_run_recovery_over_upcall(obd);
1428         } else {
1429                 CWARN("%s: %d recoverable clients remain\n",
1430                        obd->obd_name, obd->obd_recoverable_clients);
1431                 wake_up(&obd->obd_next_transno_waitq);
1432         }
1433
1434         return 1;
1435 }
1436
1437 int
1438 target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id)
1439 {
1440         if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
1441                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
1442                 DEBUG_REQ(D_ERROR, req, "dropping reply");
1443                 return (-ECOMM);
1444         }
1445
1446         if (rc) {
1447                 DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
1448                 req->rq_status = rc;
1449                 return (ptlrpc_error(req));
1450         } else {
1451                 DEBUG_REQ(D_NET, req, "sending reply");
1452         }
1453         
1454         return (ptlrpc_send_reply(req, 1));
1455 }
1456
1457 void 
1458 target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
1459 {
1460         int                        netrc;
1461         unsigned long              flags;
1462         struct ptlrpc_reply_state *rs;
1463         struct obd_device         *obd;
1464         struct obd_export         *exp;
1465         struct ptlrpc_service     *svc;
1466
1467         svc = req->rq_rqbd->rqbd_service;
1468         
1469         rs = req->rq_reply_state;
1470         if (rs == NULL || !rs->rs_difficult) {
1471                 /* no notifiers */
1472                 target_send_reply_msg (req, rc, fail_id);
1473                 return;
1474         }
1475
1476         /* must be an export if locks saved */
1477         LASSERT (req->rq_export != NULL);
1478         /* req/reply consistent */
1479         LASSERT (rs->rs_service == svc);
1480
1481         /* "fresh" reply */
1482         LASSERT (!rs->rs_scheduled);
1483         LASSERT (!rs->rs_scheduled_ever);
1484         LASSERT (!rs->rs_handled);
1485         LASSERT (!rs->rs_on_net);
1486         LASSERT (rs->rs_export == NULL);
1487         LASSERT (list_empty(&rs->rs_obd_list));
1488         LASSERT (list_empty(&rs->rs_exp_list));
1489
1490         exp = class_export_get (req->rq_export);
1491         obd = exp->exp_obd;
1492
1493         /* disable reply scheduling onto srv_reply_queue while I'm setting up */
1494         rs->rs_scheduled = 1;
1495         rs->rs_on_net    = 1;
1496         rs->rs_xid       = req->rq_xid;
1497         rs->rs_transno   = req->rq_transno;
1498         rs->rs_export    = exp;
1499         
1500         spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags);
1501
1502         if (rs->rs_transno > obd->obd_last_committed) {
1503                 /* not committed already */ 
1504                 list_add_tail (&rs->rs_obd_list, 
1505                                &obd->obd_uncommitted_replies);
1506         }
1507
1508         spin_unlock (&obd->obd_uncommitted_replies_lock);
1509         spin_lock (&exp->exp_lock);
1510
1511         list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies);
1512
1513         spin_unlock_irqrestore (&exp->exp_lock, flags);
1514
1515         netrc = target_send_reply_msg (req, rc, fail_id);
1516
1517         spin_lock_irqsave (&svc->srv_lock, flags);
1518
1519         svc->srv_n_difficult_replies++;
1520
1521         if (netrc != 0) {
1522                 /* error sending: reply is off the net.  Also we need +1
1523                  * reply ref until ptlrpc_server_handle_reply() is done
1524                  * with the reply state (if the send was successful, there
1525                  * would have been +1 ref for the net, which
1526                  * reply_out_callback leaves alone) */
1527                 rs->rs_on_net = 0;
1528                 ptlrpc_rs_addref(rs);
1529                 atomic_inc (&svc->srv_outstanding_replies);
1530         }
1531
1532         if (!rs->rs_on_net ||                   /* some notifier */
1533             list_empty(&rs->rs_exp_list) ||     /* completed already */
1534             list_empty(&rs->rs_obd_list)) {
1535                 list_add_tail (&rs->rs_list, &svc->srv_reply_queue);
1536                 wake_up (&svc->srv_waitq);
1537         } else {
1538                 list_add (&rs->rs_list, &svc->srv_active_replies);
1539                 rs->rs_scheduled = 0;           /* allow notifier to schedule */
1540         }
1541
1542         spin_unlock_irqrestore (&svc->srv_lock, flags);
1543 }
1544
1545 int target_handle_ping(struct ptlrpc_request *req)
1546 {
1547         return lustre_pack_reply(req, 0, NULL, NULL);
1548 }
1549
1550 void target_committed_to_req(struct ptlrpc_request *req)
1551 {
1552         struct obd_device *obd = req->rq_export->exp_obd;
1553
1554         if (!obd->obd_no_transno && req->rq_repmsg != NULL)
1555                 req->rq_repmsg->last_committed = obd->obd_last_committed;
1556         else
1557                 DEBUG_REQ(D_IOCTL, req,
1558                           "not sending last_committed update");
1559
1560         CDEBUG(D_INFO, "last_committed "LPU64", xid "LPU64"\n",
1561                obd->obd_last_committed, req->rq_xid);
1562 }
1563
1564 EXPORT_SYMBOL(target_committed_to_req);