Whamcloud - gitweb
8b00ad97b1dd2df9d6230a44f4ee842f02a281ab
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <libcfs/bitmap.h>
50
51 extern cfs_list_t obd_types;
52 cfs_spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 cfs_list_t      obd_zombie_imports;
60 cfs_list_t      obd_zombie_exports;
61 cfs_spinlock_t  obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66                               const char *status, int locks);
67
68 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
69
70
71 cfs_spinlock_t obd_minor_lock;
72 /**
73  * Maximum number of OBD devices on a single node (includes devices
74  * from all filesystems mounted on a client). This limit is itself arbitrary,
75  * though the lov_user_md_{v1,v3} structures (used for specifying the
76  * striping layout from llapi_setstripe() and on directory default EAs)
77  * have a 16-bit limit on the starting OST index.
78  **/
79 const int obd_minor_map_size = 65536;
80 cfs_bitmap_t *obd_minor_map;
81
82 int obd_minor_alloc(void)
83 {
84         int ret;
85
86         cfs_spin_lock(&obd_minor_lock);
87         ret = cfs_find_first_zero_bit(obd_minor_map->data, obd_minor_map_size);
88         if (ret != obd_minor_map_size)
89                 cfs_bitmap_set(obd_minor_map, ret);
90         else
91                 ret = -1;
92         cfs_spin_unlock(&obd_minor_lock);
93
94         return ret;
95 }
96 void obd_minor_release(long minor)
97 {
98         cfs_spin_lock(&obd_minor_lock);
99         cfs_bitmap_clear(obd_minor_map, minor);
100         cfs_spin_unlock(&obd_minor_lock);
101 }
102
103 int obd_minor_valid(long minor)
104 {
105         int ret;
106
107         cfs_spin_lock(&obd_minor_lock);
108         ret = cfs_bitmap_check(obd_minor_map, minor);
109         cfs_spin_unlock(&obd_minor_lock);
110
111         return ret;
112 }
113
114 static CFS_LIST_HEAD(obd_dev_list);
115 static const int obd_hash_init_bits = 10;
116 static const int obd_hash_max_bits = 30;
117 static const int obd_hash_bkt_bits = 10;
118
119 static cfs_hash_t *obd_name_hash = NULL;
120 static unsigned obd_name_hops_hash(cfs_hash_t *lh, const void *key,
121                                    unsigned mask)
122 {
123         return cfs_hash_djb2_hash(key, strlen(key), mask);
124 }
125
126 static void *obd_name_hops_obj(cfs_hlist_node_t *hn)
127 {
128         struct obd_device *obd = cfs_hlist_entry(hn, struct obd_device,
129                                                  obd_name_node);
130         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
131                  "%p obd_magic %08x != %08x\n",
132                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
133
134         return (void *)obd;
135 }
136
137 static void *obd_name_hops_key(cfs_hlist_node_t *hn)
138 {
139         struct obd_device *obd = obd_name_hops_obj(hn);
140
141         return &obd->obd_name;
142 }
143
144 static int obd_name_hops_compare(const void *key, cfs_hlist_node_t *hn)
145 {
146         void *nk = obd_name_hops_key(hn);
147
148         return strcmp(key, nk) == 0;
149 }
150
151 static void obd_name_hops_noop(cfs_hash_t *hs, cfs_hlist_node_t *hn)
152 {
153         obd_name_hops_obj(hn);
154 }
155
156 static cfs_hash_ops_t obd_name_hops = {
157         .hs_hash        = obd_name_hops_hash,
158         .hs_keycmp      = obd_name_hops_compare,
159         .hs_key         = obd_name_hops_key,
160         .hs_object      = obd_name_hops_obj,
161         .hs_get         = obd_name_hops_noop,
162         .hs_put_locked  = obd_name_hops_noop,
163 };
164
165 static cfs_hash_t *obd_uuid_hash = NULL;
166 static unsigned obd_uuid_hops_hash(cfs_hash_t *lh, const void *key,
167                                    unsigned mask)
168 {
169         return cfs_hash_djb2_hash(key, strlen(key), mask);
170 }
171
172 static void *obd_uuid_hops_obj(cfs_hlist_node_t *hn)
173 {
174         struct obd_device *obd = cfs_hlist_entry(hn, struct obd_device,
175                                                  obd_uuid_node);
176         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
177                  "%p obd_magic %08x != %08x\n",
178                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
179
180         return (void *)obd;
181 }
182
183 static void *obd_uuid_hops_key(cfs_hlist_node_t *hn)
184 {
185         struct obd_device *obd = obd_uuid_hops_obj(hn);
186
187         return &obd->obd_uuid;
188 }
189
190 static int obd_uuid_hops_compare(const void *key, cfs_hlist_node_t *hn)
191 {
192         void *nk = obd_uuid_hops_key(hn);
193
194         return obd_uuid_equals(key, nk);
195 }
196
197 static void obd_uuid_hops_noop(cfs_hash_t *hs, cfs_hlist_node_t *hn)
198 {
199         obd_uuid_hops_obj(hn);
200 }
201
202 static cfs_hash_ops_t obd_uuid_hops = {
203         .hs_hash        = obd_uuid_hops_hash,
204         .hs_keycmp      = obd_uuid_hops_compare,
205         .hs_key         = obd_uuid_hops_key,
206         .hs_object      = obd_uuid_hops_obj,
207         .hs_get         = obd_uuid_hops_noop,
208         .hs_put_locked  = obd_uuid_hops_noop,
209 };
210
211 static cfs_hash_t *obd_minor_hash = NULL;
212 static unsigned obd_minor_hops_hash(cfs_hash_t *lh, const void *key,
213                                     unsigned mask)
214 {
215         return cfs_hash_u32_hash(*((__u32 *)key), mask);
216 }
217
218 static void *obd_minor_hops_obj(cfs_hlist_node_t *hn)
219 {
220         struct obd_device *obd = cfs_hlist_entry(hn, struct obd_device,
221                                                  obd_minor_node);
222         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
223                  "%p obd_magic %08x != %08x\n",
224                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
225
226         return (void *)obd;
227 }
228
229 static void *obd_minor_hops_key(cfs_hlist_node_t *hn)
230 {
231         struct obd_device *obd = obd_minor_hops_obj(hn);
232
233         return &obd->obd_minor;
234 }
235
236 static int obd_minor_hops_compare(const void *key, cfs_hlist_node_t *hn)
237 {
238         __u32 *nk = obd_minor_hops_key(hn);
239
240         return *((__u32 *)key) == *nk;
241 }
242
243 static void obd_minor_hops_noop(cfs_hash_t *hs, cfs_hlist_node_t *hn)
244 {
245         obd_minor_hops_obj(hn);
246 }
247
248 static cfs_hash_ops_t obd_minor_hops = {
249         .hs_hash        = obd_minor_hops_hash,
250         .hs_keycmp      = obd_minor_hops_compare,
251         .hs_key         = obd_minor_hops_key,
252         .hs_object      = obd_minor_hops_obj,
253         .hs_get         = obd_minor_hops_noop,
254         .hs_put_locked  = obd_minor_hops_noop,
255 };
256
257 int obd_hashes_init(void)
258 {
259         obd_name_hash = cfs_hash_create("obd_name",
260                                         obd_hash_init_bits, obd_hash_max_bits,
261                                         obd_hash_bkt_bits, 0,
262                                         CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
263                                         &obd_name_hops,
264                                         CFS_HASH_DEFAULT | CFS_HASH_NO_ITEMREF);
265         if (obd_name_hash == NULL)
266                 return -ENOMEM;
267
268         obd_uuid_hash = cfs_hash_create("obd_uuid",
269                                         obd_hash_init_bits, obd_hash_max_bits,
270                                         obd_hash_bkt_bits, 0,
271                                         CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
272                                         &obd_uuid_hops,
273                                         CFS_HASH_DEFAULT | CFS_HASH_NO_ITEMREF);
274         if (obd_name_hash == NULL)
275                 return -ENOMEM;
276
277         obd_minor_hash = cfs_hash_create("obd_minor",
278                                          obd_hash_init_bits, obd_hash_max_bits,
279                                          obd_hash_bkt_bits, 0,
280                                          CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
281                                          &obd_minor_hops,
282                                          CFS_HASH_DEFAULT | CFS_HASH_NO_ITEMREF);
283         if (obd_name_hash == NULL)
284                 return -ENOMEM;
285
286         obd_minor_map = CFS_ALLOCATE_BITMAP(obd_minor_map_size);
287         if (obd_minor_map == NULL)
288                 return -ENOMEM;
289
290         cfs_spin_lock_init(&obd_minor_lock);
291
292         return 0;
293 }
294
295 void obd_hashes_fini(void)
296 {
297         if (obd_minor_map)
298                CFS_FREE_BITMAP(obd_minor_map);
299         cfs_hash_putref(obd_name_hash);
300         cfs_hash_putref(obd_uuid_hash);
301         cfs_hash_putref(obd_minor_hash);
302 }
303
304 /*
305  * support functions: we could use inter-module communication, but this
306  * is more portable to other OS's
307  */
308 static struct obd_device *obd_device_alloc(void)
309 {
310         struct obd_device *obd;
311
312         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
313         if (obd != NULL) {
314                 obd->obd_magic = OBD_DEVICE_MAGIC;
315         }
316         return obd;
317 }
318
319 static void obd_device_free(struct obd_device *obd)
320 {
321         LASSERT(obd != NULL);
322         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
323                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
324         if (obd->obd_namespace != NULL) {
325                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
326                        obd, obd->obd_namespace, obd->obd_force);
327                 LBUG();
328         }
329         lu_ref_fini(&obd->obd_reference);
330         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
331 }
332
333 struct obd_type *class_search_type(const char *name)
334 {
335         cfs_list_t *tmp;
336         struct obd_type *type;
337
338         cfs_spin_lock(&obd_types_lock);
339         cfs_list_for_each(tmp, &obd_types) {
340                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
341                 if (strcmp(type->typ_name, name) == 0) {
342                         cfs_spin_unlock(&obd_types_lock);
343                         return type;
344                 }
345         }
346         cfs_spin_unlock(&obd_types_lock);
347         return NULL;
348 }
349
350 struct obd_type *class_get_type(const char *name)
351 {
352         struct obd_type *type = class_search_type(name);
353
354 #ifdef HAVE_MODULE_LOADING_SUPPORT
355         if (!type) {
356                 const char *modname = name;
357                 if (!cfs_request_module("%s", modname)) {
358                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
359                         type = class_search_type(name);
360                 } else {
361                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
362                                            modname);
363                 }
364         }
365 #endif
366         if (type) {
367                 cfs_spin_lock(&type->obd_type_lock);
368                 type->typ_refcnt++;
369                 cfs_try_module_get(type->typ_dt_ops->o_owner);
370                 cfs_spin_unlock(&type->obd_type_lock);
371         }
372         return type;
373 }
374
375 void class_put_type(struct obd_type *type)
376 {
377         LASSERT(type);
378         cfs_spin_lock(&type->obd_type_lock);
379         type->typ_refcnt--;
380         cfs_module_put(type->typ_dt_ops->o_owner);
381         cfs_spin_unlock(&type->obd_type_lock);
382 }
383
384 #define CLASS_MAX_NAME 1024
385
386 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
387                         struct lprocfs_vars *vars, const char *name,
388                         struct lu_device_type *ldt)
389 {
390         struct obd_type *type;
391         int rc = 0;
392         ENTRY;
393
394         /* sanity check */
395         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
396
397         if (class_search_type(name)) {
398                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
399                 RETURN(-EEXIST);
400         }
401
402         rc = -ENOMEM;
403         OBD_ALLOC(type, sizeof(*type));
404         if (type == NULL)
405                 RETURN(rc);
406
407         OBD_ALLOC_PTR(type->typ_dt_ops);
408         OBD_ALLOC_PTR(type->typ_md_ops);
409         OBD_ALLOC(type->typ_name, strlen(name) + 1);
410
411         if (type->typ_dt_ops == NULL ||
412             type->typ_md_ops == NULL ||
413             type->typ_name == NULL)
414                 GOTO (failed, rc);
415
416         *(type->typ_dt_ops) = *dt_ops;
417         /* md_ops is optional */
418         if (md_ops)
419                 *(type->typ_md_ops) = *md_ops;
420         strcpy(type->typ_name, name);
421         cfs_spin_lock_init(&type->obd_type_lock);
422
423 #ifdef LPROCFS
424         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
425                                               vars, type);
426         if (IS_ERR(type->typ_procroot)) {
427                 rc = PTR_ERR(type->typ_procroot);
428                 type->typ_procroot = NULL;
429                 GOTO (failed, rc);
430         }
431 #endif
432         if (ldt != NULL) {
433                 type->typ_lu = ldt;
434                 rc = lu_device_type_init(ldt);
435                 if (rc != 0)
436                         GOTO (failed, rc);
437         }
438
439         cfs_spin_lock(&obd_types_lock);
440         cfs_list_add(&type->typ_chain, &obd_types);
441         cfs_spin_unlock(&obd_types_lock);
442
443         RETURN (0);
444
445  failed:
446         if (type->typ_name != NULL)
447                 OBD_FREE(type->typ_name, strlen(name) + 1);
448         if (type->typ_md_ops != NULL)
449                 OBD_FREE_PTR(type->typ_md_ops);
450         if (type->typ_dt_ops != NULL)
451                 OBD_FREE_PTR(type->typ_dt_ops);
452         OBD_FREE(type, sizeof(*type));
453         RETURN(rc);
454 }
455
456 int class_unregister_type(const char *name)
457 {
458         struct obd_type *type = class_search_type(name);
459         ENTRY;
460
461         if (!type) {
462                 CERROR("unknown obd type\n");
463                 RETURN(-EINVAL);
464         }
465
466         if (type->typ_refcnt) {
467                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
468                 /* This is a bad situation, let's make the best of it */
469                 /* Remove ops, but leave the name for debugging */
470                 OBD_FREE_PTR(type->typ_dt_ops);
471                 OBD_FREE_PTR(type->typ_md_ops);
472                 RETURN(-EBUSY);
473         }
474
475         if (type->typ_procroot) {
476                 lprocfs_remove(&type->typ_procroot);
477         }
478
479         if (type->typ_lu)
480                 lu_device_type_fini(type->typ_lu);
481
482         cfs_spin_lock(&obd_types_lock);
483         cfs_list_del(&type->typ_chain);
484         cfs_spin_unlock(&obd_types_lock);
485         OBD_FREE(type->typ_name, strlen(name) + 1);
486         if (type->typ_dt_ops != NULL)
487                 OBD_FREE_PTR(type->typ_dt_ops);
488         if (type->typ_md_ops != NULL)
489                 OBD_FREE_PTR(type->typ_md_ops);
490         OBD_FREE(type, sizeof(*type));
491         RETURN(0);
492 } /* class_unregister_type */
493
494 const char *obd_dev_status(struct obd_device *obd)
495 {
496         const char *status;
497
498         if (obd->obd_stopping)
499                 status = "ST";
500         else if (obd->obd_inactive)
501                 status = "IN";
502         else if (obd->obd_set_up)
503                 status = "UP";
504         else if (obd->obd_attached)
505                 status = "AT";
506         else
507                 status = "--";
508
509         return status;
510 }
511
/*
 * Like cfs_list_entry(), but yields NULL when \a pos is the list head itself
 * (i.e. there is no entry).  Arguments are parenthesized in the comparison
 * so that arbitrary expressions can be passed safely.
 */
#define cfs_list_entry_safe(pos, head, type, member)  \
        ((pos) == (head) ? NULL : cfs_list_entry(pos, type, member))
514
515 void obd_devlist_first(struct obd_device **pos)
516 {
517         struct obd_device *obd;
518
519         cfs_spin_lock(&obd_dev_lock);
520         obd = cfs_list_entry_safe(obd_dev_list.next, &obd_dev_list,
521                                   struct obd_device, obd_list);
522         if (obd != NULL)
523                 class_incref(obd, "devlist", obd);
524         cfs_spin_unlock(&obd_dev_lock);
525
526         *pos = obd;
527 }
528
529 void obd_devlist_next(struct obd_device **pos)
530 {
531         struct obd_device *obd = NULL;
532
533         cfs_spin_lock(&obd_dev_lock);
534         obd = cfs_list_entry_safe((*pos)->obd_list.next, &obd_dev_list,
535                                   struct obd_device, obd_list);
536         if (obd)
537                 class_incref(obd, "devlist", obd);
538         cfs_spin_unlock(&obd_dev_lock);
539
540         class_decref(*pos, "devlist", *pos);
541         *pos = obd;
542 }
543
544 void obd_devlist_last(struct obd_device *pos)
545 {
546         if (pos)
547                 class_decref(pos,"devlist", pos);
548
549 }
550
551 /**
552  * Create a new obd device.
553  *
554  * Find an empty slot in ::obd_devs[], create a new obd device in it.
555  *
556  * \param[in] type_name obd device type string.
557  * \param[in] name      obd device name.
558  *
559  * \retval NULL if create fails, otherwise return the obd device
560  *         pointer created.
561  */
562 struct obd_device *class_newdev(const char *type_name, const char *name, const char *uuid)
563 {
564         struct obd_device *newdev;
565         struct obd_type *type;
566         long ret;
567
568         if (strlen(name) >= MAX_OBD_NAME) {
569                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
570                 RETURN(ERR_PTR(-EINVAL));
571         }
572
573         type = class_get_type(type_name);
574         if (type == NULL){
575                 CERROR("OBD: unknown type: %s\n", type_name);
576                 ret = -ENODEV;
577                 goto error_type;
578         }
579
580         newdev = obd_device_alloc();
581         if (newdev == NULL) {
582                 ret = -ENOMEM;
583                 goto error_device;
584         }
585
586         newdev->obd_minor = obd_minor_alloc();
587         if (newdev->obd_minor < 0) {
588                 CERROR("don't have free minors\n");
589                 ret = -ENODATA;
590                 goto error_minor;
591         }
592
593         /* find add unique by name */
594         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
595         if (cfs_hash_add_unique(obd_name_hash, name, &newdev->obd_name_node)) {
596                 CERROR("fails to add an unique obddev (%s) to the hash\n",
597                        name);
598                 ret = -EEXIST;
599                 goto error_dup;
600         }
601         newdev->obd_type = type;
602
603         cfs_hash_add(obd_minor_hash, &newdev->obd_minor, &newdev->obd_minor_node);
604         memcpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
605         cfs_hash_add(obd_uuid_hash, uuid, &newdev->obd_uuid_node);
606
607         cfs_spin_lock(&obd_dev_lock);
608         cfs_list_add_tail(&newdev->obd_list, &obd_dev_list);
609         cfs_spin_unlock(&obd_dev_lock);
610
611         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
612                newdev->obd_name, newdev);
613         RETURN(newdev);
614 error_dup:
615        obd_minor_release(newdev->obd_minor);
616 error_minor:
617         obd_device_free(newdev);
618 error_device:
619         class_put_type(type);
620 error_type:
621         RETURN(ERR_PTR(ret));
622 }
623
624 void class_release_dev(struct obd_device *obd)
625 {
626         struct obd_type *obd_type = obd->obd_type;
627
628         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
629                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
630         LASSERT(obd_type != NULL);
631
632         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
633                obd->obd_name,obd->obd_type->typ_name);
634
635         cfs_hash_del(obd_name_hash, obd->obd_name, &obd->obd_name_node);
636         cfs_hash_del(obd_uuid_hash, &obd->obd_uuid, &obd->obd_uuid_node);
637         cfs_hash_del(obd_minor_hash, &obd->obd_minor, &obd->obd_minor_node);
638
639         cfs_spin_lock(&obd_dev_lock);
640         cfs_list_del(&obd->obd_list);
641         cfs_spin_unlock(&obd_dev_lock);
642
643         obd_minor_release(obd->obd_minor);
644         obd_device_free(obd);
645
646         class_put_type(obd_type);
647 }
648
649 int class_name2dev(const char *name)
650 {
651         struct obd_device *obd;
652
653         obd = cfs_hash_lookup(obd_name_hash, name);
654         return obd != NULL ? obd->obd_minor : -1 ;
655 }
656
657 struct obd_device *class_name2obd(const char *name)
658 {
659         return cfs_hash_lookup(obd_name_hash, name);
660 }
661
662 int class_uuid2dev(struct obd_uuid *uuid)
663 {
664         struct obd_device *obd;
665
666         obd = cfs_hash_lookup(obd_uuid_hash, uuid);
667         return obd != NULL ? obd->obd_minor : -1;
668 }
669
670 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
671 {
672         return  cfs_hash_lookup(obd_uuid_hash, uuid);
673 }
674
675 /**
676  * Get obd device from ::obd_devs[]
677  *
678  * \param num [in] array index
679  *
680  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
681  *         otherwise return the obd device there.
682  */
683 struct obd_device *class_num2obd(__u32 minor)
684 {
685         struct obd_device *obd;
686
687         obd = cfs_hash_lookup(obd_minor_hash, &minor);
688
689         return obd;
690 }
691
692 void class_obd_list(void)
693 {
694         const char *status;
695         struct obd_device *obd;
696
697         for (obd_devlist_first(&obd);
698              obd != NULL;
699              obd_devlist_next(&obd)) {
700
701                 status = obd_dev_status(obd);
702                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
703                          obd->obd_minor, status, obd->obd_type->typ_name,
704                          obd->obd_name, obd->obd_uuid.uuid,
705                          cfs_atomic_read(&obd->obd_refcount));
706         }
707         return;
708 }
709
710 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
711    specified, then only the client with that uuid is returned,
712    otherwise any client connected to the tgt is returned. */
713 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
714                                           const char * typ_name,
715                                           struct obd_uuid *grp_uuid)
716 {
717         struct obd_device *obd;
718
719         for (obd_devlist_first(&obd);
720              obd != NULL;
721              obd_devlist_next(&obd)) {
722                 /* XXX per type list ? */
723                 if ((strncmp(obd->obd_type->typ_name, typ_name,
724                              strlen(typ_name)) == 0)) {
725                         if (obd_uuid_equals(tgt_uuid,
726                                             &obd->u.cli.cl_target_uuid) &&
727                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
728                                                          &obd->obd_uuid) : 1)) {
729                                 obd_devlist_last(obd);
730                                 return obd;
731                         }
732                 }
733         }
734
735         return NULL;
736 }
737
738 /* Iterate the obd_device list looking devices have grp_uuid. Start
739    searching at *next, and if a device is found, the next index to look
740    at is saved in *next. If next is NULL, then the first matching device
741    will always be returned. */
742 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid,
743                                            struct obd_device **prev)
744 {
745         struct obd_device *obd = *prev;
746
747         if (obd == NULL)
748                obd_devlist_first(&obd);
749         else
750                obd_devlist_next(&obd);
751
752
753         for (; obd != NULL; obd_devlist_next(&obd)) {
754                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
755                         /* XXX return with reference */
756                         *prev = obd;
757                         return obd;
758                 }
759         }
760
761         return NULL;
762 }
763
764 /**
765  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
766  * adjust sptlrpc settings accordingly.
767  */
768 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
769 {
770         struct obd_device  *obd;
771         const char         *type;
772         int                 rc = 0, rc2;
773
774         LASSERT(namelen > 0);
775
776         for (obd_devlist_first(&obd);
777              obd != NULL;
778              obd_devlist_next(&obd)) {
779                 if (obd->obd_set_up == 0 || obd->obd_stopping)
780                         continue;
781
782                 /* only notify mdc, osc, mdt, ost */
783                 type = obd->obd_type->typ_name;
784                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
785                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
786                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
787                     strcmp(type, LUSTRE_OST_NAME) != 0)
788                         continue;
789
790                 if (strncmp(obd->obd_name, fsname, namelen))
791                         continue;
792
793                 /** XXX - some new obd can be added at that point */
794                 rc2 = obd_set_info_async(obd->obd_self_export,
795                                          sizeof(KEY_SPTLRPC_CONF),
796                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
797                 rc = rc ? rc : rc2;
798         }
799
800         return rc;
801 }
802 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
803
804 void obd_cleanup_caches(void)
805 {
806         int rc;
807
808         ENTRY;
809         if (obd_device_cachep) {
810                 rc = cfs_mem_cache_destroy(obd_device_cachep);
811                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
812                 obd_device_cachep = NULL;
813         }
814         if (obdo_cachep) {
815                 rc = cfs_mem_cache_destroy(obdo_cachep);
816                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
817                 obdo_cachep = NULL;
818         }
819         if (import_cachep) {
820                 rc = cfs_mem_cache_destroy(import_cachep);
821                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
822                 import_cachep = NULL;
823         }
824         if (capa_cachep) {
825                 rc = cfs_mem_cache_destroy(capa_cachep);
826                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
827                 capa_cachep = NULL;
828         }
829         EXIT;
830 }
831
832 int obd_init_caches(void)
833 {
834         ENTRY;
835
836         LASSERT(obd_device_cachep == NULL);
837         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
838                                                  sizeof(struct obd_device),
839                                                  0, 0);
840         if (!obd_device_cachep)
841                 GOTO(out, -ENOMEM);
842
843         LASSERT(obdo_cachep == NULL);
844         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
845                                            0, 0);
846         if (!obdo_cachep)
847                 GOTO(out, -ENOMEM);
848
849         LASSERT(import_cachep == NULL);
850         import_cachep = cfs_mem_cache_create("ll_import_cache",
851                                              sizeof(struct obd_import),
852                                              0, 0);
853         if (!import_cachep)
854                 GOTO(out, -ENOMEM);
855
856         LASSERT(capa_cachep == NULL);
857         capa_cachep = cfs_mem_cache_create("capa_cache",
858                                            sizeof(struct obd_capa), 0, 0);
859         if (!capa_cachep)
860                 GOTO(out, -ENOMEM);
861
862         RETURN(0);
863  out:
864         obd_cleanup_caches();
865         RETURN(-ENOMEM);
866
867 }
868
869 /* map connection to client */
870 struct obd_export *class_conn2export(struct lustre_handle *conn)
871 {
872         struct obd_export *export;
873         ENTRY;
874
875         if (!conn) {
876                 CDEBUG(D_CACHE, "looking for null handle\n");
877                 RETURN(NULL);
878         }
879
880         if (conn->cookie == -1) {  /* this means assign a new connection */
881                 CDEBUG(D_CACHE, "want a new connection\n");
882                 RETURN(NULL);
883         }
884
885         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
886         export = class_handle2object(conn->cookie);
887         RETURN(export);
888 }
889
890 struct obd_device *class_exp2obd(struct obd_export *exp)
891 {
892         if (exp)
893                 return exp->exp_obd;
894         return NULL;
895 }
896
897 struct obd_device *class_conn2obd(struct lustre_handle *conn)
898 {
899         struct obd_export *export;
900         export = class_conn2export(conn);
901         if (export) {
902                 struct obd_device *obd = export->exp_obd;
903                 class_export_put(export);
904                 return obd;
905         }
906         return NULL;
907 }
908
909 struct obd_import *class_exp2cliimp(struct obd_export *exp)
910 {
911         struct obd_device *obd = exp->exp_obd;
912         if (obd == NULL)
913                 return NULL;
914         return obd->u.cli.cl_import;
915 }
916
917 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
918 {
919         struct obd_device *obd = class_conn2obd(conn);
920         if (obd == NULL)
921                 return NULL;
922         return obd->u.cli.cl_import;
923 }
924
925 /* Export management functions */
926 static void class_export_destroy(struct obd_export *exp)
927 {
928         struct obd_device *obd = exp->exp_obd;
929         ENTRY;
930
931         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
932
933         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
934                exp->exp_client_uuid.uuid, obd->obd_name);
935
936         LASSERT(obd != NULL);
937
938         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
939         if (exp->exp_connection)
940                 ptlrpc_put_connection_superhack(exp->exp_connection);
941
942         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
943         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
944         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
945         LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
946         obd_destroy_export(exp);
947         class_decref(obd, "export", exp);
948
949         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
950         EXIT;
951 }
952
/*
 * Add-reference callback registered with class_handle_hash() for exports;
 * simply takes one export reference.
 */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
957
958 struct obd_export *class_export_get(struct obd_export *exp)
959 {
960         cfs_atomic_inc(&exp->exp_refcount);
961         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
962                cfs_atomic_read(&exp->exp_refcount));
963         return exp;
964 }
965 EXPORT_SYMBOL(class_export_get);
966
967 void class_export_put(struct obd_export *exp)
968 {
969         LASSERT(exp != NULL);
970         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, 0x5a5a5a);
971         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
972                cfs_atomic_read(&exp->exp_refcount) - 1);
973
974         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
975                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
976                 CDEBUG(D_IOCTL, "final put %p/%s\n",
977                        exp, exp->exp_client_uuid.uuid);
978
979                 /* release nid stat refererence */
980                 lprocfs_exp_cleanup(exp);
981
982                 obd_zombie_export_add(exp);
983         }
984 }
985 EXPORT_SYMBOL(class_export_put);
986
987 /* Creates a new export, adds it to the hash table, and returns a
988  * pointer to it. The refcount is 2: one for the hash reference, and
989  * one for the pointer returned by this function. */
990 struct obd_export *class_new_export(struct obd_device *obd,
991                                     struct obd_uuid *cluuid)
992 {
993         struct obd_export *export;
994         cfs_hash_t *hash = NULL;
995         int rc = 0;
996         ENTRY;
997
998         OBD_ALLOC_PTR(export);
999         if (!export)
1000                 return ERR_PTR(-ENOMEM);
1001
1002         export->exp_conn_cnt = 0;
1003         export->exp_lock_hash = NULL;
1004         cfs_atomic_set(&export->exp_refcount, 2);
1005         cfs_atomic_set(&export->exp_rpc_count, 0);
1006         cfs_atomic_set(&export->exp_cb_count, 0);
1007         cfs_atomic_set(&export->exp_locks_count, 0);
1008 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1009         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
1010         cfs_spin_lock_init(&export->exp_locks_list_guard);
1011 #endif
1012         cfs_atomic_set(&export->exp_replay_count, 0);
1013         export->exp_obd = obd;
1014         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
1015         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
1016         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1017         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
1018         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
1019         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
1020         class_handle_hash(&export->exp_handle, export_handle_addref);
1021         export->exp_last_request_time = cfs_time_current_sec();
1022         cfs_spin_lock_init(&export->exp_lock);
1023         cfs_spin_lock_init(&export->exp_rpc_lock);
1024         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
1025         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
1026
1027         export->exp_sp_peer = LUSTRE_SP_ANY;
1028         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1029         export->exp_client_uuid = *cluuid;
1030         obd_init_export(export);
1031
1032         cfs_spin_lock(&obd->obd_dev_lock);
1033          /* shouldn't happen, but might race */
1034         if (obd->obd_stopping)
1035                 GOTO(exit_unlock, rc = -ENODEV);
1036
1037         hash = cfs_hash_getref(obd->obd_uuid_hash);
1038         if (hash == NULL)
1039                 GOTO(exit_unlock, rc = -ENODEV);
1040         cfs_spin_unlock(&obd->obd_dev_lock);
1041
1042         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1043                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1044                 if (rc != 0) {
1045                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1046                                       obd->obd_name, cluuid->uuid, rc);
1047                         GOTO(exit_err, rc = -EALREADY);
1048                 }
1049         }
1050
1051         cfs_spin_lock(&obd->obd_dev_lock);
1052         if (obd->obd_stopping) {
1053                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1054                 GOTO(exit_unlock, rc = -ENODEV);
1055         }
1056
1057         class_incref(obd, "export", export);
1058         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
1059         cfs_list_add_tail(&export->exp_obd_chain_timed,
1060                           &export->exp_obd->obd_exports_timed);
1061         export->exp_obd->obd_num_exports++;
1062         cfs_spin_unlock(&obd->obd_dev_lock);
1063         cfs_hash_putref(hash);
1064         RETURN(export);
1065
1066 exit_unlock:
1067         cfs_spin_unlock(&obd->obd_dev_lock);
1068 exit_err:
1069         if (hash)
1070                 cfs_hash_putref(hash);
1071         class_handle_unhash(&export->exp_handle);
1072         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
1073         obd_destroy_export(export);
1074         OBD_FREE_PTR(export);
1075         return ERR_PTR(rc);
1076 }
1077 EXPORT_SYMBOL(class_new_export);
1078
1079 void class_unlink_export(struct obd_export *exp)
1080 {
1081         class_handle_unhash(&exp->exp_handle);
1082
1083         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1084         /* delete an uuid-export hashitem from hashtables */
1085         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
1086                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1087                              &exp->exp_client_uuid,
1088                              &exp->exp_uuid_hash);
1089
1090         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1091         cfs_list_del_init(&exp->exp_obd_chain_timed);
1092         exp->exp_obd->obd_num_exports--;
1093         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1094         class_export_put(exp);
1095 }
1096 EXPORT_SYMBOL(class_unlink_export);
1097
1098 /* Import management functions */
1099 void class_import_destroy(struct obd_import *imp)
1100 {
1101         ENTRY;
1102
1103         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1104                 imp->imp_obd->obd_name);
1105
1106         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1107
1108         ptlrpc_put_connection_superhack(imp->imp_connection);
1109
1110         while (!cfs_list_empty(&imp->imp_conn_list)) {
1111                 struct obd_import_conn *imp_conn;
1112
1113                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
1114                                           struct obd_import_conn, oic_item);
1115                 cfs_list_del_init(&imp_conn->oic_item);
1116                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1117                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1118         }
1119
1120         LASSERT(imp->imp_sec == NULL);
1121         class_decref(imp->imp_obd, "import", imp);
1122         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1123         EXIT;
1124 }
1125
/*
 * Add-reference callback registered with class_handle_hash() for imports;
 * simply takes one import reference.
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
1130
1131 struct obd_import *class_import_get(struct obd_import *import)
1132 {
1133         cfs_atomic_inc(&import->imp_refcount);
1134         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1135                cfs_atomic_read(&import->imp_refcount),
1136                import->imp_obd->obd_name);
1137         return import;
1138 }
1139 EXPORT_SYMBOL(class_import_get);
1140
1141 void class_import_put(struct obd_import *imp)
1142 {
1143         ENTRY;
1144
1145         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1146         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, 0x5a5a5a);
1147
1148         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1149                cfs_atomic_read(&imp->imp_refcount) - 1,
1150                imp->imp_obd->obd_name);
1151
1152         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1153                 CDEBUG(D_INFO, "final put import %p\n", imp);
1154                 obd_zombie_import_add(imp);
1155         }
1156
1157         EXIT;
1158 }
1159 EXPORT_SYMBOL(class_import_put);
1160
1161 static void init_imp_at(struct imp_at *at) {
1162         int i;
1163         at_init(&at->iat_net_latency, 0, 0);
1164         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1165                 /* max service estimates are tracked on the server side, so
1166                    don't use the AT history here, just use the last reported
1167                    val. (But keep hist for proc histogram, worst_ever) */
1168                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1169                         AT_FLG_NOHIST);
1170         }
1171 }
1172
1173 struct obd_import *class_new_import(struct obd_device *obd)
1174 {
1175         struct obd_import *imp;
1176
1177         OBD_ALLOC(imp, sizeof(*imp));
1178         if (imp == NULL)
1179                 return NULL;
1180
1181         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1182         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1183         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1184         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1185         cfs_spin_lock_init(&imp->imp_lock);
1186         imp->imp_last_success_conn = 0;
1187         imp->imp_state = LUSTRE_IMP_NEW;
1188         imp->imp_obd = class_incref(obd, "import", imp);
1189         cfs_sema_init(&imp->imp_sec_mutex, 1);
1190         cfs_waitq_init(&imp->imp_recovery_waitq);
1191
1192         cfs_atomic_set(&imp->imp_refcount, 2);
1193         cfs_atomic_set(&imp->imp_unregistering, 0);
1194         cfs_atomic_set(&imp->imp_inflight, 0);
1195         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1196         cfs_atomic_set(&imp->imp_inval_count, 0);
1197         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1198         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1199         class_handle_hash(&imp->imp_handle, import_handle_addref);
1200         init_imp_at(&imp->imp_at);
1201
1202         /* the default magic is V2, will be used in connect RPC, and
1203          * then adjusted according to the flags in request/reply. */
1204         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1205
1206         return imp;
1207 }
1208 EXPORT_SYMBOL(class_new_import);
1209
1210 void class_destroy_import(struct obd_import *import)
1211 {
1212         LASSERT(import != NULL);
1213         LASSERT(import != LP_POISON);
1214
1215         class_handle_unhash(&import->imp_handle);
1216
1217         cfs_spin_lock(&import->imp_lock);
1218         import->imp_generation++;
1219         cfs_spin_unlock(&import->imp_lock);
1220         class_import_put(import);
1221 }
1222 EXPORT_SYMBOL(class_destroy_import);
1223
1224 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1225
1226 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1227 {
1228         cfs_spin_lock(&exp->exp_locks_list_guard);
1229
1230         LASSERT(lock->l_exp_refs_nr >= 0);
1231
1232         if (lock->l_exp_refs_target != NULL &&
1233             lock->l_exp_refs_target != exp) {
1234                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1235                               exp, lock, lock->l_exp_refs_target);
1236         }
1237         if ((lock->l_exp_refs_nr ++) == 0) {
1238                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1239                 lock->l_exp_refs_target = exp;
1240         }
1241         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1242                lock, exp, lock->l_exp_refs_nr);
1243         cfs_spin_unlock(&exp->exp_locks_list_guard);
1244 }
1245 EXPORT_SYMBOL(__class_export_add_lock_ref);
1246
1247 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1248 {
1249         cfs_spin_lock(&exp->exp_locks_list_guard);
1250         LASSERT(lock->l_exp_refs_nr > 0);
1251         if (lock->l_exp_refs_target != exp) {
1252                 LCONSOLE_WARN("lock %p, "
1253                               "mismatching export pointers: %p, %p\n",
1254                               lock, lock->l_exp_refs_target, exp);
1255         }
1256         if (-- lock->l_exp_refs_nr == 0) {
1257                 cfs_list_del_init(&lock->l_exp_refs_link);
1258                 lock->l_exp_refs_target = NULL;
1259         }
1260         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1261                lock, exp, lock->l_exp_refs_nr);
1262         cfs_spin_unlock(&exp->exp_locks_list_guard);
1263 }
1264 EXPORT_SYMBOL(__class_export_del_lock_ref);
1265 #endif
1266
1267 /* A connection defines an export context in which preallocation can
1268    be managed. This releases the export pointer reference, and returns
1269    the export handle, so the export refcount is 1 when this function
1270    returns. */
1271 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1272                   struct obd_uuid *cluuid)
1273 {
1274         struct obd_export *export;
1275         LASSERT(conn != NULL);
1276         LASSERT(obd != NULL);
1277         LASSERT(cluuid != NULL);
1278         ENTRY;
1279
1280         export = class_new_export(obd, cluuid);
1281         if (IS_ERR(export))
1282                 RETURN(PTR_ERR(export));
1283
1284         conn->cookie = export->exp_handle.h_cookie;
1285         class_export_put(export);
1286
1287         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1288                cluuid->uuid, conn->cookie);
1289         RETURN(0);
1290 }
1291 EXPORT_SYMBOL(class_connect);
1292
1293 /* if export is involved in recovery then clean up related things */
1294 void class_export_recovery_cleanup(struct obd_export *exp)
1295 {
1296         struct obd_device *obd = exp->exp_obd;
1297
1298         cfs_spin_lock(&obd->obd_recovery_task_lock);
1299         if (exp->exp_delayed)
1300                 obd->obd_delayed_clients--;
1301         if (obd->obd_recovering && exp->exp_in_recovery) {
1302                 cfs_spin_lock(&exp->exp_lock);
1303                 exp->exp_in_recovery = 0;
1304                 cfs_spin_unlock(&exp->exp_lock);
1305                 LASSERT(obd->obd_connected_clients);
1306                 obd->obd_connected_clients--;
1307         }
1308         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1309         /** Cleanup req replay fields */
1310         if (exp->exp_req_replay_needed) {
1311                 cfs_spin_lock(&exp->exp_lock);
1312                 exp->exp_req_replay_needed = 0;
1313                 cfs_spin_unlock(&exp->exp_lock);
1314                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1315                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1316         }
1317         /** Cleanup lock replay data */
1318         if (exp->exp_lock_replay_needed) {
1319                 cfs_spin_lock(&exp->exp_lock);
1320                 exp->exp_lock_replay_needed = 0;
1321                 cfs_spin_unlock(&exp->exp_lock);
1322                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1323                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1324         }
1325 }
1326
1327 /* This function removes 1-3 references from the export:
1328  * 1 - for export pointer passed
1329  * and if disconnect really need
1330  * 2 - removing from hash
1331  * 3 - in client_unlink_export
1332  * The export pointer passed to this function can destroyed */
1333 int class_disconnect(struct obd_export *export)
1334 {
1335         int already_disconnected;
1336         ENTRY;
1337
1338         if (export == NULL) {
1339                 fixme();
1340                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1341                 RETURN(-EINVAL);
1342         }
1343
1344         cfs_spin_lock(&export->exp_lock);
1345         already_disconnected = export->exp_disconnected;
1346         export->exp_disconnected = 1;
1347         cfs_spin_unlock(&export->exp_lock);
1348
1349         /* class_cleanup(), abort_recovery(), and class_fail_export()
1350          * all end up in here, and if any of them race we shouldn't
1351          * call extra class_export_puts(). */
1352         if (already_disconnected) {
1353                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1354                 GOTO(no_disconn, already_disconnected);
1355         }
1356
1357         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1358                export->exp_handle.h_cookie);
1359
1360         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1361                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1362                              &export->exp_connection->c_peer.nid,
1363                              &export->exp_nid_hash);
1364
1365         class_export_recovery_cleanup(export);
1366         class_unlink_export(export);
1367 no_disconn:
1368         class_export_put(export);
1369         RETURN(0);
1370 }
1371
1372 /* Return non-zero for a fully connected export */
1373 int class_connected_export(struct obd_export *exp)
1374 {
1375         if (exp) {
1376                 int connected;
1377                 cfs_spin_lock(&exp->exp_lock);
1378                 connected = (exp->exp_conn_cnt > 0);
1379                 cfs_spin_unlock(&exp->exp_lock);
1380                 return connected;
1381         }
1382         return 0;
1383 }
1384 EXPORT_SYMBOL(class_connected_export);
1385
1386 static void class_disconnect_export_list(cfs_list_t *list,
1387                                          enum obd_option flags)
1388 {
1389         int rc;
1390         struct obd_export *exp;
1391         ENTRY;
1392
1393         /* It's possible that an export may disconnect itself, but
1394          * nothing else will be added to this list. */
1395         while (!cfs_list_empty(list)) {
1396                 exp = cfs_list_entry(list->next, struct obd_export,
1397                                      exp_obd_chain);
1398                 /* need for safe call CDEBUG after obd_disconnect */
1399                 class_export_get(exp);
1400
1401                 cfs_spin_lock(&exp->exp_lock);
1402                 exp->exp_flags = flags;
1403                 cfs_spin_unlock(&exp->exp_lock);
1404
1405                 if (obd_uuid_equals(&exp->exp_client_uuid,
1406                                     &exp->exp_obd->obd_uuid)) {
1407                         CDEBUG(D_HA,
1408                                "exp %p export uuid == obd uuid, don't discon\n",
1409                                exp);
1410                         /* Need to delete this now so we don't end up pointing
1411                          * to work_list later when this export is cleaned up. */
1412                         cfs_list_del_init(&exp->exp_obd_chain);
1413                         class_export_put(exp);
1414                         continue;
1415                 }
1416
1417                 class_export_get(exp);
1418                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1419                        "last request at "CFS_TIME_T"\n",
1420                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1421                        exp, exp->exp_last_request_time);
1422                 /* release one export reference anyway */
1423                 rc = obd_disconnect(exp);
1424
1425                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1426                        obd_export_nid2str(exp), exp, rc);
1427                 class_export_put(exp);
1428         }
1429         EXIT;
1430 }
1431
1432 void class_disconnect_exports(struct obd_device *obd)
1433 {
1434         cfs_list_t work_list;
1435         ENTRY;
1436
1437         /* Move all of the exports from obd_exports to a work list, en masse. */
1438         CFS_INIT_LIST_HEAD(&work_list);
1439         cfs_spin_lock(&obd->obd_dev_lock);
1440         cfs_list_splice_init(&obd->obd_exports, &work_list);
1441         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1442         cfs_spin_unlock(&obd->obd_dev_lock);
1443
1444         if (!cfs_list_empty(&work_list)) {
1445                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1446                        "disconnecting them\n", obd->obd_minor, obd);
1447                 class_disconnect_export_list(&work_list,
1448                                              exp_flags_from_obd(obd));
1449         } else
1450                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1451                        obd->obd_minor, obd);
1452         EXIT;
1453 }
1454 EXPORT_SYMBOL(class_disconnect_exports);
1455
1456 /* Remove exports that have not completed recovery.
1457  */
1458 void class_disconnect_stale_exports(struct obd_device *obd,
1459                                     int (*test_export)(struct obd_export *))
1460 {
1461         cfs_list_t work_list;
1462         cfs_list_t *pos, *n;
1463         struct obd_export *exp;
1464         int evicted = 0;
1465         ENTRY;
1466
1467         CFS_INIT_LIST_HEAD(&work_list);
1468         cfs_spin_lock(&obd->obd_dev_lock);
1469         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1470                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1471                 if (test_export(exp))
1472                         continue;
1473
1474                 /* don't count self-export as client */
1475                 if (obd_uuid_equals(&exp->exp_client_uuid,
1476                                     &exp->exp_obd->obd_uuid))
1477                         continue;
1478
1479                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1480                 evicted++;
1481                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1482                        obd->obd_name, exp->exp_client_uuid.uuid,
1483                        exp->exp_connection == NULL ? "<unknown>" :
1484                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1485                 print_export_data(exp, "EVICTING", 0);
1486         }
1487         cfs_spin_unlock(&obd->obd_dev_lock);
1488
1489         if (evicted) {
1490                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1491                        obd->obd_name, evicted);
1492                 obd->obd_stale_clients += evicted;
1493         }
1494         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1495                                                  OBD_OPT_ABORT_RECOV);
1496         EXIT;
1497 }
1498 EXPORT_SYMBOL(class_disconnect_stale_exports);
1499
1500 void class_fail_export(struct obd_export *exp)
1501 {
1502         int rc, already_failed;
1503
1504         cfs_spin_lock(&exp->exp_lock);
1505         already_failed = exp->exp_failed;
1506         exp->exp_failed = 1;
1507         cfs_spin_unlock(&exp->exp_lock);
1508
1509         if (already_failed) {
1510                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1511                        exp, exp->exp_client_uuid.uuid);
1512                 return;
1513         }
1514
1515         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1516                exp, exp->exp_client_uuid.uuid);
1517
1518         if (obd_dump_on_timeout)
1519                 libcfs_debug_dumplog();
1520
1521         /* Most callers into obd_disconnect are removing their own reference
1522          * (request, for example) in addition to the one from the hash table.
1523          * We don't have such a reference here, so make one. */
1524         class_export_get(exp);
1525         rc = obd_disconnect(exp);
1526         if (rc)
1527                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1528         else
1529                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1530                        exp, exp->exp_client_uuid.uuid);
1531 }
1532 EXPORT_SYMBOL(class_fail_export);
1533
1534 char *obd_export_nid2str(struct obd_export *exp)
1535 {
1536         if (exp->exp_connection != NULL)
1537                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1538
1539         return "(no nid)";
1540 }
1541 EXPORT_SYMBOL(obd_export_nid2str);
1542
1543 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1544 {
1545         struct obd_export *doomed_exp = NULL;
1546         int exports_evicted = 0;
1547
1548         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1549
1550         do {
1551                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1552                 if (doomed_exp == NULL)
1553                         break;
1554
1555                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1556                          "nid %s found, wanted nid %s, requested nid %s\n",
1557                          obd_export_nid2str(doomed_exp),
1558                          libcfs_nid2str(nid_key), nid);
1559                 LASSERTF(doomed_exp != obd->obd_self_export,
1560                          "self-export is hashed by NID?\n");
1561                 exports_evicted++;
1562                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1563                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1564                        exports_evicted);
1565                 class_fail_export(doomed_exp);
1566                 class_export_put(doomed_exp);
1567         } while (1);
1568
1569         if (!exports_evicted)
1570                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1571                        obd->obd_name, nid);
1572         return exports_evicted;
1573 }
1574 EXPORT_SYMBOL(obd_export_evict_by_nid);
1575
1576 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1577 {
1578         struct obd_export *doomed_exp = NULL;
1579         struct obd_uuid doomed_uuid;
1580         int exports_evicted = 0;
1581
1582         obd_str2uuid(&doomed_uuid, uuid);
1583         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1584                 CERROR("%s: can't evict myself\n", obd->obd_name);
1585                 return exports_evicted;
1586         }
1587
1588         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1589
1590         if (doomed_exp == NULL) {
1591                 CERROR("%s: can't disconnect %s: no exports found\n",
1592                        obd->obd_name, uuid);
1593         } else {
1594                 CWARN("%s: evicting %s at adminstrative request\n",
1595                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1596                 class_fail_export(doomed_exp);
1597                 class_export_put(doomed_exp);
1598                 exports_evicted++;
1599         }
1600
1601         return exports_evicted;
1602 }
1603 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1604
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook, NULL by default; print_export_data() invokes it on an
 * export when lock dumping is requested and a hook is installed. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1609
1610 static void print_export_data(struct obd_export *exp, const char *status,
1611                               int locks)
1612 {
1613         struct ptlrpc_reply_state *rs;
1614         struct ptlrpc_reply_state *first_reply = NULL;
1615         int nreplies = 0;
1616
1617         cfs_spin_lock(&exp->exp_lock);
1618         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1619                                 rs_exp_list) {
1620                 if (nreplies == 0)
1621                         first_reply = rs;
1622                 nreplies++;
1623         }
1624         cfs_spin_unlock(&exp->exp_lock);
1625
1626         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1627                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1628                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1629                cfs_atomic_read(&exp->exp_rpc_count),
1630                cfs_atomic_read(&exp->exp_cb_count),
1631                cfs_atomic_read(&exp->exp_locks_count),
1632                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1633                nreplies, first_reply, nreplies > 3 ? "..." : "",
1634                exp->exp_last_committed);
1635 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1636         if (locks && class_export_dump_hook != NULL)
1637                 class_export_dump_hook(exp);
1638 #endif
1639 }
1640
1641 void dump_exports(struct obd_device *obd, int locks)
1642 {
1643         struct obd_export *exp;
1644
1645         cfs_spin_lock(&obd->obd_dev_lock);
1646         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1647                 print_export_data(exp, "ACTIVE", locks);
1648         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1649                 print_export_data(exp, "UNLINKED", locks);
1650         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1651                 print_export_data(exp, "DELAYED", locks);
1652         cfs_spin_unlock(&obd->obd_dev_lock);
1653         cfs_spin_lock(&obd_zombie_impexp_lock);
1654         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1655                 print_export_data(exp, "ZOMBIE", locks);
1656         cfs_spin_unlock(&obd_zombie_impexp_lock);
1657 }
1658 EXPORT_SYMBOL(dump_exports);
1659
1660 void obd_exports_barrier(struct obd_device *obd)
1661 {
1662         int waited = 2;
1663         LASSERT(cfs_list_empty(&obd->obd_exports));
1664         cfs_spin_lock(&obd->obd_dev_lock);
1665         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1666                 cfs_spin_unlock(&obd->obd_dev_lock);
1667                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1668                                                    cfs_time_seconds(waited));
1669                 if (waited > 5 && IS_PO2(waited)) {
1670                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1671                                       "more than %d seconds. "
1672                                       "The obd refcount = %d. Is it stuck?\n",
1673                                       obd->obd_name, waited,
1674                                       cfs_atomic_read(&obd->obd_refcount));
1675                         dump_exports(obd, 1);
1676                 }
1677                 waited *= 2;
1678                 cfs_spin_lock(&obd->obd_dev_lock);
1679         }
1680         cfs_spin_unlock(&obd->obd_dev_lock);
1681 }
1682 EXPORT_SYMBOL(obd_exports_barrier);
1683
1684 /**
1685  * kill zombie imports and exports
1686  */
1687 void obd_zombie_impexp_cull(void)
1688 {
1689         struct obd_import *import;
1690         struct obd_export *export;
1691         ENTRY;
1692
1693         do {
1694                 cfs_spin_lock(&obd_zombie_impexp_lock);
1695
1696                 import = NULL;
1697                 if (!cfs_list_empty(&obd_zombie_imports)) {
1698                         import = cfs_list_entry(obd_zombie_imports.next,
1699                                                 struct obd_import,
1700                                                 imp_zombie_chain);
1701                         cfs_list_del_init(&import->imp_zombie_chain);
1702                 }
1703
1704                 export = NULL;
1705                 if (!cfs_list_empty(&obd_zombie_exports)) {
1706                         export = cfs_list_entry(obd_zombie_exports.next,
1707                                                 struct obd_export,
1708                                                 exp_obd_chain);
1709                         cfs_list_del_init(&export->exp_obd_chain);
1710                 }
1711
1712                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1713
1714                 if (import != NULL)
1715                         class_import_destroy(import);
1716
1717                 if (export != NULL)
1718                         class_export_destroy(export);
1719
1720                 cfs_cond_resched();
1721         } while (import != NULL || export != NULL);
1722         EXIT;
1723 }
1724
1725 static cfs_completion_t         obd_zombie_start;
1726 static cfs_completion_t         obd_zombie_stop;
1727 static unsigned long            obd_zombie_flags;
1728 static cfs_waitq_t              obd_zombie_waitq;
1729 static pid_t                    obd_zombie_pid;
1730
1731 enum {
1732         OBD_ZOMBIE_STOP   = 1 << 1
1733 };
1734
1735 /**
1736  * check for work for kill zombie import/export thread.
1737  */
1738 static int obd_zombie_impexp_check(void *arg)
1739 {
1740         int rc;
1741
1742         cfs_spin_lock(&obd_zombie_impexp_lock);
1743         rc = cfs_list_empty(&obd_zombie_imports) &&
1744              cfs_list_empty(&obd_zombie_exports) &&
1745              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1746
1747         cfs_spin_unlock(&obd_zombie_impexp_lock);
1748
1749         RETURN(rc);
1750 }
1751
1752 /**
1753  * Add export to the obd_zombe thread and notify it.
1754  */
1755 static void obd_zombie_export_add(struct obd_export *exp) {
1756         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1757         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1758         cfs_list_del_init(&exp->exp_obd_chain);
1759         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1760         cfs_spin_lock(&obd_zombie_impexp_lock);
1761         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1762         cfs_spin_unlock(&obd_zombie_impexp_lock);
1763
1764         if (obd_zombie_impexp_notify != NULL)
1765                 obd_zombie_impexp_notify();
1766 }
1767
1768 /**
1769  * Add import to the obd_zombe thread and notify it.
1770  */
1771 static void obd_zombie_import_add(struct obd_import *imp) {
1772         LASSERT(imp->imp_sec == NULL);
1773         cfs_spin_lock(&obd_zombie_impexp_lock);
1774         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1775         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1776         cfs_spin_unlock(&obd_zombie_impexp_lock);
1777
1778         if (obd_zombie_impexp_notify != NULL)
1779                 obd_zombie_impexp_notify();
1780 }
1781
1782 /**
1783  * notify import/export destroy thread about new zombie.
1784  */
1785 static void obd_zombie_impexp_notify(void)
1786 {
1787         cfs_waitq_signal(&obd_zombie_waitq);
1788 }
1789
1790 /**
1791  * check whether obd_zombie is idle
1792  */
1793 static int obd_zombie_is_idle(void)
1794 {
1795         int rc;
1796
1797         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1798         cfs_spin_lock(&obd_zombie_impexp_lock);
1799         rc = cfs_list_empty(&obd_zombie_imports) &&
1800              cfs_list_empty(&obd_zombie_exports);
1801         cfs_spin_unlock(&obd_zombie_impexp_lock);
1802         return rc;
1803 }
1804
1805 /**
1806  * wait when obd_zombie import/export queues become empty
1807  */
1808 void obd_zombie_barrier(void)
1809 {
1810         struct l_wait_info lwi = { 0 };
1811
1812         if (obd_zombie_pid == cfs_curproc_pid())
1813                 /* don't wait for myself */
1814                 return;
1815         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1816 }
1817 EXPORT_SYMBOL(obd_zombie_barrier);
1818
1819 #ifdef __KERNEL__
1820
1821 /**
1822  * destroy zombie export/import thread.
1823  */
1824 static int obd_zombie_impexp_thread(void *unused)
1825 {
1826         int rc;
1827
1828         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1829                 cfs_complete(&obd_zombie_start);
1830                 RETURN(rc);
1831         }
1832
1833         cfs_complete(&obd_zombie_start);
1834
1835         obd_zombie_pid = cfs_curproc_pid();
1836
1837         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1838                 struct l_wait_info lwi = { 0 };
1839
1840                 l_wait_event(obd_zombie_waitq,
1841                              !obd_zombie_impexp_check(NULL), &lwi);
1842                 obd_zombie_impexp_cull();
1843
1844                 /*
1845                  * Notify obd_zombie_barrier callers that queues
1846                  * may be empty.
1847                  */
1848                 cfs_waitq_signal(&obd_zombie_waitq);
1849         }
1850
1851         cfs_complete(&obd_zombie_stop);
1852
1853         RETURN(0);
1854 }
1855
1856 #else /* ! KERNEL */
1857
1858 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1859 static void *obd_zombie_impexp_work_cb;
1860 static void *obd_zombie_impexp_idle_cb;
1861
1862 int obd_zombie_impexp_kill(void *arg)
1863 {
1864         int rc = 0;
1865
1866         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1867                 obd_zombie_impexp_cull();
1868                 rc = 1;
1869         }
1870         cfs_atomic_dec(&zombie_recur);
1871         return rc;
1872 }
1873
1874 #endif
1875
1876 /**
1877  * start destroy zombie import/export thread
1878  */
1879 int obd_zombie_impexp_init(void)
1880 {
1881         int rc;
1882
1883         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1884         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1885         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1886         cfs_init_completion(&obd_zombie_start);
1887         cfs_init_completion(&obd_zombie_stop);
1888         cfs_waitq_init(&obd_zombie_waitq);
1889         obd_zombie_pid = 0;
1890
1891 #ifdef __KERNEL__
1892         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1893         if (rc < 0)
1894                 RETURN(rc);
1895
1896         cfs_wait_for_completion(&obd_zombie_start);
1897 #else
1898
1899         obd_zombie_impexp_work_cb =
1900                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1901                                                  &obd_zombie_impexp_kill, NULL);
1902
1903         obd_zombie_impexp_idle_cb =
1904                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1905                                                  &obd_zombie_impexp_check, NULL);
1906         rc = 0;
1907 #endif
1908         RETURN(rc);
1909 }
1910 /**
1911  * stop destroy zombie import/export thread
1912  */
1913 void obd_zombie_impexp_stop(void)
1914 {
1915         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1916         obd_zombie_impexp_notify();
1917 #ifdef __KERNEL__
1918         cfs_wait_for_completion(&obd_zombie_stop);
1919 #else
1920         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1921         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1922 #endif
1923 }
1924
1925 /***** Kernel-userspace comm helpers *******/
1926
1927 /* Get length of entire message, including header */
1928 int kuc_len(int payload_len)
1929 {
1930         return sizeof(struct kuc_hdr) + payload_len;
1931 }
1932 EXPORT_SYMBOL(kuc_len);
1933
1934 /* Get a pointer to kuc header, given a ptr to the payload
1935  * @param p Pointer to payload area
1936  * @returns Pointer to kuc header
1937  */
1938 struct kuc_hdr * kuc_ptr(void *p)
1939 {
1940         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1941         LASSERT(lh->kuc_magic == KUC_MAGIC);
1942         return lh;
1943 }
1944 EXPORT_SYMBOL(kuc_ptr);
1945
1946 /* Test if payload is part of kuc message
1947  * @param p Pointer to payload area
1948  * @returns boolean
1949  */
1950 int kuc_ispayload(void *p)
1951 {
1952         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1953
1954         if (kh->kuc_magic == KUC_MAGIC)
1955                 return 1;
1956         else
1957                 return 0;
1958 }
1959 EXPORT_SYMBOL(kuc_ispayload);
1960
1961 /* Alloc space for a message, and fill in header
1962  * @return Pointer to payload area
1963  */
1964 void *kuc_alloc(int payload_len, int transport, int type)
1965 {
1966         struct kuc_hdr *lh;
1967         int len = kuc_len(payload_len);
1968
1969         OBD_ALLOC(lh, len);
1970         if (lh == NULL)
1971                 return ERR_PTR(-ENOMEM);
1972
1973         lh->kuc_magic = KUC_MAGIC;
1974         lh->kuc_transport = transport;
1975         lh->kuc_msgtype = type;
1976         lh->kuc_msglen = len;
1977
1978         return (void *)(lh + 1);
1979 }
1980 EXPORT_SYMBOL(kuc_alloc);
1981
/* Free a kuc message given a pointer to its payload area
 * @param p Pointer to payload area (kuc_ptr() asserts its header magic)
 * @param payload_len Payload length; the size freed is
 *        kuc_len(payload_len), so it should match the length the message
 *        was allocated with
 *
 * Not declared inline: this is an exported function with external linkage,
 * and a bare inline definition does not by itself provide an external
 * definition under C99/C11 inline rules.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1989
1990
1991