Whamcloud - gitweb
b=23787 make struct lprocfs_percpu definition C99 compliant
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50 #include <lustre_export.h>
51
52 extern struct list_head obd_types;
53 spinlock_t obd_types_lock;
54
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
59
60 struct list_head  obd_zombie_imports;
61 struct list_head  obd_zombie_exports;
62 spinlock_t        obd_zombie_impexp_lock;
63 static void obd_zombie_impexp_notify(void);
64
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66
67 /*
68  * support functions: we could use inter-module communication, but this
69  * is more portable to other OS's
70  */
71 static struct obd_device *obd_device_alloc(void)
72 {
73         struct obd_device *obd;
74
75         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76         if (obd != NULL) {
77                 obd->obd_magic = OBD_DEVICE_MAGIC;
78         }
79         return obd;
80 }
81 EXPORT_SYMBOL(obd_device_alloc);
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic "
87                  "%08x != %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up "
90                        "(obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96 EXPORT_SYMBOL(obd_device_free);
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         struct list_head *tmp;
101         struct obd_type *type;
102
103         spin_lock(&obd_types_lock);
104         list_for_each(tmp, &obd_types) {
105                 type = list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
120         if (!type) {
121                 const char *modname = name;
122                 if (strcmp(modname, LUSTRE_MDT_NAME) == 0)
123                         modname = LUSTRE_MDS_NAME;
124                 if (!request_module("%s", modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132 #endif
133         if (type) {
134                 spin_lock(&type->obd_type_lock);
135                 type->typ_refcnt++;
136                 try_module_get(type->typ_ops->o_owner);
137                 spin_unlock(&type->obd_type_lock);
138         }
139         return type;
140 }
141
142 void class_put_type(struct obd_type *type)
143 {
144         LASSERT(type);
145         spin_lock(&type->obd_type_lock);
146         type->typ_refcnt--;
147         module_put(type->typ_ops->o_owner);
148         spin_unlock(&type->obd_type_lock);
149 }
150
151 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
152                         const char *name)
153 {
154         struct obd_type *type;
155         int rc = 0;
156         ENTRY;
157
158         LASSERT(strnlen(name, 1024) < 1024);    /* sanity check */
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
171         OBD_ALLOC(type->typ_name, strlen(name) + 1);
172         if (type->typ_ops == NULL || type->typ_name == NULL)
173                 GOTO (failed, rc);
174
175         *(type->typ_ops) = *ops;
176         strcpy(type->typ_name, name);
177         spin_lock_init(&type->obd_type_lock);
178
179 #ifdef LPROCFS
180         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
181                                               vars, type);
182         if (IS_ERR(type->typ_procroot)) {
183                 rc = PTR_ERR(type->typ_procroot);
184                 type->typ_procroot = NULL;
185                 GOTO (failed, rc);
186         }
187 #endif
188
189         spin_lock(&obd_types_lock);
190         list_add(&type->typ_chain, &obd_types);
191         spin_unlock(&obd_types_lock);
192
193         RETURN (0);
194
195  failed:
196         if (type->typ_name != NULL)
197                 OBD_FREE(type->typ_name, strlen(name) + 1);
198         if (type->typ_ops != NULL)
199                 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
200         OBD_FREE(type, sizeof(*type));
201         RETURN(rc);
202 }
203
204 int class_unregister_type(const char *name)
205 {
206         struct obd_type *type = class_search_type(name);
207         ENTRY;
208
209         if (!type) {
210                 CERROR("unknown obd type\n");
211                 RETURN(-EINVAL);
212         }
213
214         if (type->typ_refcnt) {
215                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
216                 /* This is a bad situation, let's make the best of it */
217                 /* Remove ops, but leave the name for debugging */
218                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
219                 RETURN(-EBUSY);
220         }
221
222         if (type->typ_procroot)
223                 lprocfs_remove(&type->typ_procroot);
224
225         spin_lock(&obd_types_lock);
226         list_del(&type->typ_chain);
227         spin_unlock(&obd_types_lock);
228         OBD_FREE(type->typ_name, strlen(name) + 1);
229         if (type->typ_ops != NULL)
230                 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
231         OBD_FREE(type, sizeof(*type));
232         RETURN(0);
233 } /* class_unregister_type */
234
235 /**
236  * Create a new obd device.
237  *
238  * Find an empty slot in ::obd_devs[], create a new obd device in it.
239  *
240  * \param typename [in] obd device type string.
241  * \param name     [in] obd device name.
242  *
243  * \retval NULL if create fails, otherwise return the obd device
244  *         pointer created.
245  */
246 struct obd_device *class_newdev(const char *type_name, const char *name)
247 {
248         struct obd_device *result = NULL;
249         struct obd_device *newdev;
250         struct obd_type *type = NULL;
251         int i;
252         int new_obd_minor = 0;
253
254         if (strlen(name) >= MAX_OBD_NAME) {
255                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
256                 RETURN(ERR_PTR(-EINVAL));
257         }
258
259         type = class_get_type(type_name);
260         if (type == NULL){
261                 CERROR("OBD: unknown type: %s\n", type_name);
262                 RETURN(ERR_PTR(-ENODEV));
263         }
264
265         newdev = obd_device_alloc();
266         if (newdev == NULL) {
267                 class_put_type(type);
268                 RETURN(ERR_PTR(-ENOMEM));
269         }
270         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
271
272         spin_lock(&obd_dev_lock);
273         for (i = 0; i < class_devno_max(); i++) {
274                 struct obd_device *obd = class_num2obd(i);
275                 if (obd && obd->obd_name && (strcmp(name, obd->obd_name) == 0)){
276                         CERROR("Device %s already exists, won't add\n", name);
277                         if (result) {
278                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
279                                          "%p obd_magic %08x != %08x\n", result,
280                                          result->obd_magic, OBD_DEVICE_MAGIC);
281                                 LASSERTF(result->obd_minor == new_obd_minor,
282                                          "%p obd_minor %d != %d\n", result,
283                                          result->obd_minor, new_obd_minor);
284
285                                 obd_devs[result->obd_minor] = NULL;
286                                 result->obd_name[0]='\0';
287                         }
288                         result = ERR_PTR(-EEXIST);
289                         break;
290                 }
291                 if (!result && !obd) {
292                         result = newdev;
293                         result->obd_minor = i;
294                         new_obd_minor = i;
295                         result->obd_type = type;
296                         strncpy(result->obd_name, name,
297                                 sizeof(result->obd_name) - 1);
298                         obd_devs[i] = result;
299                 }
300         }
301         spin_unlock(&obd_dev_lock);
302
303         if (result == NULL && i >= class_devno_max()) {
304                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
305                        class_devno_max());
306                 result = ERR_PTR(-EOVERFLOW);
307         }
308
309         if (IS_ERR(result)) {
310                 obd_device_free(newdev);
311                 class_put_type(type);
312         } else {
313                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
314                        result->obd_name, result);
315         }
316         return result;
317 }
318
319 void class_release_dev(struct obd_device *obd)
320 {
321         struct obd_type *obd_type = obd->obd_type;
322
323         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
324                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
325         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
326                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
327         LASSERT(obd_type != NULL);
328
329         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
330                obd->obd_name,obd->obd_type->typ_name);
331
332         spin_lock(&obd_dev_lock);
333         obd_devs[obd->obd_minor] = NULL;
334         spin_unlock(&obd_dev_lock);
335         obd_device_free(obd);
336
337         class_put_type(obd_type);
338 }
339
340 int class_name2dev(const char *name)
341 {
342         int i;
343
344         if (!name)
345                 return -1;
346
347         spin_lock(&obd_dev_lock);
348         for (i = 0; i < class_devno_max(); i++) {
349                 struct obd_device *obd = class_num2obd(i);
350                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
351                         /* Make sure we finished attaching before we give
352                            out any references */
353                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
354                         if (obd->obd_attached) {
355                                 spin_unlock(&obd_dev_lock);
356                                 return i;
357                         }
358                         break;
359                 }
360         }
361         spin_unlock(&obd_dev_lock);
362
363         return -1;
364 }
365
366 struct obd_device *class_name2obd(const char *name)
367 {
368         int dev = class_name2dev(name);
369
370         if (dev < 0 || dev > class_devno_max())
371                 return NULL;
372         return class_num2obd(dev);
373 }
374
375 int class_uuid2dev(struct obd_uuid *uuid)
376 {
377         int i;
378
379         spin_lock(&obd_dev_lock);
380         for (i = 0; i < class_devno_max(); i++) {
381                 struct obd_device *obd = class_num2obd(i);
382                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
383                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
384                         spin_unlock(&obd_dev_lock);
385                         return i;
386                 }
387         }
388         spin_unlock(&obd_dev_lock);
389
390         return -1;
391 }
392
393 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
394 {
395         int dev = class_uuid2dev(uuid);
396         if (dev < 0)
397                 return NULL;
398         return class_num2obd(dev);
399 }
400
401 /**
402  * Get obd device from ::obd_devs[]
403  *
404  * \param num [in] array index
405  *
406  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
407  *         otherwise return the obd device there.
408  */
409 struct obd_device *class_num2obd(int num)
410 {
411         struct obd_device *obd = NULL;
412
413         if (num < class_devno_max()) {
414                 obd = obd_devs[num];
415                 if (obd == NULL)
416                         return NULL;
417
418                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
419                          "%p obd_magic %08x != %08x\n",
420                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
421                 LASSERTF(obd->obd_minor == num,
422                          "%p obd_minor %0d != %0d\n",
423                          obd, obd->obd_minor, num);
424         }
425
426         return obd;
427 }
428
429 void class_obd_list(void)
430 {
431         char *status;
432         int i;
433
434         spin_lock(&obd_dev_lock);
435         for (i = 0; i < class_devno_max(); i++) {
436                 struct obd_device *obd = class_num2obd(i);
437                 if (obd == NULL)
438                         continue;
439                 if (obd->obd_stopping)
440                         status = "ST";
441                 else if (obd->obd_set_up)
442                         status = "UP";
443                 else if (obd->obd_attached)
444                         status = "AT";
445                 else
446                         status = "--";
447                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
448                          i, status, obd->obd_type->typ_name,
449                          obd->obd_name, obd->obd_uuid.uuid,
450                          atomic_read(&obd->obd_refcount));
451         }
452         spin_unlock(&obd_dev_lock);
453         return;
454 }
455
456 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
457    specified, then only the client with that uuid is returned,
458    otherwise any client connected to the tgt is returned. */
459 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
460                                           const char * typ_name,
461                                           struct obd_uuid *grp_uuid)
462 {
463         int i;
464
465         spin_lock(&obd_dev_lock);
466         for (i = 0; i < class_devno_max(); i++) {
467                 struct obd_device *obd = class_num2obd(i);
468                 if (obd == NULL)
469                         continue;
470                 if ((strncmp(obd->obd_type->typ_name, typ_name,
471                              strlen(typ_name)) == 0)) {
472                         if (obd_uuid_equals(tgt_uuid,
473                                             &obd->u.cli.cl_target_uuid) &&
474                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
475                                                          &obd->obd_uuid) : 1)) {
476                                 spin_unlock(&obd_dev_lock);
477                                 return obd;
478                         }
479                 }
480         }
481         spin_unlock(&obd_dev_lock);
482
483         return NULL;
484 }
485
486 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
487                                             struct obd_uuid *grp_uuid)
488 {
489         struct obd_device *obd;
490
491         obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
492         if (!obd)
493                 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
494                                             grp_uuid);
495         return obd;
496 }
497
498 /* Iterate the obd_device list looking devices have grp_uuid. Start
499    searching at *next, and if a device is found, the next index to look
500    at is saved in *next. If next is NULL, then the first matching device
501    will always be returned. */
502 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
503 {
504         int i;
505
506         if (next == NULL)
507                 i = 0;
508         else if (*next >= 0 && *next < class_devno_max())
509                 i = *next;
510         else
511                 return NULL;
512
513         spin_lock(&obd_dev_lock);
514         for (; i < class_devno_max(); i++) {
515                 struct obd_device *obd = class_num2obd(i);
516                 if (obd == NULL)
517                         continue;
518                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
519                         if (next != NULL)
520                                 *next = i+1;
521                         spin_unlock(&obd_dev_lock);
522                         return obd;
523                 }
524         }
525         spin_unlock(&obd_dev_lock);
526
527         return NULL;
528 }
529
530
531 void obd_cleanup_caches(void)
532 {
533         int rc;
534
535         ENTRY;
536         if (obd_device_cachep) {
537                 rc = cfs_mem_cache_destroy(obd_device_cachep);
538                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
539                 obd_device_cachep = NULL;
540         }
541         if (obdo_cachep) {
542                 rc = cfs_mem_cache_destroy(obdo_cachep);
543                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
544                 obdo_cachep = NULL;
545         }
546         if (import_cachep) {
547                 rc = cfs_mem_cache_destroy(import_cachep);
548                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
549                 import_cachep = NULL;
550         }
551         EXIT;
552 }
553
554 int obd_init_caches(void)
555 {
556         ENTRY;
557
558         LASSERT(obd_device_cachep == NULL);
559         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
560                                                  sizeof(struct obd_device),
561                                                  0, 0);
562         if (!obd_device_cachep)
563                 GOTO(out, -ENOMEM);
564
565         LASSERT(obdo_cachep == NULL);
566         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
567                                            0, 0);
568         if (!obdo_cachep)
569                 GOTO(out, -ENOMEM);
570
571         LASSERT(import_cachep == NULL);
572         import_cachep = cfs_mem_cache_create("ll_import_cache",
573                                              sizeof(struct obd_import),
574                                              0, 0);
575         if (!import_cachep)
576                 GOTO(out, -ENOMEM);
577
578         RETURN(0);
579  out:
580         obd_cleanup_caches();
581         RETURN(-ENOMEM);
582
583 }
584
585 /* map connection to client */
586 struct obd_export *class_conn2export(struct lustre_handle *conn)
587 {
588         struct obd_export *export;
589         ENTRY;
590
591         if (!conn) {
592                 CDEBUG(D_CACHE, "looking for null handle\n");
593                 RETURN(NULL);
594         }
595
596         if (conn->cookie == -1) {  /* this means assign a new connection */
597                 CDEBUG(D_CACHE, "want a new connection\n");
598                 RETURN(NULL);
599         }
600
601         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
602         export = class_handle2object(conn->cookie);
603         RETURN(export);
604 }
605
606 struct obd_device *class_exp2obd(struct obd_export *exp)
607 {
608         if (exp)
609                 return exp->exp_obd;
610         return NULL;
611 }
612
613 struct obd_device *class_conn2obd(struct lustre_handle *conn)
614 {
615         struct obd_export *export;
616         export = class_conn2export(conn);
617         if (export) {
618                 struct obd_device *obd = export->exp_obd;
619                 class_export_put(export);
620                 return obd;
621         }
622         return NULL;
623 }
624
625 struct obd_import *class_exp2cliimp(struct obd_export *exp)
626 {
627         struct obd_device *obd = exp->exp_obd;
628         if (obd == NULL)
629                 return NULL;
630         return obd->u.cli.cl_import;
631 }
632
633 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
634 {
635         struct obd_device *obd = class_conn2obd(conn);
636         if (obd == NULL)
637                 return NULL;
638         return obd->u.cli.cl_import;
639 }
640
641 /* Export management functions */
/* Addref callback registered with class_handle_hash() for exports:
 * takes one reference on the export passed as an untyped pointer. */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}
646
647 /* called from mds_commit_cb() in context of journal commit callback
648  * and cannot call any blocking functions. */
649 void __class_export_put(struct obd_export *exp)
650 {
651         if (atomic_dec_and_test(&exp->exp_refcount)) {
652                 LASSERT (list_empty(&exp->exp_obd_chain));
653
654                 CDEBUG(D_IOCTL, "final put %p/%s\n",
655                        exp, exp->exp_client_uuid.uuid);
656
657                 /* release nid stat refererence */
658                 lprocfs_exp_cleanup(exp);
659
660                 spin_lock(&obd_zombie_impexp_lock);
661                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
662                 spin_unlock(&obd_zombie_impexp_lock);
663
664                 obd_zombie_impexp_notify();
665         }
666 }
667 EXPORT_SYMBOL(__class_export_put);
668
669 void class_export_destroy(struct obd_export *exp)
670 {
671         struct obd_device *obd = exp->exp_obd;
672
673         LASSERT (atomic_read(&exp->exp_refcount) == 0);
674
675         CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
676                exp->exp_client_uuid.uuid);
677
678         LASSERT(obd != NULL);
679
680         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
681         if (exp->exp_connection)
682                 ptlrpc_put_connection_superhack(exp->exp_connection);
683
684         LASSERT(list_empty(&exp->exp_outstanding_replies));
685         LASSERT(list_empty(&exp->exp_uncommitted_replies));
686         LASSERT(list_empty(&exp->exp_req_replay_queue));
687         LASSERT(list_empty(&exp->exp_queued_rpc));
688         obd_destroy_export(exp);
689
690         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
691         class_decref(obd);
692 }
693
694 /* Creates a new export, adds it to the hash table, and returns a
695  * pointer to it. The refcount is 2: one for the hash reference, and
696  * one for the pointer returned by this function. */
697 struct obd_export *class_new_export(struct obd_device *obd,
698                                     struct obd_uuid *cluuid)
699 {
700         struct obd_export *export;
701         int rc = 0;
702
703         OBD_ALLOC(export, sizeof(*export));
704         if (!export)
705                 return ERR_PTR(-ENOMEM);
706
707         export->exp_conn_cnt = 0;
708         export->exp_lock_hash = NULL;
709         atomic_set(&export->exp_refcount, 2);
710         atomic_set(&export->exp_rpc_count, 0);
711         export->exp_obd = obd;
712         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
713         spin_lock_init(&export->exp_uncommitted_replies_lock);
714         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
715         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
716         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
717
718         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
719         class_handle_hash(&export->exp_handle, export_handle_addref);
720         export->exp_last_request_time = cfs_time_current_sec();
721         spin_lock_init(&export->exp_lock);
722         INIT_HLIST_NODE(&export->exp_uuid_hash);
723         INIT_HLIST_NODE(&export->exp_nid_hash);
724
725         export->exp_client_uuid = *cluuid;
726         obd_init_export(export);
727
728         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
729                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
730                                             &export->exp_uuid_hash);
731                 if (rc != 0) {
732                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
733                                       obd->obd_name, cluuid->uuid, rc);
734                         class_handle_unhash(&export->exp_handle);
735                         OBD_FREE_PTR(export);
736                         return ERR_PTR(-EALREADY);
737                 }
738         }
739
740         spin_lock(&obd->obd_dev_lock);
741         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
742         class_incref(obd);
743         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
744         list_add_tail(&export->exp_obd_chain_timed,
745                       &export->exp_obd->obd_exports_timed);
746         export->exp_obd->obd_num_exports++;
747         spin_unlock(&obd->obd_dev_lock);
748
749         return export;
750 }
751 EXPORT_SYMBOL(class_new_export);
752
753 void class_unlink_export(struct obd_export *exp)
754 {
755         class_handle_unhash(&exp->exp_handle);
756
757         spin_lock(&exp->exp_obd->obd_dev_lock);
758         /* delete an uuid-export hashitem from hashtables */
759         if (!hlist_unhashed(&exp->exp_uuid_hash))
760                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
761                                 &exp->exp_client_uuid,
762                                 &exp->exp_uuid_hash);
763
764         list_del_init(&exp->exp_obd_chain);
765         list_del_init(&exp->exp_obd_chain_timed);
766         exp->exp_obd->obd_num_exports--;
767         spin_unlock(&exp->exp_obd->obd_dev_lock);
768         /* Keep these counter valid always */
769         spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
770         if (exp->exp_delayed) {
771                 spin_lock(&exp->exp_lock);
772                 exp->exp_delayed = 0;
773                 spin_unlock(&exp->exp_lock);
774                 LASSERT(exp->exp_obd->obd_delayed_clients);
775                 exp->exp_obd->obd_delayed_clients--;
776         } else if (exp->exp_replay_needed) {
777                         spin_lock(&exp->exp_lock);
778                         exp->exp_replay_needed = 0;
779                         spin_unlock(&exp->exp_lock);
780                         LASSERT(exp->exp_obd->obd_recoverable_clients);
781                         exp->exp_obd->obd_recoverable_clients--;
782         }
783
784         if (exp->exp_obd->obd_recovering && exp->exp_in_recovery) {
785                 spin_lock(&exp->exp_lock);
786                 exp->exp_in_recovery = 0;
787                 spin_unlock(&exp->exp_lock);
788                 LASSERT(exp->exp_obd->obd_connected_clients);
789                 exp->exp_obd->obd_connected_clients--;
790         }
791         spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
792         class_export_put(exp);
793 }
794 EXPORT_SYMBOL(class_unlink_export);
795
796 /* Import management functions */
/* Addref callback registered with class_handle_hash() for imports:
 * takes one reference on the import passed as an untyped pointer. */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
801
802 struct obd_import *class_import_get(struct obd_import *import)
803 {
804         LASSERT(atomic_read(&import->imp_refcount) >= 0);
805         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
806         atomic_inc(&import->imp_refcount);
807         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
808                atomic_read(&import->imp_refcount), 
809                import->imp_obd->obd_name);
810         return import;
811 }
812 EXPORT_SYMBOL(class_import_get);
813
814 void class_import_put(struct obd_import *import)
815 {
816         ENTRY;
817
818         LASSERT(atomic_read(&import->imp_refcount) > 0);
819         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
820         LASSERT(list_empty(&import->imp_zombie_chain));
821
822         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
823                atomic_read(&import->imp_refcount) - 1, 
824                import->imp_obd->obd_name);
825
826         if (atomic_dec_and_test(&import->imp_refcount)) {
827                 CDEBUG(D_INFO, "final put import %p\n", import);
828                 spin_lock(&obd_zombie_impexp_lock);
829                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
830                 spin_unlock(&obd_zombie_impexp_lock);
831
832                 obd_zombie_impexp_notify();
833         }
834
835         EXIT;
836 }
837 EXPORT_SYMBOL(class_import_put);
838
839 void class_import_destroy(struct obd_import *import)
840 {
841         ENTRY;
842
843         CDEBUG(D_IOCTL, "destroying import %p\n", import);
844
845         LASSERT(atomic_read(&import->imp_refcount) == 0);
846
847         ptlrpc_put_connection_superhack(import->imp_connection);
848
849         while (!list_empty(&import->imp_conn_list)) {
850                 struct obd_import_conn *imp_conn;
851
852                 imp_conn = list_entry(import->imp_conn_list.next,
853                                       struct obd_import_conn, oic_item);
854                 list_del(&imp_conn->oic_item);
855                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
856                 OBD_FREE(imp_conn, sizeof(*imp_conn));
857         }
858
859         class_decref(import->imp_obd);
860         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
861         EXIT;
862 }
863
864 static void init_imp_at(struct imp_at *at) {
865         int i;
866         at_init(&at->iat_net_latency, 0, 0);
867         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
868                 /* max service estimates are tracked on the server side, so
869                    don't use the AT history here, just use the last reported
870                    val. (But keep hist for proc histogram, worst_ever) */
871                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
872                         AT_FLG_NOHIST);
873         }
874 }
875
876 struct obd_import *class_new_import(struct obd_device *obd)
877 {
878         struct obd_import *imp;
879
880         OBD_ALLOC(imp, sizeof(*imp));
881         if (imp == NULL)
882                 return NULL;
883
884         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
885         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
886         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
887         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
888         spin_lock_init(&imp->imp_lock);
889         imp->imp_last_success_conn = 0;
890         imp->imp_state = LUSTRE_IMP_NEW;
891         imp->imp_obd = class_incref(obd);
892         cfs_waitq_init(&imp->imp_recovery_waitq);
893
894         atomic_set(&imp->imp_refcount, 2);
895         atomic_set(&imp->imp_unregistering, 0);
896         atomic_set(&imp->imp_inflight, 0);
897         atomic_set(&imp->imp_replay_inflight, 0);
898         atomic_set(&imp->imp_inval_count, 0);
899         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
900         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
901         class_handle_hash(&imp->imp_handle, import_handle_addref);
902         init_imp_at(&imp->imp_at);
903
904 /* b1_8 supports both v1 & v2. but HEAD only supports v2.
905  * So let's use v2.
906  */
907 #define HAVE_DEFAULT_V2_CONNECT 1
908 #ifdef HAVE_DEFAULT_V2_CONNECT
909         /* the default magic is V2, will be used in connect RPC, and
910          * then adjusted according to the flags in request/reply. */
911         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
912 #else
913         /* the default magic is V1, will be used in connect RPC, and
914          * then adjusted according to the flags in request/reply. */
915         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V1;
916 #endif
917
918         return imp;
919 }
920 EXPORT_SYMBOL(class_new_import);
921
922 void class_destroy_import(struct obd_import *import)
923 {
924         LASSERT(import != NULL);
925         LASSERT(import != LP_POISON);
926
927         class_handle_unhash(&import->imp_handle);
928
929         spin_lock(&import->imp_lock);
930         import->imp_generation++;
931         spin_unlock(&import->imp_lock);
932
933         class_import_put(import);
934 }
935 EXPORT_SYMBOL(class_destroy_import);
936
937 /* A connection defines an export context in which preallocation can
938    be managed. This releases the export pointer reference, and returns
939    the export handle, so the export refcount is 1 when this function
940    returns. */
941 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
942                   struct obd_uuid *cluuid)
943 {
944         struct obd_export *export;
945         LASSERT(conn != NULL);
946         LASSERT(obd != NULL);
947         LASSERT(cluuid != NULL);
948         ENTRY;
949
950         export = class_new_export(obd, cluuid);
951         if (IS_ERR(export))
952                 RETURN(PTR_ERR(export));
953
954         conn->cookie = export->exp_handle.h_cookie;
955         class_export_put(export);
956
957         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
958                cluuid->uuid, conn->cookie);
959         RETURN(0);
960 }
961 EXPORT_SYMBOL(class_connect);
962
963 /* This function removes 1-3 references from the export:
964  * 1 - for export pointer passed
965  * and if disconnect really need
966  * 2 - removing from hash
967  * 3 - in client_unlink_export
968  * The export pointer passed to this function can destroyed */
969 int class_disconnect(struct obd_export *export)
970 {
971         int already_disconnected;
972         ENTRY;
973
974         if (export == NULL) {
975                 fixme();
976                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
977                 RETURN(-EINVAL);
978         }
979
980         spin_lock(&export->exp_lock);
981         already_disconnected = export->exp_disconnected;
982         export->exp_disconnected = 1;
983         spin_unlock(&export->exp_lock);
984
985
986         /* class_cleanup(), abort_recovery(), and class_fail_export()
987          * all end up in here, and if any of them race we shouldn't
988          * call extra class_export_puts(). */
989         if (already_disconnected)
990                 GOTO(no_disconn, already_disconnected);
991
992         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
993                export->exp_handle.h_cookie);
994
995
996         if (!hlist_unhashed(&export->exp_nid_hash))
997                 lustre_hash_del(export->exp_obd->obd_nid_hash,
998                                 &export->exp_connection->c_peer.nid,
999                                 &export->exp_nid_hash);
1000
1001         class_unlink_export(export);
1002
1003 no_disconn:
1004         class_export_put(export);
1005         RETURN(0);
1006 }
1007
1008 /* Return non-zero for a fully connected export */
1009 int class_connected_export(struct obd_export *exp)
1010 {
1011         if (exp) {
1012                 int connected;
1013                 spin_lock(&exp->exp_lock);
1014                 connected = (exp->exp_conn_cnt > 0);
1015                 spin_unlock(&exp->exp_lock);
1016                 return connected;
1017         }
1018         return 0;
1019 }
1020 EXPORT_SYMBOL(class_connected_export);
1021
1022 static void class_disconnect_export_list(struct list_head *list,
1023                                          enum obd_option flags)
1024 {
1025         int rc;
1026         struct obd_export *exp;
1027         ENTRY;
1028
1029         /* It's possible that an export may disconnect itself, but
1030          * nothing else will be added to this list. */
1031         while (!list_empty(list)) {
1032                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1033                 /* need for safe call CDEBUG after obd_disconnect */
1034                 class_export_get(exp);
1035
1036                 spin_lock(&exp->exp_lock);
1037                 exp->exp_flags = flags;
1038                 spin_unlock(&exp->exp_lock);
1039
1040                 if (obd_uuid_equals(&exp->exp_client_uuid,
1041                                     &exp->exp_obd->obd_uuid)) {
1042                         CDEBUG(D_HA,
1043                                "exp %p export uuid == obd uuid, don't discon\n",
1044                                exp);
1045                         /* Need to delete this now so we don't end up pointing
1046                          * to work_list later when this export is cleaned up. */
1047                         list_del_init(&exp->exp_obd_chain);
1048                         class_export_put(exp);
1049                         continue;
1050                 }
1051
1052                 class_export_get(exp);
1053                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1054                        "last request at %ld\n",
1055                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1056                        exp, exp->exp_last_request_time);
1057
1058                 /* release one export reference anyway */
1059                 rc = obd_disconnect(exp);
1060                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1061                        obd_export_nid2str(exp), exp, rc);
1062                 class_export_put(exp);
1063         }
1064         EXIT;
1065 }
1066
1067 void class_disconnect_exports(struct obd_device *obd)
1068 {
1069         struct list_head work_list;
1070         ENTRY;
1071
1072         /* Move all of the exports from obd_exports to a work list, en masse. */
1073         CFS_INIT_LIST_HEAD(&work_list);
1074         spin_lock(&obd->obd_dev_lock);
1075         list_splice_init(&obd->obd_delayed_exports, &work_list);
1076         list_splice_init(&obd->obd_exports, &work_list);
1077         spin_unlock(&obd->obd_dev_lock);
1078
1079         CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1080                "disconnecting them\n", obd->obd_minor, obd);
1081         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd));
1082         EXIT;
1083 }
1084 EXPORT_SYMBOL(class_disconnect_exports);
1085
1086 /* Remove exports that have not completed recovery. */
1087 void class_disconnect_stale_exports(struct obd_device *obd,
1088                                     enum obd_option flags)
1089 {
1090         struct list_head work_list;
1091         struct list_head *pos, *n;
1092         struct obd_export *exp;
1093         ENTRY;
1094
1095         CFS_INIT_LIST_HEAD(&work_list);
1096         spin_lock(&obd->obd_dev_lock);
1097         list_for_each_safe(pos, n, &obd->obd_exports) {
1098                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1099                 if (exp->exp_replay_needed) {
1100                         list_move(&exp->exp_obd_chain, &work_list);
1101                         obd->obd_stale_clients++;
1102                 }
1103         }
1104         spin_unlock(&obd->obd_dev_lock);
1105
1106         CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1107                obd->obd_name, obd->obd_stale_clients);
1108         class_disconnect_export_list(&work_list, flags);
1109         EXIT;
1110 }
1111 EXPORT_SYMBOL(class_disconnect_stale_exports);
1112
1113 void class_disconnect_expired_exports(struct obd_device *obd)
1114 {
1115         struct list_head expired_list;
1116         struct obd_export *exp, *n;
1117         int cnt = 0;
1118         ENTRY;
1119
1120         CFS_INIT_LIST_HEAD(&expired_list);
1121         spin_lock(&obd->obd_dev_lock);
1122         list_for_each_entry_safe(exp, n, &obd->obd_delayed_exports,
1123                                  exp_obd_chain) {
1124                 if (exp_expired(exp, obd->u.obt.obt_stale_export_age)) {
1125                         list_move(&exp->exp_obd_chain, &expired_list);
1126                         cnt++;
1127                 }
1128         }
1129         spin_unlock(&obd->obd_dev_lock);
1130
1131         if (cnt == 0)
1132                 return;
1133
1134         CDEBUG(D_INFO, "%s: disconnecting %d expired exports\n",
1135                obd->obd_name, cnt);
1136         class_disconnect_export_list(&expired_list, exp_flags_from_obd(obd));
1137
1138         EXIT;
1139 }
1140 EXPORT_SYMBOL(class_disconnect_expired_exports);
1141
1142 void class_set_export_delayed(struct obd_export *exp)
1143 {
1144         struct obd_device *obd = class_exp2obd(exp);
1145
1146         LASSERT(!exp->exp_delayed);
1147
1148         /* no need to ping delayed exports */
1149         spin_lock(&obd->obd_dev_lock);
1150         list_del_init(&exp->exp_obd_chain_timed);
1151         list_move_tail(&exp->exp_obd_chain, &obd->obd_delayed_exports);
1152         spin_unlock(&obd->obd_dev_lock);
1153
1154         LASSERT(obd->obd_recoverable_clients > 0);
1155
1156         spin_lock_bh(&obd->obd_processing_task_lock);
1157         /* race with target_queue_last_replay_reply? */
1158         if (exp->exp_replay_needed) {
1159                 spin_lock(&exp->exp_lock);
1160                 exp->exp_delayed = 1;
1161                 spin_unlock(&exp->exp_lock);
1162
1163                 obd->obd_delayed_clients++;
1164                 obd->obd_recoverable_clients--;
1165         }
1166         spin_unlock_bh(&obd->obd_processing_task_lock);
1167
1168         CDEBUG(D_HA, "%s: set client %s as delayed\n",
1169                obd->obd_name, exp->exp_client_uuid.uuid);
1170 }
1171 EXPORT_SYMBOL(class_set_export_delayed);
1172
1173 /*
1174  * Manage exports that have not completed recovery.
1175  */
1176 void class_handle_stale_exports(struct obd_device *obd)
1177 {
1178         struct list_head delay_list, evict_list;
1179         struct obd_export *exp, *n;
1180         int delayed = 0;
1181         ENTRY;
1182
1183         CFS_INIT_LIST_HEAD(&delay_list);
1184         CFS_INIT_LIST_HEAD(&evict_list);
1185         spin_lock(&obd->obd_dev_lock);
1186         list_for_each_entry_safe(exp, n, &obd->obd_exports, exp_obd_chain) {
1187                 LASSERT(!exp->exp_delayed);
1188                 /* clients finished recovery */
1189                 if (!exp->exp_replay_needed)
1190                         continue;
1191                 /* connected non-vbr clients are evicted */
1192                 if (exp->exp_in_recovery && !exp_connect_vbr(exp)) {
1193                         obd->obd_stale_clients++;
1194                         list_move_tail(&exp->exp_obd_chain, &evict_list);
1195                         continue;
1196                 }
1197                 if (obd->obd_version_recov || !exp->exp_in_recovery) {
1198                         list_move_tail(&exp->exp_obd_chain, &delay_list);
1199                         delayed++;
1200                 }
1201         }
1202 #ifndef HAVE_DELAYED_RECOVERY
1203         /* delayed recovery is turned off, evict all delayed exports */
1204         list_splice_init(&delay_list, &evict_list);
1205         list_splice_init(&obd->obd_delayed_exports, &evict_list);
1206         obd->obd_stale_clients += delayed;
1207 #endif
1208         spin_unlock(&obd->obd_dev_lock);
1209
1210         list_for_each_entry_safe(exp, n, &delay_list, exp_obd_chain) {
1211                 class_set_export_delayed(exp);
1212                 exp->exp_last_request_time = cfs_time_current_sec();
1213         }
1214         LASSERT(list_empty(&delay_list));
1215
1216         /* evict clients without VBR support */
1217         class_disconnect_export_list(&evict_list, exp_flags_from_obd(obd));
1218
1219         EXIT;
1220 }
1221 EXPORT_SYMBOL(class_handle_stale_exports);
1222
1223 int oig_init(struct obd_io_group **oig_out)
1224 {
1225         struct obd_io_group *oig;
1226         ENTRY;
1227
1228         OBD_ALLOC(oig, sizeof(*oig));
1229         if (oig == NULL)
1230                 RETURN(-ENOMEM);
1231
1232         spin_lock_init(&oig->oig_lock);
1233         oig->oig_rc = 0;
1234         oig->oig_pending = 0;
1235         atomic_set(&oig->oig_refcount, 1);
1236         cfs_waitq_init(&oig->oig_waitq);
1237         CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1238
1239         *oig_out = oig;
1240         RETURN(0);
1241 };
1242 EXPORT_SYMBOL(oig_init);
1243
1244 static inline void oig_grab(struct obd_io_group *oig)
1245 {
1246         atomic_inc(&oig->oig_refcount);
1247 }
1248
1249 void oig_release(struct obd_io_group *oig)
1250 {
1251         if (atomic_dec_and_test(&oig->oig_refcount))
1252                 OBD_FREE(oig, sizeof(*oig));
1253 }
1254 EXPORT_SYMBOL(oig_release);
1255
1256 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1257 {
1258         int rc = 0;
1259         CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1260         spin_lock(&oig->oig_lock);
1261         if (oig->oig_rc) {
1262                 rc = oig->oig_rc;
1263         } else {
1264                 oig->oig_pending++;
1265                 if (occ != NULL)
1266                         list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1267         }
1268         spin_unlock(&oig->oig_lock);
1269         oig_grab(oig);
1270
1271         return rc;
1272 }
1273 EXPORT_SYMBOL(oig_add_one);
1274
1275 void oig_complete_one(struct obd_io_group *oig,
1276                       struct oig_callback_context *occ, int rc)
1277 {
1278         cfs_waitq_t *wake = NULL;
1279         int old_rc;
1280
1281         spin_lock(&oig->oig_lock);
1282
1283         if (occ != NULL)
1284                 list_del_init(&occ->occ_oig_item);
1285
1286         old_rc = oig->oig_rc;
1287         if (oig->oig_rc == 0 && rc != 0)
1288                 oig->oig_rc = rc;
1289
1290         if (--oig->oig_pending <= 0)
1291                 wake = &oig->oig_waitq;
1292
1293         spin_unlock(&oig->oig_lock);
1294
1295         CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1296                         "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1297                         oig->oig_pending);
1298         if (wake)
1299                 cfs_waitq_signal(wake);
1300         oig_release(oig);
1301 }
1302 EXPORT_SYMBOL(oig_complete_one);
1303
1304 static int oig_done(struct obd_io_group *oig)
1305 {
1306         int rc = 0;
1307         spin_lock(&oig->oig_lock);
1308         if (oig->oig_pending <= 0)
1309                 rc = 1;
1310         spin_unlock(&oig->oig_lock);
1311         return rc;
1312 }
1313
1314 static void interrupted_oig(void *data)
1315 {
1316         struct obd_io_group *oig = data;
1317         struct oig_callback_context *occ;
1318
1319         spin_lock(&oig->oig_lock);
1320         /* We need to restart the processing each time we drop the lock, as
1321          * it is possible other threads called oig_complete_one() to remove
1322          * an entry elsewhere in the list while we dropped lock.  We need to
1323          * drop the lock because osc_ap_completion() calls oig_complete_one()
1324          * which re-gets this lock ;-) as well as a lock ordering issue. */
1325 restart:
1326         list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1327                 if (occ->interrupted)
1328                         continue;
1329                 occ->interrupted = 1;
1330                 spin_unlock(&oig->oig_lock);
1331                 occ->occ_interrupted(occ);
1332                 spin_lock(&oig->oig_lock);
1333                 goto restart;
1334         }
1335         spin_unlock(&oig->oig_lock);
1336 }
1337
1338 int oig_wait(struct obd_io_group *oig)
1339 {
1340         struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1341         int rc;
1342
1343         CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1344
1345         do {
1346                 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1347                 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1348                 /* we can't continue until the oig has emptied and stopped
1349                  * referencing state that the caller will free upon return */
1350                 if (rc == -EINTR)
1351                         lwi = (struct l_wait_info){ 0, };
1352         } while (rc == -EINTR);
1353
1354         LASSERTF(oig->oig_pending == 0,
1355                  "exiting oig_wait(oig = %p) with %d pending\n", oig,
1356                  oig->oig_pending);
1357
1358         CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1359         return oig->oig_rc;
1360 }
1361 EXPORT_SYMBOL(oig_wait);
1362
1363 void class_fail_export(struct obd_export *exp)
1364 {
1365         int rc, already_failed;
1366
1367         spin_lock(&exp->exp_lock);
1368         already_failed = exp->exp_failed;
1369         exp->exp_failed = 1;
1370         spin_unlock(&exp->exp_lock);
1371
1372         if (already_failed) {
1373                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1374                        exp, exp->exp_client_uuid.uuid);
1375                 return;
1376         }
1377
1378         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1379                exp, exp->exp_client_uuid.uuid);
1380
1381         if (obd_dump_on_timeout)
1382                 libcfs_debug_dumplog();
1383
1384         /* Most callers into obd_disconnect are removing their own reference
1385          * (request, for example) in addition to the one from the hash table.
1386          * We don't have such a reference here, so make one. */
1387         class_export_get(exp);
1388         rc = obd_disconnect(exp);
1389         if (rc)
1390                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1391         else
1392                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1393                        exp, exp->exp_client_uuid.uuid);
1394 }
1395 EXPORT_SYMBOL(class_fail_export);
1396
1397 char *obd_export_nid2str(struct obd_export *exp)
1398 {
1399         if (exp->exp_connection != NULL)
1400                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1401
1402         return "(no nid)";
1403 }
1404 EXPORT_SYMBOL(obd_export_nid2str);
1405
1406 int obd_export_evict_by_nid(struct obd_device *obd, char *nid)
1407 {
1408         struct obd_export *doomed_exp = NULL;
1409         int exports_evicted = 0;
1410
1411         lnet_nid_t nid_key = libcfs_str2nid(nid);
1412
1413         do {
1414                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1415
1416                 if (doomed_exp == NULL)
1417                         break;
1418
1419                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1420                          "nid %s found, wanted nid %s, requested nid %s\n",
1421                          obd_export_nid2str(doomed_exp),
1422                          libcfs_nid2str(nid_key), nid);
1423
1424                 exports_evicted++;
1425                 CDEBUG(D_HA, "%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1426                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1427                        exports_evicted);
1428                 class_fail_export(doomed_exp);
1429                 class_export_put(doomed_exp);
1430         } while (1);
1431
1432         if (!exports_evicted)
1433                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1434                        obd->obd_name, nid);
1435         return exports_evicted;
1436 }
1437 EXPORT_SYMBOL(obd_export_evict_by_nid);
1438
1439 int obd_export_evict_by_uuid(struct obd_device *obd, char *uuid)
1440 {
1441         struct obd_export *doomed_exp = NULL;
1442         struct obd_uuid doomed_uuid;
1443         int exports_evicted = 0;
1444
1445         obd_str2uuid(&doomed_uuid, uuid);
1446         if(obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1447                 CERROR("%s: can't evict myself\n", obd->obd_name);
1448                 return exports_evicted;
1449         }
1450
1451         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1452
1453         if (doomed_exp == NULL) {
1454                 CERROR("%s: can't disconnect %s: no exports found\n",
1455                        obd->obd_name, uuid);
1456         } else {
1457                 CWARN("%s: evicting %s at adminstrative request\n",
1458                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1459                 class_fail_export(doomed_exp);
1460                 class_export_put(doomed_exp);
1461                 exports_evicted++;
1462         }
1463
1464         return exports_evicted;
1465 }
1466 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1467
1468 void obd_zombie_impexp_cull(void)
1469 {
1470         struct obd_import *import;
1471         struct obd_export *export;
1472
1473         do {
1474                 spin_lock (&obd_zombie_impexp_lock);
1475
1476                 import = NULL;
1477                 if (!list_empty(&obd_zombie_imports)) {
1478                         import = list_entry(obd_zombie_imports.next,
1479                                             struct obd_import,
1480                                             imp_zombie_chain);
1481                         list_del(&import->imp_zombie_chain);
1482                 }
1483
1484                 export = NULL;
1485                 if (!list_empty(&obd_zombie_exports)) {
1486                         export = list_entry(obd_zombie_exports.next,
1487                                             struct obd_export,
1488                                             exp_obd_chain);
1489                         list_del_init(&export->exp_obd_chain);
1490                 }
1491
1492                 spin_unlock(&obd_zombie_impexp_lock);
1493
1494                 if (import != NULL)
1495                         class_import_destroy(import);
1496
1497                 if (export != NULL)
1498                         class_export_destroy(export);
1499                 cfs_cond_resched();
1500         } while (import != NULL || export != NULL);
1501 }
1502
1503 static struct completion        obd_zombie_start;
1504 static struct completion        obd_zombie_stop;
1505 static unsigned long            obd_zombie_flags;
1506 static cfs_waitq_t              obd_zombie_waitq;
1507 static pid_t                    obd_zombie_pid;
1508
1509 enum {
1510         OBD_ZOMBIE_STOP = 1
1511 };
1512
1513 int obd_zombi_impexp_check(void *arg)
1514 {
1515         int rc;
1516
1517         spin_lock(&obd_zombie_impexp_lock);
1518         rc = list_empty(&obd_zombie_imports) &&
1519              list_empty(&obd_zombie_exports) &&
1520              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1521
1522         spin_unlock(&obd_zombie_impexp_lock);
1523
1524         RETURN(rc);
1525 }
1526
1527 static void obd_zombie_impexp_notify(void)
1528 {
1529         cfs_waitq_signal(&obd_zombie_waitq);
1530 }
1531
1532 /**
1533  * check whether obd_zombie is idle
1534  */
1535 static int obd_zombie_is_idle(void)
1536 {
1537         int rc;
1538
1539         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1540         spin_lock(&obd_zombie_impexp_lock);
1541         rc = list_empty(&obd_zombie_imports) &&
1542              list_empty(&obd_zombie_exports);
1543         spin_unlock(&obd_zombie_impexp_lock);
1544         return rc;
1545 }
1546
1547 /**
1548  * wait when obd_zombie import/export queues become empty
1549  */
1550 void obd_zombie_barrier(void)
1551 {
1552         struct l_wait_info lwi = { 0 };
1553
1554         if (obd_zombie_pid == cfs_curproc_pid())
1555                 /* don't wait for myself */
1556                 return;
1557         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1558 }
1559 EXPORT_SYMBOL(obd_zombie_barrier);
1560
1561 #ifdef __KERNEL__
1562
1563 static int obd_zombie_impexp_thread(void *unused)
1564 {
1565         int rc;
1566
1567         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1568                 complete(&obd_zombie_start);
1569                 RETURN(rc);
1570         }
1571
1572         complete(&obd_zombie_start);
1573
1574         obd_zombie_pid = cfs_curproc_pid();
1575
1576         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1577                 struct l_wait_info lwi = { 0 };
1578
1579                 l_wait_event(obd_zombie_waitq, !obd_zombi_impexp_check(NULL), &lwi);
1580
1581                 obd_zombie_impexp_cull();
1582
1583                 /*
1584                  * Notify obd_zombie_barrier callers that queues
1585                  * may be empty.
1586                  */
1587                 cfs_waitq_signal(&obd_zombie_waitq);
1588         }
1589
1590         complete(&obd_zombie_stop);
1591
1592         RETURN(0);
1593 }
1594
1595 #else /* ! KERNEL */
1596
1597 static atomic_t zombi_recur = ATOMIC_INIT(0);
1598 static void *obd_zombi_impexp_work_cb;
1599 static void *obd_zombi_impexp_idle_cb;
1600
1601 int obd_zombi_impexp_kill(void *arg)
1602 {
1603         int rc = 0;
1604
1605         if (atomic_inc_return(&zombi_recur) == 1) {
1606                 obd_zombie_impexp_cull();
1607                 rc = 1;
1608         }
1609         atomic_dec(&zombi_recur);
1610         return rc;
1611 }
1612
1613 #endif
1614
1615 int obd_zombie_impexp_init(void)
1616 {
1617         int rc;
1618
1619         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1620         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1621         spin_lock_init(&obd_zombie_impexp_lock);
1622         init_completion(&obd_zombie_start);
1623         init_completion(&obd_zombie_stop);
1624         cfs_waitq_init(&obd_zombie_waitq);
1625         obd_zombie_pid = 0;
1626
1627 #ifdef __KERNEL__
1628         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1629         if (rc < 0)
1630                 RETURN(rc);
1631
1632         wait_for_completion(&obd_zombie_start);
1633 #else
1634
1635         obd_zombi_impexp_work_cb =
1636                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1637                                                  &obd_zombi_impexp_kill, NULL);
1638
1639         obd_zombi_impexp_idle_cb =
1640                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1641                                                  &obd_zombi_impexp_check, NULL);
1642         rc = 0;
1643
1644 #endif
1645         RETURN(rc);
1646 }
1647
1648 void obd_zombie_impexp_stop(void)
1649 {
1650         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1651         obd_zombie_impexp_notify();
1652 #ifdef __KERNEL__
1653         wait_for_completion(&obd_zombie_stop);
1654 #else
1655         liblustre_deregister_wait_callback(obd_zombi_impexp_work_cb);
1656         liblustre_deregister_idle_callback(obd_zombi_impexp_idle_cb);
1657 #endif
1658 }