Whamcloud - gitweb
Fix quota code problem where objects on OSTs might be created with the wrong uid/gid
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  * These are the only exported functions, they provide some generic
22  * infrastructure for managing object devices
23  */
24
25 #define DEBUG_SUBSYSTEM S_CLASS
26 #ifdef __KERNEL__
27 #include <linux/kmod.h>   /* for request_module() */
28 #include <linux/module.h>
29 #include <linux/obd_class.h>
30 #include <linux/lustre_mds.h>
31 #include <linux/obd_ost.h>
32 #include <linux/random.h>
33 #include <linux/slab.h>
34 #include <linux/pagemap.h>
35 #include <linux/quota.h>
36 #else
37 #include <liblustre.h>
38 #include <linux/obd_class.h>
39 #include <linux/obd.h>
40 #endif
41 #include <linux/lprocfs_status.h>
42 #include <linux/lustre_quota.h>
43
44 extern struct list_head obd_types;
45 static spinlock_t obd_types_lock = SPIN_LOCK_UNLOCKED;
46 kmem_cache_t *obdo_cachep = NULL;
47 kmem_cache_t *import_cachep = NULL;
48
49 kmem_cache_t *qunit_cachep = NULL;
50 struct list_head qunit_hash[NR_DQHASH];
51 spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED;
52
53 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
54 void (*ptlrpc_abort_inflight_superhack)(struct obd_import *imp);
55
56 /*
57  * support functions: we could use inter-module communication, but this
58  * is more portable to other OS's
59  */
60 static struct obd_type *class_search_type(char *name)
61 {
62         struct list_head *tmp;
63         struct obd_type *type;
64
65         spin_lock(&obd_types_lock);
66         list_for_each(tmp, &obd_types) {
67                 type = list_entry(tmp, struct obd_type, typ_chain);
68                 if (strlen(type->typ_name) == strlen(name) &&
69                     strcmp(type->typ_name, name) == 0) {
70                         spin_unlock(&obd_types_lock);
71                         return type;
72                 }
73         }
74         spin_unlock(&obd_types_lock);
75         return NULL;
76 }
77
78 struct obd_type *class_get_type(char *name)
79 {
80         struct obd_type *type = class_search_type(name);
81
82 #ifdef CONFIG_KMOD
83         if (!type) {
84                 if (!request_module(name)) {
85                         CDEBUG(D_INFO, "Loaded module '%s'\n", name);
86                         type = class_search_type(name);
87                 } else
88                         CDEBUG(D_INFO, "Can't load module '%s'\n", name);
89         }
90 #endif
91         if (type)
92                 try_module_get(type->typ_ops->o_owner);
93         return type;
94 }
95
96 void class_put_type(struct obd_type *type)
97 {
98         LASSERT(type);
99         module_put(type->typ_ops->o_owner);
100 }
101
102 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
103                         char *name)
104 {
105         struct obd_type *type;
106         int rc = 0;
107         ENTRY;
108
109         LASSERT(strnlen(name, 1024) < 1024);    /* sanity check */
110
111         if (class_search_type(name)) {
112                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
113                 RETURN(-EEXIST);
114         }
115
116         rc = -ENOMEM;
117         OBD_ALLOC(type, sizeof(*type));
118         if (type == NULL)
119                 RETURN(rc);
120
121         OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
122         OBD_ALLOC(type->typ_name, strlen(name) + 1);
123         if (type->typ_ops == NULL || type->typ_name == NULL)
124                 GOTO (failed, rc);
125
126         *(type->typ_ops) = *ops;
127         strcpy(type->typ_name, name);
128
129 #ifdef LPROCFS
130         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
131                                               vars, type);
132 #endif
133         if (IS_ERR(type->typ_procroot)) {
134                 rc = PTR_ERR(type->typ_procroot);
135                 type->typ_procroot = NULL;
136                 GOTO (failed, rc);
137         }
138
139         spin_lock(&obd_types_lock);
140         list_add(&type->typ_chain, &obd_types);
141         spin_unlock(&obd_types_lock);
142
143         RETURN (0);
144
145  failed:
146         if (type->typ_name != NULL)
147                 OBD_FREE(type->typ_name, strlen(name) + 1);
148         if (type->typ_ops != NULL)
149                 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
150         OBD_FREE(type, sizeof(*type));
151         RETURN(rc);
152 }
153
154 int class_unregister_type(char *name)
155 {
156         struct obd_type *type = class_search_type(name);
157         ENTRY;
158
159         if (!type) {
160                 CERROR("unknown obd type\n");
161                 RETURN(-EINVAL);
162         }
163
164         if (type->typ_refcnt) {
165                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
166                 /* This is a bad situation, let's make the best of it */
167                 /* Remove ops, but leave the name for debugging */
168                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
169                 RETURN(-EBUSY);
170         }
171
172         if (type->typ_procroot) {
173                 lprocfs_remove(type->typ_procroot);
174                 type->typ_procroot = NULL;
175         }
176
177         spin_lock(&obd_types_lock);
178         list_del(&type->typ_chain);
179         spin_unlock(&obd_types_lock);
180         OBD_FREE(type->typ_name, strlen(name) + 1);
181         if (type->typ_ops != NULL)
182                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
183         OBD_FREE(type, sizeof(*type));
184         RETURN(0);
185 } /* class_unregister_type */
186
187 struct obd_device *class_newdev(struct obd_type *type, char *name)
188 {
189         struct obd_device *result = NULL;
190         int i;
191
192         spin_lock(&obd_dev_lock);
193         for (i = 0 ; i < MAX_OBD_DEVICES; i++) {
194                 struct obd_device *obd = &obd_dev[i];
195                 if (obd->obd_name && (strcmp(name, obd->obd_name) == 0)) {
196                         CERROR("Device %s already exists, won't add\n", name);
197                         if (result) {
198                                 result->obd_type = NULL;
199                                 result->obd_name = NULL;
200                                 result = NULL;
201                         }
202                         break;
203                 }
204                 if (!result && !obd->obd_type) {
205                         LASSERT(obd->obd_minor == i);
206                         memset(obd, 0, sizeof(*obd));
207                         obd->obd_minor = i;
208                         obd->obd_type = type;
209                         obd->obd_name = name;
210                         CDEBUG(D_IOCTL, "Adding new device %s\n",
211                                obd->obd_name);
212                         result = obd;
213                 }
214         }
215         spin_unlock(&obd_dev_lock);
216         return result;
217 }
218
219 void class_release_dev(struct obd_device *obd)
220 {
221         int minor = obd->obd_minor;
222
223         spin_lock(&obd_dev_lock);
224         memset(obd, 0x5a, sizeof(*obd));
225         obd->obd_type = NULL;
226         obd->obd_minor = minor;
227         obd->obd_name = NULL;
228         spin_unlock(&obd_dev_lock);
229 }
230
231 int class_name2dev(char *name)
232 {
233         int i;
234
235         if (!name)
236                 return -1;
237
238         spin_lock(&obd_dev_lock);
239         for (i = 0; i < MAX_OBD_DEVICES; i++) {
240                 struct obd_device *obd = &obd_dev[i];
241                 if (obd->obd_name && strcmp(name, obd->obd_name) == 0) {
242                         /* Make sure we finished attaching before we give
243                            out any references */
244                         if (obd->obd_attached) { 
245                                 spin_unlock(&obd_dev_lock);
246                                 return i;
247                         }
248                         break;
249                 }
250         }
251         spin_unlock(&obd_dev_lock);
252
253         return -1;
254 }
255
256 struct obd_device *class_name2obd(char *name)
257 {
258         int dev = class_name2dev(name);
259         if (dev < 0)
260                 return NULL;
261         return &obd_dev[dev];
262 }
263
264 int class_uuid2dev(struct obd_uuid *uuid)
265 {
266         int i;
267
268         spin_lock(&obd_dev_lock);
269         for (i = 0; i < MAX_OBD_DEVICES; i++) {
270                 struct obd_device *obd = &obd_dev[i];
271                 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
272                         spin_unlock(&obd_dev_lock);
273                         return i;
274                 }
275         }
276         spin_unlock(&obd_dev_lock);
277
278         return -1;
279 }
280
281 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
282 {
283         int dev = class_uuid2dev(uuid);
284         if (dev < 0)
285                 return NULL;
286         return &obd_dev[dev];
287 }
288
289 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
290    specified, then only the client with that uuid is returned,
291    otherwise any client connected to the tgt is returned. */
292 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
293                                           char * typ_name,
294                                           struct obd_uuid *grp_uuid)
295 {
296         int i;
297
298         spin_lock(&obd_dev_lock);
299         for (i = 0; i < MAX_OBD_DEVICES; i++) {
300                 struct obd_device *obd = &obd_dev[i];
301                 if (obd->obd_type == NULL)
302                         continue;
303                 if ((strncmp(obd->obd_type->typ_name, typ_name,
304                              strlen(typ_name)) == 0)) {
305                         struct client_obd *cli = &obd->u.cli;
306                         struct obd_import *imp = cli->cl_import;
307                         if (obd_uuid_equals(tgt_uuid, &imp->imp_target_uuid) &&
308                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
309                                                          &obd->obd_uuid) : 1)) {
310                                 spin_unlock(&obd_dev_lock);
311                                 return obd;
312                         }
313                 }
314         }
315         spin_unlock(&obd_dev_lock);
316
317         return NULL;
318 }
319
320 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
321                                             struct obd_uuid *grp_uuid)
322 {
323         struct obd_device *obd;
324
325         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
326         if (!obd)
327                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
328                                             grp_uuid);
329         return obd;
330 }
331
332 /* Iterate the obd_device list looking devices have grp_uuid. Start
333    searching at *next, and if a device is found, the next index to look
334    at is saved in *next. If next is NULL, then the first matching device
335    will always be returned. */
336 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
337 {
338         int i;
339
340         if (next == NULL)
341                 i = 0;
342         else if (*next >= 0 && *next < MAX_OBD_DEVICES)
343                 i = *next;
344         else
345                 return NULL;
346
347         spin_lock(&obd_dev_lock);
348         for (; i < MAX_OBD_DEVICES; i++) {
349                 struct obd_device *obd = &obd_dev[i];
350                 if (obd->obd_type == NULL)
351                         continue;
352                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
353                         if (next != NULL)
354                                 *next = i+1;
355                         spin_unlock(&obd_dev_lock);
356                         return obd;
357                 }
358         }
359         spin_unlock(&obd_dev_lock);
360
361         return NULL;
362 }
363
364 static void obd_cleanup_qunit_cache(void)
365 {
366         int i;
367         ENTRY;
368
369         spin_lock(&qunit_hash_lock);
370         for (i = 0; i < NR_DQHASH; i++)
371                 LASSERT(list_empty(qunit_hash + i));
372         spin_unlock(&qunit_hash_lock);
373         
374         if (qunit_cachep) {
375                 LASSERTF(kmem_cache_destroy(qunit_cachep) == 0,
376                          "Cannot destroy ll_qunit_cache\n");
377                 qunit_cachep = NULL;
378         }
379         EXIT;
380 }
381
382 void obd_cleanup_caches(void)
383 {
384         ENTRY;
385         if (obdo_cachep) {
386                 LASSERTF(kmem_cache_destroy(obdo_cachep) == 0,
387                          "Cannot destory ll_obdo_cache\n");
388                 obdo_cachep = NULL;
389         }
390         if (import_cachep) {
391                 LASSERTF(kmem_cache_destroy(import_cachep) == 0,
392                          "Cannot destory ll_import_cache\n");
393                 import_cachep = NULL;
394         }
395         obd_cleanup_qunit_cache();
396         EXIT;
397 }
398
399 static int obd_init_qunit_cache(void)
400 {
401         int i;
402         ENTRY;
403         
404         LASSERT(qunit_cachep == NULL);
405         qunit_cachep = kmem_cache_create("ll_qunit_cache", 
406                                          sizeof(struct lustre_qunit),
407                                          0, 0, NULL, NULL);
408         if (!qunit_cachep)
409                 RETURN(-ENOMEM);
410
411         spin_lock(&qunit_hash_lock);
412         for (i = 0; i < NR_DQHASH; i++)
413                 INIT_LIST_HEAD(qunit_hash + i);
414         spin_unlock(&qunit_hash_lock);
415         RETURN(0);
416 }
417
418 int obd_init_caches(void)
419 {
420         int rc = 0;
421         ENTRY;
422
423         LASSERT(obdo_cachep == NULL);
424         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
425                                         0, 0, NULL, NULL);
426         if (!obdo_cachep)
427                 GOTO(out, -ENOMEM);
428
429         LASSERT(import_cachep == NULL);
430         import_cachep = kmem_cache_create("ll_import_cache",
431                                           sizeof(struct obd_import),
432                                           0, 0, NULL, NULL);
433         if (!import_cachep)
434                 GOTO(out, -ENOMEM);
435
436         rc = obd_init_qunit_cache();
437         if (rc)
438                 GOTO(out, rc);
439
440         RETURN(0);
441  out:
442         obd_cleanup_caches();
443         RETURN(-ENOMEM);
444
445 }
446
447 /* map connection to client */
448 struct obd_export *class_conn2export(struct lustre_handle *conn)
449 {
450         struct obd_export *export;
451         ENTRY;
452
453         if (!conn) {
454                 CDEBUG(D_CACHE, "looking for null handle\n");
455                 RETURN(NULL);
456         }
457
458         if (conn->cookie == -1) {  /* this means assign a new connection */
459                 CDEBUG(D_CACHE, "want a new connection\n");
460                 RETURN(NULL);
461         }
462
463         CDEBUG(D_IOCTL, "looking for export cookie "LPX64"\n", conn->cookie);
464         export = class_handle2object(conn->cookie);
465         RETURN(export);
466 }
467
468 struct obd_device *class_exp2obd(struct obd_export *exp)
469 {
470         if (exp)
471                 return exp->exp_obd;
472         return NULL;
473 }
474
475 struct obd_device *class_conn2obd(struct lustre_handle *conn)
476 {
477         struct obd_export *export;
478         export = class_conn2export(conn);
479         if (export) {
480                 struct obd_device *obd = export->exp_obd;
481                 class_export_put(export);
482                 return obd;
483         }
484         return NULL;
485 }
486
487 struct obd_import *class_exp2cliimp(struct obd_export *exp)
488 {
489         struct obd_device *obd = exp->exp_obd;
490         if (obd == NULL)
491                 return NULL;
492         return obd->u.cli.cl_import;
493 }
494
495 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
496 {
497         struct obd_device *obd = class_conn2obd(conn);
498         if (obd == NULL)
499                 return NULL;
500         return obd->u.cli.cl_import;
501 }
502
/* Export management functions */

/* Addref callback registered with class_handle_hash() for exports; takes
 * one export reference via class_export_get(). */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}
508
509 void __class_export_put(struct obd_export *exp)
510 {
511         if (atomic_dec_and_test(&exp->exp_refcount)) {
512                 struct obd_device *obd = exp->exp_obd;
513                 CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
514                        exp->exp_client_uuid.uuid);
515
516                 LASSERT(obd != NULL);
517
518                 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
519                 if (exp->exp_connection)
520                         ptlrpc_put_connection_superhack(exp->exp_connection);
521
522                 LASSERT(list_empty(&exp->exp_outstanding_replies));
523                 LASSERT(list_empty(&exp->exp_handle.h_link));
524                 obd_destroy_export(exp);
525
526                 OBD_FREE(exp, sizeof(*exp));
527                 class_decref(obd);
528         }
529 }
530
531 /* Creates a new export, adds it to the hash table, and returns a
532  * pointer to it. The refcount is 2: one for the hash reference, and
533  * one for the pointer returned by this function. */
534 struct obd_export *class_new_export(struct obd_device *obd)
535 {
536         struct obd_export *export;
537
538         OBD_ALLOC(export, sizeof(*export));
539         if (!export) {
540                 CERROR("no memory! (minor %d)\n", obd->obd_minor);
541                 return NULL;
542         }
543
544         export->exp_conn_cnt = 0;
545         atomic_set(&export->exp_refcount, 2);
546         export->exp_obd = obd;
547         INIT_LIST_HEAD(&export->exp_outstanding_replies);
548         /* XXX this should be in LDLM init */
549         INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
550
551         INIT_LIST_HEAD(&export->exp_handle.h_link);
552         class_handle_hash(&export->exp_handle, export_handle_addref);
553         spin_lock_init(&export->exp_lock);
554
555         spin_lock(&obd->obd_dev_lock);
556         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
557         atomic_inc(&obd->obd_refcount);
558         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
559         export->exp_obd->obd_num_exports++;
560         spin_unlock(&obd->obd_dev_lock);
561         obd_init_export(export);
562         return export;
563 }
564
565 void class_unlink_export(struct obd_export *exp)
566 {
567         class_handle_unhash(&exp->exp_handle);
568
569         spin_lock(&exp->exp_obd->obd_dev_lock);
570         list_del_init(&exp->exp_obd_chain);
571         exp->exp_obd->obd_num_exports--;
572         spin_unlock(&exp->exp_obd->obd_dev_lock);
573
574         class_export_put(exp);
575 }
576
/* Import management functions */

/* Addref callback registered with class_handle_hash() for imports; takes
 * one import reference via class_import_get(). */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
582
583 struct obd_import *class_import_get(struct obd_import *import)
584 {
585         atomic_inc(&import->imp_refcount);
586         CDEBUG(D_IOCTL, "import %p refcount=%d\n", import,
587                atomic_read(&import->imp_refcount));
588         return import;
589 }
590
591 void class_import_put(struct obd_import *import)
592 {
593         ENTRY;
594
595         CDEBUG(D_IOCTL, "import %p refcount=%d\n", import,
596                atomic_read(&import->imp_refcount) - 1);
597
598         LASSERT(atomic_read(&import->imp_refcount) > 0);
599         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
600         if (!atomic_dec_and_test(&import->imp_refcount)) {
601                 EXIT;
602                 return;
603         }
604
605         CDEBUG(D_IOCTL, "destroying import %p\n", import);
606
607         ptlrpc_put_connection_superhack(import->imp_connection);
608
609         while (!list_empty(&import->imp_conn_list)) {
610                 struct obd_import_conn *imp_conn;
611
612                 imp_conn = list_entry(import->imp_conn_list.next,
613                                       struct obd_import_conn, oic_item);
614                 list_del(&imp_conn->oic_item);
615                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
616                 OBD_FREE(imp_conn, sizeof(*imp_conn));
617         }
618
619         LASSERT(list_empty(&import->imp_handle.h_link));
620         OBD_FREE(import, sizeof(*import));
621         EXIT;
622 }
623
624 struct obd_import *class_new_import(void)
625 {
626         struct obd_import *imp;
627
628         OBD_ALLOC(imp, sizeof(*imp));
629         if (imp == NULL)
630                 return NULL;
631
632         INIT_LIST_HEAD(&imp->imp_replay_list);
633         INIT_LIST_HEAD(&imp->imp_sending_list);
634         INIT_LIST_HEAD(&imp->imp_delayed_list);
635         spin_lock_init(&imp->imp_lock);
636         imp->imp_conn_cnt = 0;
637         imp->imp_max_transno = 0;
638         imp->imp_peer_committed_transno = 0;
639         imp->imp_state = LUSTRE_IMP_NEW;
640         init_waitqueue_head(&imp->imp_recovery_waitq);
641
642         atomic_set(&imp->imp_refcount, 2);
643         atomic_set(&imp->imp_inflight, 0);
644         atomic_set(&imp->imp_replay_inflight, 0);
645         INIT_LIST_HEAD(&imp->imp_conn_list);
646         INIT_LIST_HEAD(&imp->imp_handle.h_link);
647         class_handle_hash(&imp->imp_handle, import_handle_addref);
648
649         return imp;
650 }
651
652 void class_destroy_import(struct obd_import *import)
653 {
654         LASSERT(import != NULL);
655         LASSERT(import != LP_POISON);
656
657         class_handle_unhash(&import->imp_handle);
658
659         /* Abort any inflight DLM requests and NULL out their (about to be
660          * freed) import. */
661         /* Invalidate all requests on import, would be better to call
662            ptlrpc_set_import_active(imp, 0); */
663         import->imp_generation++;
664         ptlrpc_abort_inflight_superhack(import);
665
666         class_import_put(import);
667 }
668
669 /* A connection defines an export context in which preallocation can
670    be managed. This releases the export pointer reference, and returns
671    the export handle, so the export refcount is 1 when this function
672    returns. */
673 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
674                   struct obd_uuid *cluuid)
675 {
676         struct obd_export *export;
677         LASSERT(conn != NULL);
678         LASSERT(obd != NULL);
679         LASSERT(cluuid != NULL);
680         ENTRY;
681
682         export = class_new_export(obd);
683         if (export == NULL)
684                 RETURN(-ENOMEM);
685
686         conn->cookie = export->exp_handle.h_cookie;
687         memcpy(&export->exp_client_uuid, cluuid,
688                sizeof(export->exp_client_uuid));
689         class_export_put(export);
690
691         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
692                cluuid->uuid, conn->cookie);
693         RETURN(0);
694 }
695
696 /* This function removes two references from the export: one for the
697  * hash entry and one for the export pointer passed in.  The export
698  * pointer passed to this function is destroyed should not be used
699  * again. */
700 int class_disconnect(struct obd_export *export)
701 {
702         ENTRY;
703
704         if (export == NULL) {
705                 fixme();
706                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
707                 RETURN(-EINVAL);
708         }
709
710         /* XXX this shouldn't have to be here, but double-disconnect will crash
711          * otherwise, and sometimes double-disconnect happens.  abort_recovery,
712          * for example. */
713         if (list_empty(&export->exp_handle.h_link))
714                 RETURN(0);
715
716         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
717                export->exp_handle.h_cookie);
718
719         class_unlink_export(export);
720         class_export_put(export);
721         RETURN(0);
722 }
723
724 static void  class_disconnect_export_list(struct list_head *list, int flags)
725 {
726         int rc;
727         struct lustre_handle fake_conn;
728         struct obd_export *fake_exp, *exp;
729         ENTRY;
730
731         /* It's possible that an export may disconnect itself, but 
732          * nothing else will be added to this list. */
733         while(!list_empty(list)) {
734                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
735                 class_export_get(exp);
736                 exp->exp_flags = flags;
737
738                 if (obd_uuid_equals(&exp->exp_client_uuid,
739                                     &exp->exp_obd->obd_uuid)) {
740                         CDEBUG(D_HA,
741                                "exp %p export uuid == obd uuid, don't discon\n",
742                                exp);
743                         /* Need to delete this now so we don't end up pointing
744                          * to work_list later when this export is cleaned up. */
745                         list_del_init(&exp->exp_obd_chain);
746                         class_export_put(exp);
747                         continue;
748                 }
749
750                 fake_conn.cookie = exp->exp_handle.h_cookie;
751                 fake_exp = class_conn2export(&fake_conn);
752                 if (!fake_exp) {
753                         class_export_put(exp);
754                         continue;
755                 }
756                 fake_exp->exp_flags = flags;
757                 rc = obd_disconnect(fake_exp);
758                 class_export_put(exp);
759                 if (rc) {
760                         CDEBUG(D_HA, "disconnecting export %p failed: %d\n",
761                                exp, rc);
762                 } else {
763                         CDEBUG(D_HA, "export %p disconnected\n", exp);
764                 }
765         }
766         EXIT;
767 }
768
769 static inline int get_exp_flags_from_obd(struct obd_device *obd)
770 {
771         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
772                 (obd->obd_force ? OBD_OPT_FORCE : 0));
773 }
774
775 void class_disconnect_exports(struct obd_device *obd)
776 {
777         struct list_head work_list;
778         ENTRY;
779
780         /* Move all of the exports from obd_exports to a work list, en masse. */
781         spin_lock(&obd->obd_dev_lock);
782         list_add(&work_list, &obd->obd_exports);
783         list_del_init(&obd->obd_exports);
784         spin_unlock(&obd->obd_dev_lock);
785
786         CDEBUG(D_HA, "OBD device %d (%p) has exports, "
787                "disconnecting them\n", obd->obd_minor, obd);
788         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
789         EXIT;
790 }
791
792 /* Remove exports that have not completed recovery.
793  */
794 void class_disconnect_stale_exports(struct obd_device *obd)
795 {
796         struct list_head work_list;
797         struct list_head *pos, *n;
798         struct obd_export *exp;
799         int cnt = 0;
800         ENTRY;
801   
802         INIT_LIST_HEAD(&work_list);
803         spin_lock(&obd->obd_dev_lock);
804         list_for_each_safe(pos, n, &obd->obd_exports) {
805                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
806                 if (exp->exp_replay_needed) {
807                         list_del(&exp->exp_obd_chain);
808                         list_add(&exp->exp_obd_chain, &work_list);
809                         cnt++;
810                 }
811         }
812         spin_unlock(&obd->obd_dev_lock);
813
814         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n", 
815                obd->obd_name, cnt);
816         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
817         EXIT;
818 }
819
820 int oig_init(struct obd_io_group **oig_out)
821 {
822         struct obd_io_group *oig;
823         ENTRY;
824
825         OBD_ALLOC(oig, sizeof(*oig));
826         if (oig == NULL)
827                 RETURN(-ENOMEM);
828
829         spin_lock_init(&oig->oig_lock);
830         oig->oig_rc = 0;
831         oig->oig_pending = 0;
832         atomic_set(&oig->oig_refcount, 1);
833         init_waitqueue_head(&oig->oig_waitq);
834         INIT_LIST_HEAD(&oig->oig_occ_list);
835
836         *oig_out = oig;
837         RETURN(0);
838 };
839
840 static inline void oig_grab(struct obd_io_group *oig)
841 {
842         atomic_inc(&oig->oig_refcount);
843 }
844 void oig_release(struct obd_io_group *oig)
845 {
846         if (atomic_dec_and_test(&oig->oig_refcount))
847                 OBD_FREE(oig, sizeof(*oig));
848 }
849
850 void oig_add_one(struct obd_io_group *oig,
851                   struct oig_callback_context *occ)
852 {
853         unsigned long flags;
854         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
855         spin_lock_irqsave(&oig->oig_lock, flags);
856         oig->oig_pending++;
857         if (occ != NULL)
858                 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
859         spin_unlock_irqrestore(&oig->oig_lock, flags);
860         oig_grab(oig);
861 }
862
863 void oig_complete_one(struct obd_io_group *oig,
864                       struct oig_callback_context *occ, int rc)
865 {
866         unsigned long flags;
867         wait_queue_head_t *wake = NULL;
868         int old_rc;
869
870         spin_lock_irqsave(&oig->oig_lock, flags);
871
872         if (occ != NULL)
873                 list_del_init(&occ->occ_oig_item);
874
875         old_rc = oig->oig_rc;
876         if (oig->oig_rc == 0 && rc != 0)
877                 oig->oig_rc = rc;
878
879         if (--oig->oig_pending <= 0)
880                 wake = &oig->oig_waitq;
881
882         spin_unlock_irqrestore(&oig->oig_lock, flags);
883
884         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
885                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
886                         oig->oig_pending);
887         if (wake)
888                 wake_up(wake);
889         oig_release(oig);
890 }
891
892 static int oig_done(struct obd_io_group *oig)
893 {
894         unsigned long flags;
895         int rc = 0;
896         spin_lock_irqsave(&oig->oig_lock, flags);
897         if (oig->oig_pending <= 0)
898                 rc = 1;
899         spin_unlock_irqrestore(&oig->oig_lock, flags);
900         return rc;
901 }
902
903 static void interrupted_oig(void *data)
904 {
905         struct obd_io_group *oig = data;
906         struct oig_callback_context *occ;
907         unsigned long flags;
908
909         spin_lock_irqsave(&oig->oig_lock, flags);
910         /* We need to restart the processing each time we drop the lock, as
911          * it is possible other threads called oig_complete_one() to remove
912          * an entry elsewhere in the list while we dropped lock.  We need to
913          * drop the lock because osc_ap_completion() calls oig_complete_one()
914          * which re-gets this lock ;-) as well as a lock ordering issue. */
915 restart:
916         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
917                 if (occ->interrupted)
918                         continue;
919                 occ->interrupted = 1;
920                 spin_unlock_irqrestore(&oig->oig_lock, flags);
921                 occ->occ_interrupted(occ);
922                 spin_lock_irqsave(&oig->oig_lock, flags);
923                 goto restart;
924         }
925         spin_unlock_irqrestore(&oig->oig_lock, flags);
926 }
927
928 int oig_wait(struct obd_io_group *oig)
929 {
930         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
931         int rc;
932
933         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
934
935         do {
936                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
937                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
938                 /* we can't continue until the oig has emptied and stopped
939                  * referencing state that the caller will free upon return */
940                 if (rc == -EINTR)
941                         lwi = (struct l_wait_info){ 0, };
942         } while (rc == -EINTR);
943
944         LASSERTF(oig->oig_pending == 0,
945                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
946                  oig->oig_pending);
947
948         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
949         return oig->oig_rc;
950 }