Whamcloud - gitweb
Land b_hd_capa onto HEAD (20050809_1942)
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  * These are the only exported functions, they provide some generic
22  * infrastructure for managing object devices
23  */
24
25 #define DEBUG_SUBSYSTEM S_CLASS
26 #ifdef __KERNEL__
27 #include <linux/kmod.h>   /* for request_module() */
28 #include <linux/module.h>
29 #include <linux/obd_class.h>
30 #include <linux/random.h>
31 #include <linux/slab.h>
32 #include <linux/pagemap.h>
33 #else
34 #include <liblustre.h>
35 #include <linux/obd_class.h>
36 #include <linux/obd.h>
37 #endif
38 #include <linux/lprocfs_status.h>
39 #include <linux/lustre_sec.h>
40
41 extern struct list_head obd_types;
42 static spinlock_t obd_types_lock = SPIN_LOCK_UNLOCKED;
43 kmem_cache_t *obdo_cachep = NULL;
44 kmem_cache_t *import_cachep = NULL;
45
46 extern kmem_cache_t *capa_cachep;
47
48 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
49 void (*ptlrpc_abort_inflight_superhack)(struct obd_import *imp);
50
51 /*
52  * support functions: we could use inter-module communication, but this
53  * is more portable to other OS's
54  */
55 static struct obd_type *class_search_type(char *name)
56 {
57         struct list_head *tmp;
58         struct obd_type *type;
59
60         spin_lock(&obd_types_lock);
61         list_for_each(tmp, &obd_types) {
62                 type = list_entry(tmp, struct obd_type, typ_chain);
63                 if (strlen(type->typ_name) == strlen(name) &&
64                     strcmp(type->typ_name, name) == 0) {
65                         spin_unlock(&obd_types_lock);
66                         return type;
67                 }
68         }
69         spin_unlock(&obd_types_lock);
70         return NULL;
71 }
72
73 struct obd_type *class_get_type(char *name)
74 {
75         struct obd_type *type = class_search_type(name);
76
77 #ifdef CONFIG_KMOD
78         if (!type) {
79                 if (!request_module(name)) {
80                         CDEBUG(D_INFO, "Loaded module '%s'\n", name);
81                         type = class_search_type(name);
82                 } else
83                         CDEBUG(D_INFO, "Can't load module '%s'\n", name);
84         }
85 #endif
86         if (type)
87                 try_module_get(type->typ_ops->o_owner);
88         return type;
89 }
90
91 void class_put_type(struct obd_type *type)
92 {
93         LASSERT(type);
94         module_put(type->typ_ops->o_owner);
95 }
96
97 int class_register_type(struct obd_ops *ops, struct md_ops *md_ops,
98                         struct lprocfs_vars *vars, char *name)
99 {
100         struct obd_type *type;
101         int rc = 0;
102         ENTRY;
103
104         LASSERT(strnlen(name, 1024) < 1024);    /* sanity check */
105
106         if (class_search_type(name)) {
107                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
108                 RETURN(-EEXIST);
109         }
110
111         rc = -ENOMEM;
112         OBD_ALLOC(type, sizeof(*type));
113         if (type == NULL)
114                 RETURN(rc);
115
116         OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
117         OBD_ALLOC(type->typ_name, strlen(name) + 1);
118         if (md_ops)
119                 OBD_ALLOC(type->typ_md_ops, sizeof(*type->typ_md_ops));
120         if (type->typ_ops == NULL || type->typ_name == NULL ||
121                         (md_ops && type->typ_md_ops == NULL))
122                 GOTO (failed, rc);
123
124         *(type->typ_ops) = *ops;
125         if (md_ops)
126                 *(type->typ_md_ops) = *md_ops;
127         else
128                 type->typ_md_ops = NULL;
129         strcpy(type->typ_name, name);
130
131 #ifdef LPROCFS
132         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
133                                               vars, type);
134 #endif
135         if (IS_ERR(type->typ_procroot)) {
136                 rc = PTR_ERR(type->typ_procroot);
137                 type->typ_procroot = NULL;
138                 GOTO (failed, rc);
139         }
140
141         spin_lock(&obd_types_lock);
142         list_add(&type->typ_chain, &obd_types);
143         spin_unlock(&obd_types_lock);
144
145         RETURN (0);
146
147  failed:
148         if (type->typ_name != NULL)
149                 OBD_FREE(type->typ_name, strlen(name) + 1);
150         if (type->typ_ops != NULL)
151                 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
152         if (type->typ_md_ops != NULL)
153                 OBD_FREE (type->typ_md_ops, sizeof (*type->typ_md_ops));
154         OBD_FREE(type, sizeof(*type));
155         RETURN(rc);
156 }
157
158 int class_unregister_type(char *name)
159 {
160         struct obd_type *type = class_search_type(name);
161         ENTRY;
162
163         if (!type) {
164                 CERROR("unknown obd type\n");
165                 RETURN(-EINVAL);
166         }
167
168         if (type->typ_refcnt) {
169                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
170                 /* This is a bad situation, let's make the best of it */
171                 /* Remove ops, but leave the name for debugging */
172                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
173                 RETURN(-EBUSY);
174         }
175
176         if (type->typ_procroot) {
177                 lprocfs_remove(type->typ_procroot);
178                 type->typ_procroot = NULL;
179         }
180
181         spin_lock(&obd_types_lock);
182         list_del(&type->typ_chain);
183         spin_unlock(&obd_types_lock);
184         OBD_FREE(type->typ_name, strlen(name) + 1);
185         if (type->typ_ops != NULL)
186                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
187         if (type->typ_md_ops != NULL)
188                 OBD_FREE (type->typ_md_ops, sizeof(*type->typ_md_ops));
189         OBD_FREE(type, sizeof(*type));
190         RETURN(0);
191 } /* class_unregister_type */
192
193 struct obd_device *class_newdev(struct obd_type *type)
194 {
195         struct obd_device *result = NULL;
196         int i;
197
198         spin_lock(&obd_dev_lock);
199         for (i = 0 ; i < MAX_OBD_DEVICES && result == NULL; i++) {
200                 struct obd_device *obd = &obd_dev[i];
201                 if (!obd->obd_type) {
202                         LASSERT(obd->obd_minor == i);
203                         memset(obd, 0, sizeof(*obd));
204                         obd->obd_minor = i;
205                         obd->obd_type = type;
206                         result = obd;
207                 }
208         }
209         spin_unlock(&obd_dev_lock);
210         return result;
211 }
212
213 void class_release_dev(struct obd_device *obd)
214 {
215         int minor = obd->obd_minor;
216
217         spin_lock(&obd_dev_lock);
218         obd->obd_type = NULL;
219         //memset(obd, 0, sizeof(*obd));
220         obd->obd_minor = minor;
221         spin_unlock(&obd_dev_lock);
222 }
223
224 int class_name2dev(char *name)
225 {
226         int i;
227
228         if (!name)
229                 return -1;
230
231         spin_lock(&obd_dev_lock);
232         for (i = 0; i < MAX_OBD_DEVICES; i++) {
233                 struct obd_device *obd = &obd_dev[i];
234                 if (obd->obd_name && strcmp(name, obd->obd_name) == 0) {
235                         spin_unlock(&obd_dev_lock);
236                         return i;
237                 }
238         }
239         spin_unlock(&obd_dev_lock);
240         return -1;
241 }
242
243 struct obd_device *class_name2obd(char *name)
244 {
245         int dev = class_name2dev(name);
246         if (dev < 0)
247                 return NULL;
248         return &obd_dev[dev];
249 }
250
251 int class_uuid2dev(struct obd_uuid *uuid)
252 {
253         int i;
254         spin_lock(&obd_dev_lock);
255         for (i = 0; i < MAX_OBD_DEVICES; i++) {
256                 struct obd_device *obd = &obd_dev[i];
257                 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
258                         spin_unlock(&obd_dev_lock);
259                         return i;
260                 }
261         }
262         spin_unlock(&obd_dev_lock);
263         return -1;
264 }
265
266 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
267 {
268         int dev = class_uuid2dev(uuid);
269         if (dev < 0)
270                 return NULL;
271         return &obd_dev[dev];
272 }
273
274 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
275    specified, then only the client with that uuid is returned,
276    otherwise any client connected to the tgt is returned.
277    If tgt_uuid is NULL, the lov with grp_uuid is returned. */
278 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
279                                          char *typ_name,
280                                          struct obd_uuid *grp_uuid)
281 {
282         int i;
283
284         spin_lock(&obd_dev_lock);
285         for (i = 0; i < MAX_OBD_DEVICES; i++) {
286                 struct obd_device *obd = &obd_dev[i];
287                 if (obd->obd_type == NULL)
288                         continue;
289                 if ((strncmp(obd->obd_type->typ_name, typ_name,
290                              strlen(typ_name)) == 0)) {
291                         struct client_obd *cli = &obd->u.cli;
292                         struct obd_import *imp = cli->cl_import;
293                         if (tgt_uuid == NULL) {
294                                 LASSERT(grp_uuid);
295                                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
296                                         spin_unlock(&obd_dev_lock);
297                                         return obd;
298                                 }
299                                 continue;
300                         }
301                         if (obd_uuid_equals(tgt_uuid, &imp->imp_target_uuid) &&
302                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
303                                                          &obd->obd_uuid) : 1)) {
304                                 spin_unlock(&obd_dev_lock);
305                                 return obd;
306                         }
307                 }
308         }
309         spin_unlock(&obd_dev_lock);
310         return NULL;
311 }
312
313 /* Iterate the obd_device list looking devices have grp_uuid. Start
314    searching at *next, and if a device is found, the next index to look
315    it is saved in *next. If next is NULL, then the first matching device
316    will always be returned. */
317 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
318 {
319         int i;
320
321         if (next == NULL) 
322                 i = 0;
323         else if (*next >= 0 && *next < MAX_OBD_DEVICES)
324                 i = *next;
325         else 
326                 return NULL;
327         spin_lock(&obd_dev_lock);                
328         for (; i < MAX_OBD_DEVICES; i++) {
329                 struct obd_device *obd = &obd_dev[i];
330                 if (obd->obd_type == NULL)
331                         continue;
332                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
333                         if (next != NULL)
334                                 *next = i+1;
335                         spin_unlock(&obd_dev_lock);
336                         return obd;
337                 }
338         }
339
340         spin_unlock(&obd_dev_lock);
341         return NULL;
342 }
343
344 void obd_cleanup_caches(void)
345 {
346         ENTRY;
347         if (obdo_cachep) {
348                 LASSERTF(kmem_cache_destroy(obdo_cachep) == 0,
349                          "Cannot destory ll_obdo_cache\n");
350                 obdo_cachep = NULL;
351         }
352         if (import_cachep) {
353                 LASSERTF(kmem_cache_destroy(import_cachep) == 0,
354                          "Cannot destory ll_import_cache\n");
355                 import_cachep = NULL;
356         }
357         if (capa_cachep) {
358                 LASSERTF(kmem_cache_destroy(capa_cachep) == 0,
359                          "Cannot destory capa_cache\n");
360                 capa_cachep = NULL;
361         }
362         EXIT;
363 }
364
365 int obd_init_caches(void)
366 {
367         ENTRY;
368         LASSERT(obdo_cachep == NULL);
369         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
370                                         0, 0, NULL, NULL);
371         if (!obdo_cachep)
372                 GOTO(out, -ENOMEM);
373
374         LASSERT(import_cachep == NULL);
375         import_cachep = kmem_cache_create("ll_import_cache",
376                                           sizeof(struct obd_import),
377                                           0, 0, NULL, NULL);
378         if (!import_cachep)
379                 GOTO(out, -ENOMEM);
380
381         LASSERT(capa_cachep == NULL);
382         capa_cachep = kmem_cache_create("capa_cache",
383                                         sizeof(struct obd_capa),
384                                         0, 0, NULL, NULL);
385         if (!capa_cachep)
386                 GOTO(out, -ENOMEM);
387
388         RETURN(0);
389  out:
390         obd_cleanup_caches();
391         RETURN(-ENOMEM);
392
393 }
394
395 /* map connection to client */
396 struct obd_export *class_conn2export(struct lustre_handle *conn)
397 {
398         struct obd_export *export;
399         ENTRY;
400
401         if (!conn) {
402                 CDEBUG(D_CACHE, "looking for null handle\n");
403                 RETURN(NULL);
404         }
405
406         if (conn->cookie == -1) {  /* this means assign a new connection */
407                 CDEBUG(D_CACHE, "want a new connection\n");
408                 RETURN(NULL);
409         }
410
411         CDEBUG(D_IOCTL, "looking for export cookie "LPX64"\n", conn->cookie);
412         export = class_handle2object(conn->cookie);
413         RETURN(export);
414 }
415
416 struct obd_device *class_exp2obd(struct obd_export *exp)
417 {
418         if (exp)
419                 return exp->exp_obd;
420         return NULL;
421 }
422
423 struct obd_device *class_conn2obd(struct lustre_handle *conn)
424 {
425         struct obd_export *export;
426         export = class_conn2export(conn);
427         if (export) {
428                 struct obd_device *obd = export->exp_obd;
429                 class_export_put(export);
430                 return obd;
431         }
432         return NULL;
433 }
434
435 struct obd_import *class_exp2cliimp(struct obd_export *exp)
436 {
437         struct obd_device *obd = exp->exp_obd;
438         if (obd == NULL)
439                 return NULL;
440         return obd->u.cli.cl_import;
441 }
442
443 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
444 {
445         struct obd_device *obd = class_conn2obd(conn);
446         if (obd == NULL)
447                 return NULL;
448         return obd->u.cli.cl_import;
449 }
450
451 /* Export management functions */
452 static void export_handle_addref(void *export)
453 {
454         class_export_get(export);
455 }
456
457 void __class_export_put(struct obd_export *exp)
458 {
459         if (atomic_dec_and_test(&exp->exp_refcount)) {
460                 struct obd_device *obd = exp->exp_obd;
461                 CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
462                        exp->exp_client_uuid.uuid);
463
464                 LASSERT(obd != NULL);
465
466                 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
467                 if (exp->exp_connection)
468                         ptlrpc_put_connection_superhack(exp->exp_connection);
469
470                 LASSERT(list_empty(&exp->exp_outstanding_replies));
471                 LASSERT(list_empty(&exp->exp_handle.h_link));
472                 obd_destroy_export(exp);
473
474                 OBD_FREE(exp, sizeof(*exp));
475                 if (obd->obd_set_up) {
476                         atomic_dec(&obd->obd_refcount);
477                         wake_up(&obd->obd_refcount_waitq);
478                 } else {
479                         CERROR("removing export %p from obd %s (%p) -- OBD "
480                                "not set up (refcount = %d)\n", exp,
481                                obd->obd_name, obd,
482                                atomic_read(&obd->obd_refcount));
483                 }
484         }
485 }
486
487 /* Creates a new export, adds it to the hash table, and returns a
488  * pointer to it. The refcount is 2: one for the hash reference, and
489  * one for the pointer returned by this function. */
490 struct obd_export *class_new_export(struct obd_device *obd)
491 {
492         struct obd_export *export;
493
494         OBD_ALLOC(export, sizeof(*export));
495         if (!export) {
496                 CERROR("no memory! (minor %d)\n", obd->obd_minor);
497                 return NULL;
498         }
499
500         export->exp_conn_cnt = 0;
501         atomic_set(&export->exp_refcount, 2);
502         atomic_set(&export->exp_rpc_count, 0);
503         export->exp_obd = obd;
504         export->exp_flags = 0;
505         INIT_LIST_HEAD(&export->exp_outstanding_replies);
506         /* XXX this should be in LDLM init */
507         INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
508         spin_lock_init(&export->exp_ldlm_data.led_lock);
509
510         INIT_LIST_HEAD(&export->exp_handle.h_link);
511         class_handle_hash(&export->exp_handle, export_handle_addref);
512         spin_lock_init(&export->exp_lock);
513
514         spin_lock(&obd->obd_dev_lock);
515         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
516         atomic_inc(&obd->obd_refcount);
517         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
518         export->exp_obd->obd_num_exports++;
519         spin_unlock(&obd->obd_dev_lock);
520         export->exp_connected = 1;
521         obd_init_export(export);
522         return export;
523 }
524
525 void class_unlink_export(struct obd_export *exp)
526 {
527         class_handle_unhash(&exp->exp_handle);
528
529         spin_lock(&exp->exp_obd->obd_dev_lock);
530         list_del_init(&exp->exp_obd_chain);
531         exp->exp_obd->obd_num_exports--;
532         spin_unlock(&exp->exp_obd->obd_dev_lock);
533
534         class_export_put(exp);
535 }
536
537 /* Import management functions */
538 static void import_handle_addref(void *import)
539 {
540         class_import_get(import);
541 }
542
543 struct obd_import *class_import_get(struct obd_import *import)
544 {
545         atomic_inc(&import->imp_refcount);
546         CDEBUG(D_IOCTL, "import %p refcount=%d\n", import,
547                atomic_read(&import->imp_refcount));
548         return import;
549 }
550
551 void class_import_put(struct obd_import *import)
552 {
553         ENTRY;
554
555         CDEBUG(D_IOCTL, "import %p refcount=%d\n", import,
556                atomic_read(&import->imp_refcount) - 1);
557
558         LASSERT(atomic_read(&import->imp_refcount) > 0);
559         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
560         
561         if (!atomic_dec_and_test(&import->imp_refcount)) {
562                 EXIT;
563                 return;
564         }
565
566         CDEBUG(D_IOCTL, "destroying import %p\n", import);
567         
568         if (import->imp_connection)
569                 ptlrpc_put_connection_superhack(import->imp_connection);
570
571         LASSERT(!import->imp_sec);
572         while (!list_empty(&import->imp_conn_list)) {
573                 struct obd_import_conn *imp_conn;
574
575                 imp_conn = list_entry(import->imp_conn_list.next,
576                                       struct obd_import_conn, oic_item);
577                 list_del(&imp_conn->oic_item);
578                 if (imp_conn->oic_conn)
579                         ptlrpc_put_connection_superhack(imp_conn->oic_conn);
580                 OBD_FREE(imp_conn, sizeof(*imp_conn));
581         }
582
583         LASSERT(list_empty(&import->imp_handle.h_link));
584         OBD_FREE(import, sizeof(*import));
585         EXIT;
586 }
587
588 struct obd_import *class_new_import(void)
589 {
590         struct obd_import *imp;
591
592         OBD_ALLOC(imp, sizeof(*imp));
593         if (imp == NULL)
594                 return NULL;
595
596         INIT_LIST_HEAD(&imp->imp_replay_list);
597         INIT_LIST_HEAD(&imp->imp_sending_list);
598         INIT_LIST_HEAD(&imp->imp_delayed_list);
599         INIT_LIST_HEAD(&imp->imp_rawrpc_list);
600         spin_lock_init(&imp->imp_lock);
601         imp->imp_conn_cnt = 0;
602         imp->imp_max_transno = 0;
603         imp->imp_peer_committed_transno = 0;
604         imp->imp_state = LUSTRE_IMP_NEW;
605         init_waitqueue_head(&imp->imp_recovery_waitq);
606
607         atomic_set(&imp->imp_refcount, 2);
608         atomic_set(&imp->imp_inflight, 0);
609         atomic_set(&imp->imp_replay_inflight, 0);
610         INIT_LIST_HEAD(&imp->imp_conn_list);
611         INIT_LIST_HEAD(&imp->imp_handle.h_link);
612         class_handle_hash(&imp->imp_handle, import_handle_addref);
613         imp->imp_waiting_ping_reply = 0;
614
615         return imp;
616 }
617
618 void class_destroy_import(struct obd_import *import)
619 {
620         LASSERT(import != NULL);
621         LASSERT(import != LP_POISON);
622
623         class_handle_unhash(&import->imp_handle);
624
625         /* Abort any inflight DLM requests and NULL out their (about to be
626          * freed) import. */
627         /* Invalidate all requests on import, would be better to call
628            ptlrpc_set_import_active(imp, 0); */
629         import->imp_generation++;
630         ptlrpc_abort_inflight_superhack(import);
631
632         class_import_put(import);
633 }
634
635 /* A connection defines an export context in which preallocation can
636    be managed. This releases the export pointer reference, and returns
637    the export handle, so the export refcount is 1 when this function
638    returns. */
639 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
640                   struct obd_uuid *cluuid)
641 {
642         struct obd_export *export;
643         LASSERT(conn != NULL);
644         LASSERT(obd != NULL);
645         LASSERT(cluuid != NULL);
646         ENTRY;
647
648         export = class_new_export(obd);
649         if (export == NULL)
650                 RETURN(-ENOMEM);
651
652         conn->cookie = export->exp_handle.h_cookie;
653         memcpy(&export->exp_client_uuid, cluuid,
654                sizeof(export->exp_client_uuid));
655         class_export_put(export);
656
657         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
658                cluuid->uuid, conn->cookie);
659         RETURN(0);
660 }
661
662 /* This function removes two references from the export: one for the
663  * hash entry and one for the export pointer passed in.  The export
664  * pointer passed to this function is destroyed should not be used
665  * again. */
666 int class_disconnect(struct obd_export *export, unsigned long flags)
667 {
668         ENTRY;
669
670         if (export == NULL) {
671                 fixme();
672                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
673                 RETURN(-EINVAL);
674         }
675
676         /* XXX this shouldn't have to be here, but double-disconnect will crash
677          * otherwise, and sometimes double-disconnect happens.  abort_recovery,
678          * for example. */
679         if (list_empty(&export->exp_handle.h_link))
680                 RETURN(0);
681
682         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
683                export->exp_handle.h_cookie);
684
685         if (export->exp_handle.h_cookie == LL_POISON) {
686                 CERROR("disconnecting freed export %p, ignoring\n", export);
687         } else {
688                 class_unlink_export(export);
689                 class_export_put(export);
690         }
691         RETURN(0);
692 }
693
694 static void class_disconnect_export_list(struct list_head *list, unsigned long flags)
695 {
696         struct obd_export *fake_exp, *exp;
697         struct lustre_handle fake_conn;
698         int rc;
699         ENTRY;
700
701         /* Move all of the exports from obd_exports to a work list, en masse. */
702         /* It's possible that an export may disconnect itself, but
703          * nothing else will be added to this list. */
704         while(!list_empty(list)) {
705                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
706                 class_export_get(exp);
707
708                 if (obd_uuid_equals(&exp->exp_client_uuid,
709                                     &exp->exp_obd->obd_uuid)) {
710                         CDEBUG(D_HA,
711                                "exp %p export uuid == obd uuid, don't discon\n",
712                                exp);
713                         /*
714                          * need to delete this now so we don't end up pointing
715                          * to work_list later when this export is cleaned up.
716                          */
717                         list_del_init(&exp->exp_obd_chain);
718                         class_export_put(exp);
719                         continue;
720                 }
721
722                 fake_conn.cookie = exp->exp_handle.h_cookie;
723                 fake_exp = class_conn2export(&fake_conn);
724                 if (!fake_exp) {
725                         class_export_put(exp);
726                         continue;
727                 }
728                 
729                 rc = obd_disconnect(fake_exp, flags);
730                 class_export_put(exp);
731                 if (rc) {
732                         CDEBUG(D_HA, "disconnecting export %p failed: %d\n",
733                                exp, rc);
734                 } else {
735                         CDEBUG(D_HA, "export %p disconnected\n", exp);
736                 }
737         }
738         EXIT;
739 }
740
741 void class_disconnect_exports(struct obd_device *obd, unsigned long flags)
742 {
743         struct list_head work_list;
744         ENTRY;
745
746         /* Move all of the exports from obd_exports to a work list, en masse. */
747         spin_lock(&obd->obd_dev_lock);
748         list_add(&work_list, &obd->obd_exports);
749         list_del_init(&obd->obd_exports);
750         spin_unlock(&obd->obd_dev_lock);
751
752         CDEBUG(D_HA, "OBD device %d (%p) has exports, "
753                "disconnecting them\n", obd->obd_minor, obd);
754         class_disconnect_export_list(&work_list, flags);
755         EXIT;
756 }
757
758 /* Remove exports that have not completed recovery.
759  */
760 int class_disconnect_stale_exports(struct obd_device *obd,
761                                    int (*test_export)(struct obd_export *),
762                                    unsigned long flags)
763 {
764         char str[PTL_NALFMT_SIZE];
765         struct list_head work_list;
766         struct list_head *pos, *n;
767         struct obd_export *exp;
768         int cnt = 0;
769         ENTRY;
770
771         INIT_LIST_HEAD(&work_list);
772         spin_lock(&obd->obd_dev_lock);
773         list_for_each_safe(pos, n, &obd->obd_exports) {
774                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
775                 if (test_export(exp))
776                         continue;
777                 list_del(&exp->exp_obd_chain);
778                 list_add(&exp->exp_obd_chain, &work_list);
779                 cnt++;
780                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
781                        obd->obd_name, exp->exp_client_uuid.uuid,
782                        exp->exp_connection == NULL ? "<unknown>" :
783                        ptlrpc_peernid2str(&exp->exp_connection->c_peer, str));
784         }
785         spin_unlock(&obd->obd_dev_lock);
786
787         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
788                obd->obd_name, cnt);
789         class_disconnect_export_list(&work_list, flags);
790         RETURN(cnt);
791 }
792
793
794
795 int oig_init(struct obd_io_group **oig_out)
796 {
797         struct obd_io_group *oig;
798         ENTRY;
799
800         OBD_ALLOC(oig, sizeof(*oig));
801         if (oig == NULL)
802                 RETURN(-ENOMEM);
803
804         spin_lock_init(&oig->oig_lock);
805         oig->oig_rc = 0;
806         oig->oig_pending = 0;
807         atomic_set(&oig->oig_refcount, 1);
808         init_waitqueue_head(&oig->oig_waitq);
809         INIT_LIST_HEAD(&oig->oig_occ_list);
810
811         *oig_out = oig;
812         RETURN(0);
813 };
814
815 static inline void oig_grab(struct obd_io_group *oig)
816 {
817         atomic_inc(&oig->oig_refcount);
818 }
819
820 void oig_release(struct obd_io_group *oig)
821 {
822         if (atomic_dec_and_test(&oig->oig_refcount))
823                 OBD_FREE(oig, sizeof(*oig));
824 }
825
826 void oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
827 {
828         unsigned long flags;
829         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
830         spin_lock_irqsave(&oig->oig_lock, flags);
831         oig->oig_pending++;
832         if (occ != NULL)
833                 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
834         spin_unlock_irqrestore(&oig->oig_lock, flags);
835         oig_grab(oig);
836 }
837
838 void oig_complete_one(struct obd_io_group *oig,
839                       struct oig_callback_context *occ, int rc)
840 {
841         unsigned long flags;
842         wait_queue_head_t *wake = NULL;
843         int old_rc;
844
845         spin_lock_irqsave(&oig->oig_lock, flags);
846
847         if (occ != NULL)
848                 list_del_init(&occ->occ_oig_item);
849
850         old_rc = oig->oig_rc;
851         if (oig->oig_rc == 0 && rc != 0)
852                 oig->oig_rc = rc;
853
854         if (--oig->oig_pending <= 0)
855                 wake = &oig->oig_waitq;
856
857         spin_unlock_irqrestore(&oig->oig_lock, flags);
858
859         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
860                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
861                         oig->oig_pending);
862         if (wake)
863                 wake_up(wake);
864         oig_release(oig);
865 }
866
867 static int oig_done(struct obd_io_group *oig)
868 {
869         unsigned long flags;
870         int rc = 0;
871         spin_lock_irqsave(&oig->oig_lock, flags);
872         if (oig->oig_pending <= 0)
873                 rc = 1;
874         spin_unlock_irqrestore(&oig->oig_lock, flags);
875         return rc;
876 }
877
878 static void interrupted_oig(void *data)
879 {
880         struct obd_io_group *oig = data;
881         struct oig_callback_context *occ;
882         unsigned long flags;
883
884         spin_lock_irqsave(&oig->oig_lock, flags);
885         /* We need to restart the processing each time we drop the lock, as
886          * it is possible other threads called oig_complete_one() to remove
887          * an entry elsewhere in the list while we dropped lock.  We need to
888          * drop the lock because osc_ap_completion() calls oig_complete_one()
889          * which re-gets this lock ;-) as well as a lock ordering issue. */
890 restart:
891         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
892                 if (occ->interrupted)
893                         continue;
894                 occ->interrupted = 1;
895                 spin_unlock_irqrestore(&oig->oig_lock, flags);
896                 occ->occ_interrupted(occ);
897                 spin_lock_irqsave(&oig->oig_lock, flags);
898                 goto restart;
899         }
900         spin_unlock_irqrestore(&oig->oig_lock, flags);
901 }
902
903 int oig_wait(struct obd_io_group *oig)
904 {
905         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
906         int rc;
907
908         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
909
910         do {
911                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
912                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
913                 /* we can't continue until the oig has emptied and stopped
914                  * referencing state that the caller will free upon return */
915                 if (rc == -EINTR)
916                         lwi = (struct l_wait_info){ 0, };
917         } while (rc == -EINTR);
918
919         LASSERTF(oig->oig_pending == 0,
920                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
921                  oig->oig_pending);
922
923         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
924         return oig->oig_rc;
925 }