Whamcloud - gitweb
Land b_head_quota onto HEAD (20081116_0105)
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80 EXPORT_SYMBOL(obd_device_alloc);
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         lu_ref_fini(&obd->obd_reference);
93         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 }
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117 #ifdef CONFIG_KMOD
118         if (!type) {
119                 const char *modname = name;
120                 if (!request_module(modname)) {
121                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122                         type = class_search_type(name);
123                 } else {
124                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
125                                            modname);
126                 }
127         }
128 #endif
129         if (type) {
130                 spin_lock(&type->obd_type_lock);
131                 type->typ_refcnt++;
132                 try_module_get(type->typ_dt_ops->o_owner);
133                 spin_unlock(&type->obd_type_lock);
134         }
135         return type;
136 }
137
138 void class_put_type(struct obd_type *type)
139 {
140         LASSERT(type);
141         spin_lock(&type->obd_type_lock);
142         type->typ_refcnt--;
143         module_put(type->typ_dt_ops->o_owner);
144         spin_unlock(&type->obd_type_lock);
145 }
146
147 #define CLASS_MAX_NAME 1024
148
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150                         struct lprocfs_vars *vars, const char *name,
151                         struct lu_device_type *ldt)
152 {
153         struct obd_type *type;
154         int rc = 0;
155         ENTRY;
156
157         /* sanity check */
158         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
159
160         if (class_search_type(name)) {
161                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
162                 RETURN(-EEXIST);
163         }
164
165         rc = -ENOMEM;
166         OBD_ALLOC(type, sizeof(*type));
167         if (type == NULL)
168                 RETURN(rc);
169
170         OBD_ALLOC_PTR(type->typ_dt_ops);
171         OBD_ALLOC_PTR(type->typ_md_ops);
172         OBD_ALLOC(type->typ_name, strlen(name) + 1);
173
174         if (type->typ_dt_ops == NULL ||
175             type->typ_md_ops == NULL ||
176             type->typ_name == NULL)
177                 GOTO (failed, rc);
178
179         *(type->typ_dt_ops) = *dt_ops;
180         /* md_ops is optional */
181         if (md_ops)
182                 *(type->typ_md_ops) = *md_ops;
183         strcpy(type->typ_name, name);
184         spin_lock_init(&type->obd_type_lock);
185
186 #ifdef LPROCFS
187         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
188                                               vars, type);
189         if (IS_ERR(type->typ_procroot)) {
190                 rc = PTR_ERR(type->typ_procroot);
191                 type->typ_procroot = NULL;
192                 GOTO (failed, rc);
193         }
194 #endif
195         if (ldt != NULL) {
196                 type->typ_lu = ldt;
197                 rc = lu_device_type_init(ldt);
198                 if (rc != 0)
199                         GOTO (failed, rc);
200         }
201
202         spin_lock(&obd_types_lock);
203         list_add(&type->typ_chain, &obd_types);
204         spin_unlock(&obd_types_lock);
205
206         RETURN (0);
207
208  failed:
209         if (type->typ_name != NULL)
210                 OBD_FREE(type->typ_name, strlen(name) + 1);
211         if (type->typ_md_ops != NULL)
212                 OBD_FREE_PTR(type->typ_md_ops);
213         if (type->typ_dt_ops != NULL)
214                 OBD_FREE_PTR(type->typ_dt_ops);
215         OBD_FREE(type, sizeof(*type));
216         RETURN(rc);
217 }
218
219 int class_unregister_type(const char *name)
220 {
221         struct obd_type *type = class_search_type(name);
222         ENTRY;
223
224         if (!type) {
225                 CERROR("unknown obd type\n");
226                 RETURN(-EINVAL);
227         }
228
229         if (type->typ_refcnt) {
230                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231                 /* This is a bad situation, let's make the best of it */
232                 /* Remove ops, but leave the name for debugging */
233                 OBD_FREE_PTR(type->typ_dt_ops);
234                 OBD_FREE_PTR(type->typ_md_ops);
235                 RETURN(-EBUSY);
236         }
237
238         if (type->typ_procroot) {
239                 lprocfs_remove(&type->typ_procroot);
240         }
241
242         if (type->typ_lu)
243                 lu_device_type_fini(type->typ_lu);
244
245         spin_lock(&obd_types_lock);
246         list_del(&type->typ_chain);
247         spin_unlock(&obd_types_lock);
248         OBD_FREE(type->typ_name, strlen(name) + 1);
249         if (type->typ_dt_ops != NULL)
250                 OBD_FREE_PTR(type->typ_dt_ops);
251         if (type->typ_md_ops != NULL)
252                 OBD_FREE_PTR(type->typ_md_ops);
253         OBD_FREE(type, sizeof(*type));
254         RETURN(0);
255 } /* class_unregister_type */
256
257 /**
258  * Create a new obd device.
259  *
260  * Find an empty slot in ::obd_devs[], create a new obd device in it.
261  *
262  * \param typename [in] obd device type string.
263  * \param name     [in] obd device name.
264  *
265  * \retval NULL if create fails, otherwise return the obd device
266  *         pointer created.
267  */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
269 {
270         struct obd_device *result = NULL;
271         struct obd_device *newdev;
272         struct obd_type *type = NULL;
273         int i;
274         int new_obd_minor = 0;
275
276         if (strlen(name) >= MAX_OBD_NAME) {
277                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278                 RETURN(ERR_PTR(-EINVAL));
279         }
280
281         type = class_get_type(type_name);
282         if (type == NULL){
283                 CERROR("OBD: unknown type: %s\n", type_name);
284                 RETURN(ERR_PTR(-ENODEV));
285         }
286
287         newdev = obd_device_alloc();
288         if (newdev == NULL) {
289                 class_put_type(type);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
293
294         spin_lock(&obd_dev_lock);
295         for (i = 0; i < class_devno_max(); i++) {
296                 struct obd_device *obd = class_num2obd(i);
297                 if (obd && obd->obd_name &&
298                     (strcmp(name, obd->obd_name) == 0)) {
299                         CERROR("Device %s already exists, won't add\n", name);
300                         if (result) {
301                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302                                          "%p obd_magic %08x != %08x\n", result,
303                                          result->obd_magic, OBD_DEVICE_MAGIC);
304                                 LASSERTF(result->obd_minor == new_obd_minor,
305                                          "%p obd_minor %d != %d\n", result,
306                                          result->obd_minor, new_obd_minor);
307
308                                 obd_devs[result->obd_minor] = NULL;
309                                 result->obd_name[0]='\0';
310                          }
311                         result = ERR_PTR(-EEXIST);
312                         break;
313                 }
314                 if (!result && !obd) {
315                         result = newdev;
316                         result->obd_minor = i;
317                         new_obd_minor = i;
318                         result->obd_type = type;
319                         strncpy(result->obd_name, name,
320                                 sizeof(result->obd_name) - 1);
321                         obd_devs[i] = result;
322                 }
323         }
324         spin_unlock(&obd_dev_lock);
325
326         if (result == NULL && i >= class_devno_max()) {
327                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
328                        class_devno_max());
329                 result = ERR_PTR(-EOVERFLOW);
330         }
331
332         if (IS_ERR(result)) {
333                 obd_device_free(newdev);
334                 class_put_type(type);
335         } else {
336                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337                        result->obd_name, result);
338         }
339         return result;
340 }
341
342 void class_release_dev(struct obd_device *obd)
343 {
344         struct obd_type *obd_type = obd->obd_type;
345
346         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350         LASSERT(obd_type != NULL);
351
352         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353                obd->obd_name,obd->obd_type->typ_name);
354
355         spin_lock(&obd_dev_lock);
356         obd_devs[obd->obd_minor] = NULL;
357         spin_unlock(&obd_dev_lock);
358         obd_device_free(obd);
359
360         class_put_type(obd_type);
361 }
362
363 int class_name2dev(const char *name)
364 {
365         int i;
366
367         if (!name)
368                 return -1;
369
370         spin_lock(&obd_dev_lock);
371         for (i = 0; i < class_devno_max(); i++) {
372                 struct obd_device *obd = class_num2obd(i);
373                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374                         /* Make sure we finished attaching before we give
375                            out any references */
376                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377                         if (obd->obd_attached) {
378                                 spin_unlock(&obd_dev_lock);
379                                 return i;
380                         }
381                         break;
382                 }
383         }
384         spin_unlock(&obd_dev_lock);
385
386         return -1;
387 }
388
389 struct obd_device *class_name2obd(const char *name)
390 {
391         int dev = class_name2dev(name);
392
393         if (dev < 0 || dev > class_devno_max())
394                 return NULL;
395         return class_num2obd(dev);
396 }
397
398 int class_uuid2dev(struct obd_uuid *uuid)
399 {
400         int i;
401
402         spin_lock(&obd_dev_lock);
403         for (i = 0; i < class_devno_max(); i++) {
404                 struct obd_device *obd = class_num2obd(i);
405                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407                         spin_unlock(&obd_dev_lock);
408                         return i;
409                 }
410         }
411         spin_unlock(&obd_dev_lock);
412
413         return -1;
414 }
415
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
417 {
418         int dev = class_uuid2dev(uuid);
419         if (dev < 0)
420                 return NULL;
421         return class_num2obd(dev);
422 }
423
424 /**
425  * Get obd device from ::obd_devs[]
426  *
427  * \param num [in] array index
428  *
429  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430  *         otherwise return the obd device there.
431  */
432 struct obd_device *class_num2obd(int num)
433 {
434         struct obd_device *obd = NULL;
435
436         if (num < class_devno_max()) {
437                 obd = obd_devs[num];
438                 if (obd == NULL)
439                         return NULL;
440
441                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442                          "%p obd_magic %08x != %08x\n",
443                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444                 LASSERTF(obd->obd_minor == num,
445                          "%p obd_minor %0d != %0d\n",
446                          obd, obd->obd_minor, num);
447         }
448
449         return obd;
450 }
451
452 void class_obd_list(void)
453 {
454         char *status;
455         int i;
456
457         spin_lock(&obd_dev_lock);
458         for (i = 0; i < class_devno_max(); i++) {
459                 struct obd_device *obd = class_num2obd(i);
460                 if (obd == NULL)
461                         continue;
462                 if (obd->obd_stopping)
463                         status = "ST";
464                 else if (obd->obd_set_up)
465                         status = "UP";
466                 else if (obd->obd_attached)
467                         status = "AT";
468                 else
469                         status = "--";
470                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471                          i, status, obd->obd_type->typ_name,
472                          obd->obd_name, obd->obd_uuid.uuid,
473                          atomic_read(&obd->obd_refcount));
474         }
475         spin_unlock(&obd_dev_lock);
476         return;
477 }
478
479 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
480    specified, then only the client with that uuid is returned,
481    otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483                                           const char * typ_name,
484                                           struct obd_uuid *grp_uuid)
485 {
486         int i;
487
488         spin_lock(&obd_dev_lock);
489         for (i = 0; i < class_devno_max(); i++) {
490                 struct obd_device *obd = class_num2obd(i);
491                 if (obd == NULL)
492                         continue;
493                 if ((strncmp(obd->obd_type->typ_name, typ_name,
494                              strlen(typ_name)) == 0)) {
495                         if (obd_uuid_equals(tgt_uuid,
496                                             &obd->u.cli.cl_target_uuid) &&
497                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
498                                                          &obd->obd_uuid) : 1)) {
499                                 spin_unlock(&obd_dev_lock);
500                                 return obd;
501                         }
502                 }
503         }
504         spin_unlock(&obd_dev_lock);
505
506         return NULL;
507 }
508
509 /* Iterate the obd_device list looking devices have grp_uuid. Start
510    searching at *next, and if a device is found, the next index to look
511    at is saved in *next. If next is NULL, then the first matching device
512    will always be returned. */
513 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
514 {
515         int i;
516
517         if (next == NULL)
518                 i = 0;
519         else if (*next >= 0 && *next < class_devno_max())
520                 i = *next;
521         else
522                 return NULL;
523
524         spin_lock(&obd_dev_lock);
525         for (; i < class_devno_max(); i++) {
526                 struct obd_device *obd = class_num2obd(i);
527                 if (obd == NULL)
528                         continue;
529                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
530                         if (next != NULL)
531                                 *next = i+1;
532                         spin_unlock(&obd_dev_lock);
533                         return obd;
534                 }
535         }
536         spin_unlock(&obd_dev_lock);
537
538         return NULL;
539 }
540
541
542 void obd_cleanup_caches(void)
543 {
544         int rc;
545
546         ENTRY;
547         if (obd_device_cachep) {
548                 rc = cfs_mem_cache_destroy(obd_device_cachep);
549                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
550                 obd_device_cachep = NULL;
551         }
552         if (obdo_cachep) {
553                 rc = cfs_mem_cache_destroy(obdo_cachep);
554                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
555                 obdo_cachep = NULL;
556         }
557         if (import_cachep) {
558                 rc = cfs_mem_cache_destroy(import_cachep);
559                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
560                 import_cachep = NULL;
561         }
562         if (capa_cachep) {
563                 rc = cfs_mem_cache_destroy(capa_cachep);
564                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
565                 capa_cachep = NULL;
566         }
567         EXIT;
568 }
569
570 int obd_init_caches(void)
571 {
572         ENTRY;
573
574         LASSERT(obd_device_cachep == NULL);
575         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
576                                                  sizeof(struct obd_device),
577                                                  0, 0);
578         if (!obd_device_cachep)
579                 GOTO(out, -ENOMEM);
580
581         LASSERT(obdo_cachep == NULL);
582         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
583                                            0, 0);
584         if (!obdo_cachep)
585                 GOTO(out, -ENOMEM);
586
587         LASSERT(import_cachep == NULL);
588         import_cachep = cfs_mem_cache_create("ll_import_cache",
589                                              sizeof(struct obd_import),
590                                              0, 0);
591         if (!import_cachep)
592                 GOTO(out, -ENOMEM);
593
594         LASSERT(capa_cachep == NULL);
595         capa_cachep = cfs_mem_cache_create("capa_cache",
596                                            sizeof(struct obd_capa), 0, 0);
597         if (!capa_cachep)
598                 GOTO(out, -ENOMEM);
599
600         RETURN(0);
601  out:
602         obd_cleanup_caches();
603         RETURN(-ENOMEM);
604
605 }
606
607 /* map connection to client */
608 struct obd_export *class_conn2export(struct lustre_handle *conn)
609 {
610         struct obd_export *export;
611         ENTRY;
612
613         if (!conn) {
614                 CDEBUG(D_CACHE, "looking for null handle\n");
615                 RETURN(NULL);
616         }
617
618         if (conn->cookie == -1) {  /* this means assign a new connection */
619                 CDEBUG(D_CACHE, "want a new connection\n");
620                 RETURN(NULL);
621         }
622
623         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
624         export = class_handle2object(conn->cookie);
625         RETURN(export);
626 }
627
628 struct obd_device *class_exp2obd(struct obd_export *exp)
629 {
630         if (exp)
631                 return exp->exp_obd;
632         return NULL;
633 }
634
635 struct obd_device *class_conn2obd(struct lustre_handle *conn)
636 {
637         struct obd_export *export;
638         export = class_conn2export(conn);
639         if (export) {
640                 struct obd_device *obd = export->exp_obd;
641                 class_export_put(export);
642                 return obd;
643         }
644         return NULL;
645 }
646
647 struct obd_import *class_exp2cliimp(struct obd_export *exp)
648 {
649         struct obd_device *obd = exp->exp_obd;
650         if (obd == NULL)
651                 return NULL;
652         return obd->u.cli.cl_import;
653 }
654
655 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
656 {
657         struct obd_device *obd = class_conn2obd(conn);
658         if (obd == NULL)
659                 return NULL;
660         return obd->u.cli.cl_import;
661 }
662
/* Export management functions */

/* Addref callback registered with class_handle_hash() for exports: takes
 * one reference on the export passed as an untyped pointer. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
668
669 struct obd_export *class_export_get(struct obd_export *exp)
670 {
671         atomic_inc(&exp->exp_refcount);
672         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
673                atomic_read(&exp->exp_refcount));
674         return exp;
675 }
676 EXPORT_SYMBOL(class_export_get);
677
678 void class_export_put(struct obd_export *exp)
679 {
680         LASSERT(exp != NULL);
681         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
682                atomic_read(&exp->exp_refcount) - 1);
683         LASSERT(atomic_read(&exp->exp_refcount) > 0);
684         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
685
686         if (atomic_dec_and_test(&exp->exp_refcount)) {
687                 LASSERT (list_empty(&exp->exp_obd_chain));
688
689                 CDEBUG(D_IOCTL, "final put %p/%s\n",
690                        exp, exp->exp_client_uuid.uuid);
691
692                 spin_lock(&obd_zombie_impexp_lock);
693                 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
694                 spin_unlock(&obd_zombie_impexp_lock);
695
696                 if (obd_zombie_impexp_notify != NULL)
697                         obd_zombie_impexp_notify();
698         }
699 }
700 EXPORT_SYMBOL(class_export_put);
701
702 static void class_export_destroy(struct obd_export *exp)
703 {
704         struct obd_device *obd = exp->exp_obd;
705         ENTRY;
706
707         LASSERT (atomic_read(&exp->exp_refcount) == 0);
708
709         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
710                exp->exp_client_uuid.uuid, obd->obd_name);
711
712         LASSERT(obd != NULL);
713
714         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
715         if (exp->exp_connection)
716                 ptlrpc_put_connection_superhack(exp->exp_connection);
717
718         LASSERT(list_empty(&exp->exp_outstanding_replies));
719         LASSERT(list_empty(&exp->exp_req_replay_queue));
720         obd_destroy_export(exp);
721         class_decref(obd, "export", exp);
722
723         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
724         EXIT;
725 }
726
727 /* Creates a new export, adds it to the hash table, and returns a
728  * pointer to it. The refcount is 2: one for the hash reference, and
729  * one for the pointer returned by this function. */
730 struct obd_export *class_new_export(struct obd_device *obd,
731                                     struct obd_uuid *cluuid)
732 {
733         struct obd_export *export;
734         int rc = 0;
735
736         OBD_ALLOC_PTR(export);
737         if (!export)
738                 return ERR_PTR(-ENOMEM);
739
740         export->exp_conn_cnt = 0;
741         export->exp_lock_hash = NULL;
742         atomic_set(&export->exp_refcount, 2);
743         atomic_set(&export->exp_rpc_count, 0);
744         export->exp_obd = obd;
745         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
746         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
747         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
748         class_handle_hash(&export->exp_handle, export_handle_addref);
749         export->exp_last_request_time = cfs_time_current_sec();
750         spin_lock_init(&export->exp_lock);
751         INIT_HLIST_NODE(&export->exp_uuid_hash);
752         INIT_HLIST_NODE(&export->exp_nid_hash);
753
754         export->exp_sp_peer = LUSTRE_SP_ANY;
755         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
756         export->exp_client_uuid = *cluuid;
757         obd_init_export(export);
758
759         spin_lock(&obd->obd_dev_lock);
760         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
761                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
762                                             &export->exp_uuid_hash);
763                 if (rc != 0) {
764                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
765                                       obd->obd_name, cluuid->uuid, rc);
766                         spin_unlock(&obd->obd_dev_lock);
767                         class_handle_unhash(&export->exp_handle);
768                         OBD_FREE_PTR(export);
769                         return ERR_PTR(-EALREADY);
770                 }
771         }
772
773         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
774         class_incref(obd, "export", export);
775         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
776         list_add_tail(&export->exp_obd_chain_timed,
777                       &export->exp_obd->obd_exports_timed);
778         export->exp_obd->obd_num_exports++;
779         spin_unlock(&obd->obd_dev_lock);
780
781         return export;
782 }
783 EXPORT_SYMBOL(class_new_export);
784
785 void class_unlink_export(struct obd_export *exp)
786 {
787         class_handle_unhash(&exp->exp_handle);
788
789         spin_lock(&exp->exp_obd->obd_dev_lock);
790         /* delete an uuid-export hashitem from hashtables */
791         if (!hlist_unhashed(&exp->exp_uuid_hash))
792                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
793                                 &exp->exp_client_uuid,
794                                 &exp->exp_uuid_hash);
795
796         list_del_init(&exp->exp_obd_chain);
797         list_del_init(&exp->exp_obd_chain_timed);
798         exp->exp_obd->obd_num_exports--;
799         spin_unlock(&exp->exp_obd->obd_dev_lock);
800
801         class_export_put(exp);
802 }
803 EXPORT_SYMBOL(class_unlink_export);
804
/* Import management functions */

/* Addref callback registered with class_handle_hash() for imports: takes
 * one reference on the import passed as an untyped pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
810
811 struct obd_import *class_import_get(struct obd_import *import)
812 {
813         LASSERT(atomic_read(&import->imp_refcount) >= 0);
814         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
815         atomic_inc(&import->imp_refcount);
816         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
817                atomic_read(&import->imp_refcount));
818         return import;
819 }
820 EXPORT_SYMBOL(class_import_get);
821
822 void class_import_put(struct obd_import *import)
823 {
824         ENTRY;
825
826         LASSERT(atomic_read(&import->imp_refcount) > 0);
827         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
828         LASSERT(list_empty(&import->imp_zombie_chain));
829
830         CDEBUG(D_INFO, "import %p refcount=%d\n", import,
831                atomic_read(&import->imp_refcount) - 1);
832
833         if (atomic_dec_and_test(&import->imp_refcount)) {
834
835                 CDEBUG(D_INFO, "final put import %p\n", import);
836
837                 spin_lock(&obd_zombie_impexp_lock);
838                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
839                 spin_unlock(&obd_zombie_impexp_lock);
840
841                 if (obd_zombie_impexp_notify != NULL)
842                         obd_zombie_impexp_notify();
843         }
844
845         EXIT;
846 }
847 EXPORT_SYMBOL(class_import_put);
848
849 void class_import_destroy(struct obd_import *import)
850 {
851         ENTRY;
852
853         CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
854                 import->imp_obd->obd_name);
855
856         LASSERT(atomic_read(&import->imp_refcount) == 0);
857
858         ptlrpc_put_connection_superhack(import->imp_connection);
859
860         while (!list_empty(&import->imp_conn_list)) {
861                 struct obd_import_conn *imp_conn;
862
863                 imp_conn = list_entry(import->imp_conn_list.next,
864                                       struct obd_import_conn, oic_item);
865                 list_del(&imp_conn->oic_item);
866                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
867                 OBD_FREE(imp_conn, sizeof(*imp_conn));
868         }
869
870         LASSERT(import->imp_sec == NULL);
871         class_decref(import->imp_obd, "import", import);
872         OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
873         EXIT;
874 }
875
876 static void init_imp_at(struct imp_at *at) {
877         int i;
878         at_init(&at->iat_net_latency, 0, 0);
879         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
880                 /* max service estimates are tracked on the server side, so
881                    don't use the AT history here, just use the last reported
882                    val. (But keep hist for proc histogram, worst_ever) */
883                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
884                         AT_FLG_NOHIST);
885         }
886 }
887
888 struct obd_import *class_new_import(struct obd_device *obd)
889 {
890         struct obd_import *imp;
891
892         OBD_ALLOC(imp, sizeof(*imp));
893         if (imp == NULL)
894                 return NULL;
895
896         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
897         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
898         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
899         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
900         spin_lock_init(&imp->imp_lock);
901         imp->imp_last_success_conn = 0;
902         imp->imp_state = LUSTRE_IMP_NEW;
903         imp->imp_obd = class_incref(obd, "import", imp);
904         sema_init(&imp->imp_sec_mutex, 1);
905         cfs_waitq_init(&imp->imp_recovery_waitq);
906
907         atomic_set(&imp->imp_refcount, 2);
908         atomic_set(&imp->imp_unregistering, 0);
909         atomic_set(&imp->imp_inflight, 0);
910         atomic_set(&imp->imp_replay_inflight, 0);
911         atomic_set(&imp->imp_inval_count, 0);
912         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
913         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
914         class_handle_hash(&imp->imp_handle, import_handle_addref);
915         init_imp_at(&imp->imp_at);
916
917         /* the default magic is V2, will be used in connect RPC, and
918          * then adjusted according to the flags in request/reply. */
919         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
920
921         return imp;
922 }
923 EXPORT_SYMBOL(class_new_import);
924
925 void class_destroy_import(struct obd_import *import)
926 {
927         LASSERT(import != NULL);
928         LASSERT(import != LP_POISON);
929
930         class_handle_unhash(&import->imp_handle);
931
932         spin_lock(&import->imp_lock);
933         import->imp_generation++;
934         spin_unlock(&import->imp_lock);
935         class_import_put(import);
936 }
937 EXPORT_SYMBOL(class_destroy_import);
938
939 /* A connection defines an export context in which preallocation can
940    be managed. This releases the export pointer reference, and returns
941    the export handle, so the export refcount is 1 when this function
942    returns. */
943 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
944                   struct obd_uuid *cluuid)
945 {
946         struct obd_export *export;
947         LASSERT(conn != NULL);
948         LASSERT(obd != NULL);
949         LASSERT(cluuid != NULL);
950         ENTRY;
951
952         export = class_new_export(obd, cluuid);
953         if (IS_ERR(export))
954                 RETURN(PTR_ERR(export));
955
956         conn->cookie = export->exp_handle.h_cookie;
957         class_export_put(export);
958
959         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
960                cluuid->uuid, conn->cookie);
961         RETURN(0);
962 }
963 EXPORT_SYMBOL(class_connect);
964
965 /* if export is involved in recovery then clean up related things */
966 void class_export_recovery_cleanup(struct obd_export *exp)
967 {
968         struct obd_device *obd = exp->exp_obd;
969
970         spin_lock_bh(&obd->obd_processing_task_lock);
971         if (obd->obd_recovering && exp->exp_in_recovery) {
972                 spin_lock(&exp->exp_lock);
973                 exp->exp_in_recovery = 0;
974                 spin_unlock(&exp->exp_lock);
975                 obd->obd_connected_clients--;
976                 /* each connected client is counted as recoverable */
977                 obd->obd_recoverable_clients--;
978                 if (exp->exp_req_replay_needed) {
979                         spin_lock(&exp->exp_lock);
980                         exp->exp_req_replay_needed = 0;
981                         spin_unlock(&exp->exp_lock);
982                         LASSERT(atomic_read(&obd->obd_req_replay_clients));
983                         atomic_dec(&obd->obd_req_replay_clients);
984                 }
985                 if (exp->exp_lock_replay_needed) {
986                         spin_lock(&exp->exp_lock);
987                         exp->exp_lock_replay_needed = 0;
988                         spin_unlock(&exp->exp_lock);
989                         LASSERT(atomic_read(&obd->obd_lock_replay_clients));
990                         atomic_dec(&obd->obd_lock_replay_clients);
991                 }
992         }
993         spin_unlock_bh(&obd->obd_processing_task_lock);
994 }
995
996 /* This function removes two references from the export: one for the
997  * hash entry and one for the export pointer passed in.  The export
998  * pointer passed to this function is destroyed should not be used
999  * again. */
1000 int class_disconnect(struct obd_export *export)
1001 {
1002         int already_disconnected;
1003         ENTRY;
1004
1005         if (export == NULL) {
1006                 fixme();
1007                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1008                 RETURN(-EINVAL);
1009         }
1010
1011         spin_lock(&export->exp_lock);
1012         already_disconnected = export->exp_disconnected;
1013         export->exp_disconnected = 1;
1014
1015         if (!hlist_unhashed(&export->exp_nid_hash))
1016                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1017                                 &export->exp_connection->c_peer.nid,
1018                                 &export->exp_nid_hash);
1019
1020         spin_unlock(&export->exp_lock);
1021
1022         /* class_cleanup(), abort_recovery(), and class_fail_export()
1023          * all end up in here, and if any of them race we shouldn't
1024          * call extra class_export_puts(). */
1025         if (already_disconnected)
1026                 RETURN(0);
1027
1028         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1029                export->exp_handle.h_cookie);
1030
1031         class_export_recovery_cleanup(export);
1032         class_unlink_export(export);
1033         class_export_put(export);
1034         RETURN(0);
1035 }
1036
1037 static void class_disconnect_export_list(struct list_head *list, int flags)
1038 {
1039         int rc;
1040         struct lustre_handle fake_conn;
1041         struct obd_export *fake_exp, *exp;
1042         ENTRY;
1043
1044         /* It's possible that an export may disconnect itself, but
1045          * nothing else will be added to this list. */
1046         while (!list_empty(list)) {
1047                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1048                 class_export_get(exp);
1049
1050                 spin_lock(&exp->exp_lock);
1051                 exp->exp_flags = flags;
1052                 spin_unlock(&exp->exp_lock);
1053
1054                 if (obd_uuid_equals(&exp->exp_client_uuid,
1055                                     &exp->exp_obd->obd_uuid)) {
1056                         CDEBUG(D_HA,
1057                                "exp %p export uuid == obd uuid, don't discon\n",
1058                                exp);
1059                         /* Need to delete this now so we don't end up pointing
1060                          * to work_list later when this export is cleaned up. */
1061                         list_del_init(&exp->exp_obd_chain);
1062                         class_export_put(exp);
1063                         continue;
1064                 }
1065
1066                 fake_conn.cookie = exp->exp_handle.h_cookie;
1067                 fake_exp = class_conn2export(&fake_conn);
1068                 if (!fake_exp) {
1069                         class_export_put(exp);
1070                         continue;
1071                 }
1072
1073                 spin_lock(&fake_exp->exp_lock);
1074                 fake_exp->exp_flags = flags;
1075                 spin_unlock(&fake_exp->exp_lock);
1076
1077                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1078                        "last request at "CFS_TIME_T"\n",
1079                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1080                        exp, exp->exp_last_request_time);
1081                 rc = obd_disconnect(fake_exp);
1082                 class_export_put(exp);
1083         }
1084         EXIT;
1085 }
1086
1087 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1088 {
1089         return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1090                 (obd->obd_force ? OBD_OPT_FORCE : 0));
1091 }
1092
1093 void class_disconnect_exports(struct obd_device *obd)
1094 {
1095         struct list_head work_list;
1096         ENTRY;
1097
1098         /* Move all of the exports from obd_exports to a work list, en masse. */
1099         spin_lock(&obd->obd_dev_lock);
1100         list_add(&work_list, &obd->obd_exports);
1101         list_del_init(&obd->obd_exports);
1102         spin_unlock(&obd->obd_dev_lock);
1103
1104         if (!list_empty(&work_list)) {
1105                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1106                        "disconnecting them\n", obd->obd_minor, obd);
1107                 class_disconnect_export_list(&work_list,
1108                                              get_exp_flags_from_obd(obd));
1109         } else
1110                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1111                        obd->obd_minor, obd);
1112         EXIT;
1113 }
1114 EXPORT_SYMBOL(class_disconnect_exports);
1115
1116 /* Remove exports that have not completed recovery.
1117  */
1118 int class_disconnect_stale_exports(struct obd_device *obd,
1119                                    int (*test_export)(struct obd_export *))
1120 {
1121         struct list_head work_list;
1122         struct list_head *pos, *n;
1123         struct obd_export *exp;
1124         int cnt = 0;
1125         ENTRY;
1126
1127         CFS_INIT_LIST_HEAD(&work_list);
1128         spin_lock(&obd->obd_dev_lock);
1129         list_for_each_safe(pos, n, &obd->obd_exports) {
1130                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1131                 if (test_export(exp))
1132                         continue;
1133
1134                 list_del(&exp->exp_obd_chain);
1135                 list_add(&exp->exp_obd_chain, &work_list);
1136                 /* don't count self-export as client */
1137                 if (obd_uuid_equals(&exp->exp_client_uuid,
1138                                      &exp->exp_obd->obd_uuid))
1139                         continue;
1140
1141                 cnt++;
1142                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1143                        obd->obd_name, exp->exp_client_uuid.uuid,
1144                        exp->exp_connection == NULL ? "<unknown>" :
1145                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1146         }
1147         spin_unlock(&obd->obd_dev_lock);
1148
1149         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1150                obd->obd_name, cnt);
1151         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1152         RETURN(cnt);
1153 }
1154 EXPORT_SYMBOL(class_disconnect_stale_exports);
1155
1156 void class_fail_export(struct obd_export *exp)
1157 {
1158         int rc, already_failed;
1159
1160         spin_lock(&exp->exp_lock);
1161         already_failed = exp->exp_failed;
1162         exp->exp_failed = 1;
1163         spin_unlock(&exp->exp_lock);
1164
1165         if (already_failed) {
1166                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1167                        exp, exp->exp_client_uuid.uuid);
1168                 return;
1169         }
1170
1171         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1172                exp, exp->exp_client_uuid.uuid);
1173
1174         if (obd_dump_on_timeout)
1175                 libcfs_debug_dumplog();
1176
1177         /* Most callers into obd_disconnect are removing their own reference
1178          * (request, for example) in addition to the one from the hash table.
1179          * We don't have such a reference here, so make one. */
1180         class_export_get(exp);
1181         rc = obd_disconnect(exp);
1182         if (rc)
1183                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1184         else
1185                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1186                        exp, exp->exp_client_uuid.uuid);
1187 }
1188 EXPORT_SYMBOL(class_fail_export);
1189
1190 char *obd_export_nid2str(struct obd_export *exp)
1191 {
1192         if (exp->exp_connection != NULL)
1193                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1194
1195         return "(no nid)";
1196 }
1197 EXPORT_SYMBOL(obd_export_nid2str);
1198
1199 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1200 {
1201         struct obd_export *doomed_exp = NULL;
1202         int exports_evicted = 0;
1203
1204         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1205
1206         do {
1207                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1208                 if (doomed_exp == NULL)
1209                         break;
1210
1211                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1212                          "nid %s found, wanted nid %s, requested nid %s\n",
1213                          obd_export_nid2str(doomed_exp),
1214                          libcfs_nid2str(nid_key), nid);
1215                 LASSERTF(doomed_exp != obd->obd_self_export,
1216                          "self-export is hashed by NID?\n");
1217                 exports_evicted++;
1218                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1219                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1220                        exports_evicted);
1221                 class_fail_export(doomed_exp);
1222                 class_export_put(doomed_exp);
1223         } while (1);
1224
1225         if (!exports_evicted)
1226                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1227                        obd->obd_name, nid);
1228         return exports_evicted;
1229 }
1230 EXPORT_SYMBOL(obd_export_evict_by_nid);
1231
1232 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1233 {
1234         struct obd_export *doomed_exp = NULL;
1235         struct obd_uuid doomed_uuid;
1236         int exports_evicted = 0;
1237
1238         obd_str2uuid(&doomed_uuid, uuid);
1239         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1240                 CERROR("%s: can't evict myself\n", obd->obd_name);
1241                 return exports_evicted;
1242         }
1243
1244         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1245
1246         if (doomed_exp == NULL) {
1247                 CERROR("%s: can't disconnect %s: no exports found\n",
1248                        obd->obd_name, uuid);
1249         } else {
1250                 CWARN("%s: evicting %s at adminstrative request\n",
1251                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1252                 class_fail_export(doomed_exp);
1253                 class_export_put(doomed_exp);
1254                 exports_evicted++;
1255         }
1256
1257         return exports_evicted;
1258 }
1259 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1260
1261 /**
1262  * kill zombie imports and exports
1263  */
1264 void obd_zombie_impexp_cull(void)
1265 {
1266         struct obd_import *import;
1267         struct obd_export *export;
1268         ENTRY;
1269
1270         do {
1271                 spin_lock (&obd_zombie_impexp_lock);
1272
1273                 import = NULL;
1274                 if (!list_empty(&obd_zombie_imports)) {
1275                         import = list_entry(obd_zombie_imports.next,
1276                                             struct obd_import,
1277                                             imp_zombie_chain);
1278                         list_del(&import->imp_zombie_chain);
1279                 }
1280
1281                 export = NULL;
1282                 if (!list_empty(&obd_zombie_exports)) {
1283                         export = list_entry(obd_zombie_exports.next,
1284                                             struct obd_export,
1285                                             exp_obd_chain);
1286                         list_del_init(&export->exp_obd_chain);
1287                 }
1288
1289                 spin_unlock(&obd_zombie_impexp_lock);
1290
1291                 if (import != NULL)
1292                         class_import_destroy(import);
1293
1294                 if (export != NULL)
1295                         class_export_destroy(export);
1296
1297         } while (import != NULL || export != NULL);
1298         EXIT;
1299 }
1300
1301 static struct completion        obd_zombie_start;
1302 static struct completion        obd_zombie_stop;
1303 static unsigned long            obd_zombie_flags;
1304 static cfs_waitq_t              obd_zombie_waitq;
1305
1306 enum {
1307         OBD_ZOMBIE_STOP = 1
1308 };
1309
1310 /**
1311  * check for work for kill zombie import/export thread.
1312  */
1313 static int obd_zombie_impexp_check(void *arg)
1314 {
1315         int rc;
1316
1317         spin_lock(&obd_zombie_impexp_lock);
1318         rc = list_empty(&obd_zombie_imports) &&
1319              list_empty(&obd_zombie_exports) &&
1320              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1321
1322         spin_unlock(&obd_zombie_impexp_lock);
1323
1324         RETURN(rc);
1325 }
1326
1327 /**
1328  * notify import/export destroy thread about new zombie.
1329  */
1330 static void obd_zombie_impexp_notify(void)
1331 {
1332         cfs_waitq_signal(&obd_zombie_waitq);
1333 }
1334
1335 /**
1336  * check whether obd_zombie is idle
1337  */
1338 static int obd_zombie_is_idle(void)
1339 {
1340         int rc;
1341
1342         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1343         spin_lock(&obd_zombie_impexp_lock);
1344         rc = list_empty(&obd_zombie_imports) &&
1345              list_empty(&obd_zombie_exports);
1346         spin_unlock(&obd_zombie_impexp_lock);
1347         return rc;
1348 }
1349
1350 /**
1351  * wait when obd_zombie import/export queues become empty
1352  */
1353 void obd_zombie_barrier(void)
1354 {
1355         struct l_wait_info lwi = { 0 };
1356
1357         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1358 }
1359 EXPORT_SYMBOL(obd_zombie_barrier);
1360
1361 #ifdef __KERNEL__
1362
1363 /**
1364  * destroy zombie export/import thread.
1365  */
1366 static int obd_zombie_impexp_thread(void *unused)
1367 {
1368         int rc;
1369
1370         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1371                 complete(&obd_zombie_start);
1372                 RETURN(rc);
1373         }
1374
1375         complete(&obd_zombie_start);
1376
1377         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1378                 struct l_wait_info lwi = { 0 };
1379
1380                 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1381
1382                 obd_zombie_impexp_cull();
1383                 /* Notify obd_zombie_barrier callers that queues may be empty */
1384                 cfs_waitq_signal(&obd_zombie_waitq);
1385         }
1386
1387         complete(&obd_zombie_stop);
1388
1389         RETURN(0);
1390 }
1391
1392 #else /* ! KERNEL */
1393
1394 static atomic_t zombie_recur = ATOMIC_INIT(0);
1395 static void *obd_zombie_impexp_work_cb;
1396 static void *obd_zombie_impexp_idle_cb;
1397
1398 int obd_zombie_impexp_kill(void *arg)
1399 {
1400         int rc = 0;
1401
1402         if (atomic_inc_return(&zombie_recur) == 1) {
1403                 obd_zombie_impexp_cull();
1404                 rc = 1;
1405         }
1406         atomic_dec(&zombie_recur);
1407         return rc;
1408 }
1409
1410 #endif
1411
1412 /**
1413  * start destroy zombie import/export thread
1414  */
1415 int obd_zombie_impexp_init(void)
1416 {
1417         int rc;
1418
1419         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1420         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1421         spin_lock_init(&obd_zombie_impexp_lock);
1422         init_completion(&obd_zombie_start);
1423         init_completion(&obd_zombie_stop);
1424         cfs_waitq_init(&obd_zombie_waitq);
1425
1426 #ifdef __KERNEL__
1427         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1428         if (rc < 0)
1429                 RETURN(rc);
1430
1431         wait_for_completion(&obd_zombie_start);
1432 #else
1433
1434         obd_zombie_impexp_work_cb =
1435                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1436                                                  &obd_zombie_impexp_kill, NULL);
1437
1438         obd_zombie_impexp_idle_cb =
1439                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1440                                                  &obd_zombie_impexp_check, NULL);
1441         rc = 0;
1442
1443 #endif
1444         RETURN(rc);
1445 }
1446 /**
1447  * stop destroy zombie import/export thread
1448  */
1449 void obd_zombie_impexp_stop(void)
1450 {
1451         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1452         obd_zombie_impexp_notify();
1453 #ifdef __KERNEL__
1454         wait_for_completion(&obd_zombie_stop);
1455 #else
1456         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1457         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1458 #endif
1459 }