1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <libcfs/bitmap.h>
51 extern cfs_list_t obd_types;
52 cfs_spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 cfs_list_t obd_zombie_imports;
60 cfs_list_t obd_zombie_exports;
61 cfs_spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66 const char *status, int locks);
68 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71 cfs_spinlock_t obd_minor_lock;
73 * Maximum number of OBD devices on a single node (includes devices
74 * from all filesystems mounted on a client). This limit is itself arbitrary,
75 * though the lov_user_md_{v1,v3} structures (used for specifying the
76 * striping layout from llapi_setstripe() and on directory default EAs)
77 * have a 16-bit limit on the starting OST index.
79 const int obd_minor_map_size = 65536;
80 cfs_bitmap_t *obd_minor_map;
/*
 * Reserve a free device minor number.
 *
 * With obd_minor_lock held, searches obd_minor_map for the first zero bit
 * and, when the search did not run off the end of the map
 * (obd_minor_map_size), marks that bit as used.
 *
 * NOTE(review): this listing omits several lines of the function (the
 * declaration of ret, whatever happens when the map is full, and the return
 * statement).  class_newdev() treats a negative result as "no free minors";
 * confirm the map-full path against the complete source.
 */
82 int obd_minor_alloc(void)
86 cfs_spin_lock(&obd_minor_lock);
87 ret = cfs_find_first_zero_bit(obd_minor_map->data, obd_minor_map_size);
88 if (ret != obd_minor_map_size)
89 cfs_bitmap_set(obd_minor_map, ret);
92 cfs_spin_unlock(&obd_minor_lock);
96 void obd_minor_release(long minor)
98 cfs_spin_lock(&obd_minor_lock);
99 cfs_bitmap_clear(obd_minor_map, minor);
100 cfs_spin_unlock(&obd_minor_lock);
103 int obd_minor_valid(long minor)
107 cfs_spin_lock(&obd_minor_lock);
108 ret = cfs_bitmap_check(obd_minor_map, minor);
109 cfs_spin_unlock(&obd_minor_lock);
114 static CFS_LIST_HEAD(obd_dev_list);
115 static const int obd_hash_init_bits = 10;
116 static const int obd_hash_max_bits = 30;
117 static const int obd_hash_bkt_bits = 10;
119 static cfs_hash_t *obd_name_hash = NULL;
120 static unsigned obd_name_hops_hash(cfs_hash_t *lh, const void *key,
123 return cfs_hash_djb2_hash(key, strlen(key), mask);
126 static void *obd_name_hops_obj(cfs_hlist_node_t *hn)
128 struct obd_device *obd = cfs_hlist_entry(hn, struct obd_device,
130 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
131 "%p obd_magic %08x != %08x\n",
132 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
137 static void *obd_name_hops_key(cfs_hlist_node_t *hn)
139 struct obd_device *obd = obd_name_hops_obj(hn);
141 return &obd->obd_name;
144 static int obd_name_hops_compare(const void *key, cfs_hlist_node_t *hn)
146 void *nk = obd_name_hops_key(hn);
148 return strcmp(key, nk) == 0;
151 static void obd_name_hops_noop(cfs_hash_t *hs, cfs_hlist_node_t *hn)
153 obd_name_hops_obj(hn);
156 static cfs_hash_ops_t obd_name_hops = {
157 .hs_hash = obd_name_hops_hash,
158 .hs_keycmp = obd_name_hops_compare,
159 .hs_key = obd_name_hops_key,
160 .hs_object = obd_name_hops_obj,
161 .hs_get = obd_name_hops_noop,
162 .hs_put_locked = obd_name_hops_noop,
165 static cfs_hash_t *obd_uuid_hash = NULL;
166 static unsigned obd_uuid_hops_hash(cfs_hash_t *lh, const void *key,
169 return cfs_hash_djb2_hash(key, strlen(key), mask);
172 static void *obd_uuid_hops_obj(cfs_hlist_node_t *hn)
174 struct obd_device *obd = cfs_hlist_entry(hn, struct obd_device,
176 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
177 "%p obd_magic %08x != %08x\n",
178 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
183 static void *obd_uuid_hops_key(cfs_hlist_node_t *hn)
185 struct obd_device *obd = obd_uuid_hops_obj(hn);
187 return &obd->obd_uuid;
190 static int obd_uuid_hops_compare(const void *key, cfs_hlist_node_t *hn)
192 void *nk = obd_uuid_hops_key(hn);
194 return obd_uuid_equals(key, nk);
197 static void obd_uuid_hops_noop(cfs_hash_t *hs, cfs_hlist_node_t *hn)
199 obd_uuid_hops_obj(hn);
202 static cfs_hash_ops_t obd_uuid_hops = {
203 .hs_hash = obd_uuid_hops_hash,
204 .hs_keycmp = obd_uuid_hops_compare,
205 .hs_key = obd_uuid_hops_key,
206 .hs_object = obd_uuid_hops_obj,
207 .hs_get = obd_uuid_hops_noop,
208 .hs_put_locked = obd_uuid_hops_noop,
211 static cfs_hash_t *obd_minor_hash = NULL;
212 static unsigned obd_minor_hops_hash(cfs_hash_t *lh, const void *key,
215 return cfs_hash_u32_hash(*((__u32 *)key), mask);
218 static void *obd_minor_hops_obj(cfs_hlist_node_t *hn)
220 struct obd_device *obd = cfs_hlist_entry(hn, struct obd_device,
222 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
223 "%p obd_magic %08x != %08x\n",
224 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
229 static void *obd_minor_hops_key(cfs_hlist_node_t *hn)
231 struct obd_device *obd = obd_minor_hops_obj(hn);
233 return &obd->obd_minor;
236 static int obd_minor_hops_compare(const void *key, cfs_hlist_node_t *hn)
238 __u32 *nk = obd_minor_hops_key(hn);
240 return *((__u32 *)key) == *nk;
243 static void obd_minor_hops_noop(cfs_hash_t *hs, cfs_hlist_node_t *hn)
245 obd_minor_hops_obj(hn);
248 static cfs_hash_ops_t obd_minor_hops = {
249 .hs_hash = obd_minor_hops_hash,
250 .hs_keycmp = obd_minor_hops_compare,
251 .hs_key = obd_minor_hops_key,
252 .hs_object = obd_minor_hops_obj,
253 .hs_get = obd_minor_hops_noop,
254 .hs_put_locked = obd_minor_hops_noop,
257 int obd_hashes_init(void)
259 obd_name_hash = cfs_hash_create("obd_name",
260 obd_hash_init_bits, obd_hash_max_bits,
261 obd_hash_bkt_bits, 0,
262 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
264 CFS_HASH_DEFAULT | CFS_HASH_NO_ITEMREF);
265 if (obd_name_hash == NULL)
268 obd_uuid_hash = cfs_hash_create("obd_uuid",
269 obd_hash_init_bits, obd_hash_max_bits,
270 obd_hash_bkt_bits, 0,
271 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
273 CFS_HASH_DEFAULT | CFS_HASH_NO_ITEMREF);
274 if (obd_name_hash == NULL)
277 obd_minor_hash = cfs_hash_create("obd_minor",
278 obd_hash_init_bits, obd_hash_max_bits,
279 obd_hash_bkt_bits, 0,
280 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
282 CFS_HASH_DEFAULT | CFS_HASH_NO_ITEMREF);
283 if (obd_name_hash == NULL)
286 obd_minor_map = CFS_ALLOCATE_BITMAP(obd_minor_map_size);
287 if (obd_minor_map == NULL)
290 cfs_spin_lock_init(&obd_minor_lock);
/*
 * Release what obd_hashes_init() set up: frees the minor bitmap and drops
 * one reference on each of the name, uuid and minor tables.
 *
 * NOTE(review): no NULL checks are visible here, and one line of the body
 * is missing from this listing; confirm this is only called after a
 * fully successful obd_hashes_init().
 */
295 void obd_hashes_fini(void)
298 CFS_FREE_BITMAP(obd_minor_map);
299 cfs_hash_putref(obd_name_hash);
300 cfs_hash_putref(obd_uuid_hash);
301 cfs_hash_putref(obd_minor_hash);
305 * support functions: we could use inter-module communication, but this
306 * is more portable to other OS's
308 static struct obd_device *obd_device_alloc(void)
310 struct obd_device *obd;
312 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
314 obd->obd_magic = OBD_DEVICE_MAGIC;
319 static void obd_device_free(struct obd_device *obd)
321 LASSERT(obd != NULL);
322 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
323 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
324 if (obd->obd_namespace != NULL) {
325 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
326 obd, obd->obd_namespace, obd->obd_force);
329 lu_ref_fini(&obd->obd_reference);
330 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
333 struct obd_type *class_search_type(const char *name)
336 struct obd_type *type;
338 cfs_spin_lock(&obd_types_lock);
339 cfs_list_for_each(tmp, &obd_types) {
340 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
341 if (strcmp(type->typ_name, name) == 0) {
342 cfs_spin_unlock(&obd_types_lock);
346 cfs_spin_unlock(&obd_types_lock);
350 struct obd_type *class_get_type(const char *name)
352 struct obd_type *type = class_search_type(name);
354 #ifdef HAVE_MODULE_LOADING_SUPPORT
356 const char *modname = name;
357 if (!cfs_request_module("%s", modname)) {
358 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
359 type = class_search_type(name);
361 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
367 cfs_spin_lock(&type->obd_type_lock);
369 cfs_try_module_get(type->typ_dt_ops->o_owner);
370 cfs_spin_unlock(&type->obd_type_lock);
375 void class_put_type(struct obd_type *type)
378 cfs_spin_lock(&type->obd_type_lock);
380 cfs_module_put(type->typ_dt_ops->o_owner);
381 cfs_spin_unlock(&type->obd_type_lock);
384 #define CLASS_MAX_NAME 1024
386 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
387 struct lprocfs_vars *vars, const char *name,
388 struct lu_device_type *ldt)
390 struct obd_type *type;
395 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
397 if (class_search_type(name)) {
398 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
403 OBD_ALLOC(type, sizeof(*type));
407 OBD_ALLOC_PTR(type->typ_dt_ops);
408 OBD_ALLOC_PTR(type->typ_md_ops);
409 OBD_ALLOC(type->typ_name, strlen(name) + 1);
411 if (type->typ_dt_ops == NULL ||
412 type->typ_md_ops == NULL ||
413 type->typ_name == NULL)
416 *(type->typ_dt_ops) = *dt_ops;
417 /* md_ops is optional */
419 *(type->typ_md_ops) = *md_ops;
420 strcpy(type->typ_name, name);
421 cfs_spin_lock_init(&type->obd_type_lock);
424 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
426 if (IS_ERR(type->typ_procroot)) {
427 rc = PTR_ERR(type->typ_procroot);
428 type->typ_procroot = NULL;
434 rc = lu_device_type_init(ldt);
439 cfs_spin_lock(&obd_types_lock);
440 cfs_list_add(&type->typ_chain, &obd_types);
441 cfs_spin_unlock(&obd_types_lock);
446 if (type->typ_name != NULL)
447 OBD_FREE(type->typ_name, strlen(name) + 1);
448 if (type->typ_md_ops != NULL)
449 OBD_FREE_PTR(type->typ_md_ops);
450 if (type->typ_dt_ops != NULL)
451 OBD_FREE_PTR(type->typ_dt_ops);
452 OBD_FREE(type, sizeof(*type));
456 int class_unregister_type(const char *name)
458 struct obd_type *type = class_search_type(name);
462 CERROR("unknown obd type\n");
466 if (type->typ_refcnt) {
467 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
468 /* This is a bad situation, let's make the best of it */
469 /* Remove ops, but leave the name for debugging */
470 OBD_FREE_PTR(type->typ_dt_ops);
471 OBD_FREE_PTR(type->typ_md_ops);
475 if (type->typ_procroot) {
476 lprocfs_remove(&type->typ_procroot);
480 lu_device_type_fini(type->typ_lu);
482 cfs_spin_lock(&obd_types_lock);
483 cfs_list_del(&type->typ_chain);
484 cfs_spin_unlock(&obd_types_lock);
485 OBD_FREE(type->typ_name, strlen(name) + 1);
486 if (type->typ_dt_ops != NULL)
487 OBD_FREE_PTR(type->typ_dt_ops);
488 if (type->typ_md_ops != NULL)
489 OBD_FREE_PTR(type->typ_md_ops);
490 OBD_FREE(type, sizeof(*type));
492 } /* class_unregister_type */
/*
 * Return a short status string for an obd device.
 *
 * The device flags are tested in priority order: obd_stopping,
 * obd_inactive, obd_set_up, obd_attached; the first one set selects the
 * string.
 *
 * NOTE(review): the lines assigning the actual strings, the fallback branch
 * and the return are missing from this listing; confirm against the
 * complete source.
 */
494 const char *obd_dev_status(struct obd_device *obd)
498 if (obd->obd_stopping)
500 else if (obd->obd_inactive)
502 else if (obd->obd_set_up)
504 else if (obd->obd_attached)
/**
 * Like cfs_list_entry(), but yields NULL when \a pos is the list head
 * \a head itself (empty list, or an iteration that wrapped around).
 *
 * \a pos and \a head are parenthesised so expression arguments expand
 * safely.  \a pos is still evaluated twice; do not pass an expression
 * with side effects.
 */
#define cfs_list_entry_safe(pos, head, type, member)                    \
        ((pos) == (head) ? NULL : cfs_list_entry(pos, type, member))
515 void obd_devlist_first(struct obd_device **pos)
517 struct obd_device *obd;
519 cfs_spin_lock(&obd_dev_lock);
520 obd = cfs_list_entry_safe(obd_dev_list.next, &obd_dev_list,
521 struct obd_device, obd_list);
523 class_incref(obd, "devlist", obd);
524 cfs_spin_unlock(&obd_dev_lock);
529 void obd_devlist_next(struct obd_device **pos)
531 struct obd_device *obd = NULL;
533 cfs_spin_lock(&obd_dev_lock);
534 obd = cfs_list_entry_safe((*pos)->obd_list.next, &obd_dev_list,
535 struct obd_device, obd_list);
537 class_incref(obd, "devlist", obd);
538 cfs_spin_unlock(&obd_dev_lock);
540 class_decref(*pos, "devlist", *pos);
/*
 * End a device-list iteration early: drops the "devlist" reference that
 * obd_devlist_first()/obd_devlist_next() took on pos.
 *
 * NOTE(review): lines around the class_decref() call are missing from this
 * listing (possibly a NULL guard); confirm before calling with pos == NULL.
 */
544 void obd_devlist_last(struct obd_device *pos)
547 class_decref(pos,"devlist", pos);
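552 * Create a new obd device.
554 * Allocate a free minor number, create a new obd device, and register it in the name, minor and uuid hashes and on the global device list.
556 * \param[in] type_name obd device type string.
557 * \param[in] name obd device name; \a uuid is the obd device uuid string.
559 * \retval ERR_PTR(-errno) (not NULL) if create fails, otherwise return the obd device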
/*
 * Steps visible in this listing: validate the name length, take a reference
 * on the device type, allocate the device and a minor number, insert the
 * device into the name hash (must be unique), then into the minor and uuid
 * hashes, and finally append it to obd_dev_list under obd_dev_lock.
 * The tail of the function unwinds in reverse order (release minor, free
 * device, drop type reference) and returns ERR_PTR(ret).
 *
 * NOTE(review): error codes, unwind labels and several branch lines are
 * missing from this listing.
 */
562 struct obd_device *class_newdev(const char *type_name, const char *name, const char *uuid)
564 struct obd_device *newdev;
565 struct obd_type *type;
/* Only the name is length-checked here, although the message says "name/uuid". */
568 if (strlen(name) >= MAX_OBD_NAME) {
569 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
570 RETURN(ERR_PTR(-EINVAL));
573 type = class_get_type(type_name);
575 CERROR("OBD: unknown type: %s\n", type_name);
580 newdev = obd_device_alloc();
581 if (newdev == NULL) {
586 newdev->obd_minor = obd_minor_alloc();
587 if (newdev->obd_minor < 0) {
588 CERROR("don't have free minors\n");
593 /* find add unique by name */
/* Copies at most size-1 bytes; termination relies on the buffer's last byte
 * already being zero - presumably the slab allocation is zeroed, TODO confirm. */
594 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
595 if (cfs_hash_add_unique(obd_name_hash, name, &newdev->obd_name_node)) {
596 CERROR("fails to add an unique obddev (%s) to the hash\n",
601 newdev->obd_type = type;
603 cfs_hash_add(obd_minor_hash, &newdev->obd_minor, &newdev->obd_minor_node);
/* NOTE(review): strlen(uuid) bytes are copied with no visible check against
 * sizeof(newdev->obd_uuid.uuid); an over-long uuid would overflow the field.
 * No terminator is copied either (see the zeroing assumption above). */
604 memcpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
605 cfs_hash_add(obd_uuid_hash, uuid, &newdev->obd_uuid_node);
607 cfs_spin_lock(&obd_dev_lock);
608 cfs_list_add_tail(&newdev->obd_list, &obd_dev_list);
609 cfs_spin_unlock(&obd_dev_lock);
611 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
612 newdev->obd_name, newdev);
/* Error unwinding: undo the steps above in reverse order. */
615 obd_minor_release(newdev->obd_minor);
617 obd_device_free(newdev);
619 class_put_type(type);
621 RETURN(ERR_PTR(ret));
624 void class_release_dev(struct obd_device *obd)
626 struct obd_type *obd_type = obd->obd_type;
628 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
629 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
630 LASSERT(obd_type != NULL);
632 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
633 obd->obd_name,obd->obd_type->typ_name);
635 cfs_hash_del(obd_name_hash, obd->obd_name, &obd->obd_name_node);
636 cfs_hash_del(obd_uuid_hash, &obd->obd_uuid, &obd->obd_uuid_node);
637 cfs_hash_del(obd_minor_hash, &obd->obd_minor, &obd->obd_minor_node);
639 cfs_spin_lock(&obd_dev_lock);
640 cfs_list_del(&obd->obd_list);
641 cfs_spin_unlock(&obd_dev_lock);
643 obd_minor_release(obd->obd_minor);
644 obd_device_free(obd);
646 class_put_type(obd_type);
649 int class_name2dev(const char *name)
651 struct obd_device *obd;
653 obd = cfs_hash_lookup(obd_name_hash, name);
654 return obd != NULL ? obd->obd_minor : -1 ;
657 struct obd_device *class_name2obd(const char *name)
659 return cfs_hash_lookup(obd_name_hash, name);
662 int class_uuid2dev(struct obd_uuid *uuid)
664 struct obd_device *obd;
666 obd = cfs_hash_lookup(obd_uuid_hash, uuid);
667 return obd != NULL ? obd->obd_minor : -1;
670 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
672 return cfs_hash_lookup(obd_uuid_hash, uuid);
676 * Look up an obd device by its minor number in the minor hash.
678 * \param minor [in] device minor number
680 * \retval NULL if no obd device is registered under \a minor,
681 * otherwise return the obd device found.
/*
 * The lookup key is the address of the by-value parameter, matching the
 * __u32 keys that obd_minor_hops_hash()/obd_minor_hops_compare() read.
 * NOTE(review): the return statement is missing from this listing.
 */
683 struct obd_device *class_num2obd(__u32 minor)
685 struct obd_device *obd;
687 obd = cfs_hash_lookup(obd_minor_hash, &minor);
/*
 * Print one console line per registered device: minor, status string from
 * obd_dev_status(), type name, device name, uuid and current refcount.
 * Devices are visited with the obd_devlist_first()/obd_devlist_next()
 * iterator.
 *
 * NOTE(review): the loop condition and a few other lines are missing from
 * this listing.
 */
692 void class_obd_list(void)
695 struct obd_device *obd;
697 for (obd_devlist_first(&obd);
699 obd_devlist_next(&obd)) {
701 status = obd_dev_status(obd);
702 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
703 obd->obd_minor, status, obd->obd_type->typ_name,
704 obd->obd_name, obd->obd_uuid.uuid,
705 cfs_atomic_read(&obd->obd_refcount));
710 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
711 specified, then only the client with that uuid is returned,
712 otherwise any client connected to the tgt is returned. */
713 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
714 const char * typ_name,
715 struct obd_uuid *grp_uuid)
717 struct obd_device *obd;
719 for (obd_devlist_first(&obd);
721 obd_devlist_next(&obd)) {
722 /* XXX per type list ? */
723 if ((strncmp(obd->obd_type->typ_name, typ_name,
724 strlen(typ_name)) == 0)) {
725 if (obd_uuid_equals(tgt_uuid,
726 &obd->u.cli.cl_target_uuid) &&
727 ((grp_uuid)? obd_uuid_equals(grp_uuid,
728 &obd->obd_uuid) : 1)) {
729 obd_devlist_last(obd);
738 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
739 searching at *prev, and if a device is found, the position to continue
740 from is saved in *prev. If *prev is NULL, then the first matching device
741 will always be returned. */
/*
 * Resumable search of the device list for devices whose own uuid equals
 * grp_uuid.  The starting point is taken from *prev; both
 * obd_devlist_first() and obd_devlist_next() appear, presumably selected by
 * whether *prev is NULL - TODO confirm.
 *
 * NOTE(review): roughly half of the body (the branch conditions, what
 * happens on a match and the return statements) is missing from this
 * listing.
 */
742 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid,
743 struct obd_device **prev)
745 struct obd_device *obd = *prev;
748 obd_devlist_first(&obd);
750 obd_devlist_next(&obd);
753 for (; obd != NULL; obd_devlist_next(&obd)) {
754 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
755 /* XXX return with reference */
765 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
766 * adjust sptlrpc settings accordingly.
768 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
770 struct obd_device *obd;
774 LASSERT(namelen > 0);
776 for (obd_devlist_first(&obd);
778 obd_devlist_next(&obd)) {
779 if (obd->obd_set_up == 0 || obd->obd_stopping)
782 /* only notify mdc, osc, mdt, ost */
783 type = obd->obd_type->typ_name;
784 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
785 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
786 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
787 strcmp(type, LUSTRE_OST_NAME) != 0)
790 if (strncmp(obd->obd_name, fsname, namelen))
793 /** XXX - some new obd can be added at that point */
794 rc2 = obd_set_info_async(obd->obd_self_export,
795 sizeof(KEY_SPTLRPC_CONF),
796 KEY_SPTLRPC_CONF, 0, NULL, NULL);
802 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
804 void obd_cleanup_caches(void)
809 if (obd_device_cachep) {
810 rc = cfs_mem_cache_destroy(obd_device_cachep);
811 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
812 obd_device_cachep = NULL;
815 rc = cfs_mem_cache_destroy(obdo_cachep);
816 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
820 rc = cfs_mem_cache_destroy(import_cachep);
821 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
822 import_cachep = NULL;
825 rc = cfs_mem_cache_destroy(capa_cachep);
826 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
832 int obd_init_caches(void)
836 LASSERT(obd_device_cachep == NULL);
837 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
838 sizeof(struct obd_device),
840 if (!obd_device_cachep)
843 LASSERT(obdo_cachep == NULL);
844 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
849 LASSERT(import_cachep == NULL);
850 import_cachep = cfs_mem_cache_create("ll_import_cache",
851 sizeof(struct obd_import),
856 LASSERT(capa_cachep == NULL);
857 capa_cachep = cfs_mem_cache_create("capa_cache",
858 sizeof(struct obd_capa), 0, 0);
864 obd_cleanup_caches();
869 /* map connection to client */
870 struct obd_export *class_conn2export(struct lustre_handle *conn)
872 struct obd_export *export;
876 CDEBUG(D_CACHE, "looking for null handle\n");
880 if (conn->cookie == -1) { /* this means assign a new connection */
881 CDEBUG(D_CACHE, "want a new connection\n");
885 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
886 export = class_handle2object(conn->cookie);
/*
 * Map an export to an obd device.
 * NOTE(review): the entire body is missing from this listing; presumably it
 * returns exp->exp_obd - confirm against the complete source.
 */
890 struct obd_device *class_exp2obd(struct obd_export *exp)
897 struct obd_device *class_conn2obd(struct lustre_handle *conn)
899 struct obd_export *export;
900 export = class_conn2export(conn);
902 struct obd_device *obd = export->exp_obd;
903 class_export_put(export);
909 struct obd_import *class_exp2cliimp(struct obd_export *exp)
911 struct obd_device *obd = exp->exp_obd;
914 return obd->u.cli.cl_import;
917 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
919 struct obd_device *obd = class_conn2obd(conn);
922 return obd->u.cli.cl_import;
925 /* Export management functions */
926 static void class_export_destroy(struct obd_export *exp)
928 struct obd_device *obd = exp->exp_obd;
931 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
933 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
934 exp->exp_client_uuid.uuid, obd->obd_name);
936 LASSERT(obd != NULL);
938 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
939 if (exp->exp_connection)
940 ptlrpc_put_connection_superhack(exp->exp_connection);
942 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
943 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
944 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
945 LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
946 obd_destroy_export(exp);
947 class_decref(obd, "export", exp);
949 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/** Handle-table callback: take an export reference on lookup. */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}
958 struct obd_export *class_export_get(struct obd_export *exp)
960 cfs_atomic_inc(&exp->exp_refcount);
961 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
962 cfs_atomic_read(&exp->exp_refcount));
965 EXPORT_SYMBOL(class_export_get);
967 void class_export_put(struct obd_export *exp)
969 LASSERT(exp != NULL);
970 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, 0x5a5a5a);
971 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
972 cfs_atomic_read(&exp->exp_refcount) - 1);
974 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
975 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
976 CDEBUG(D_IOCTL, "final put %p/%s\n",
977 exp, exp->exp_client_uuid.uuid);
979 /* release nid stat refererence */
980 lprocfs_exp_cleanup(exp);
982 obd_zombie_export_add(exp);
985 EXPORT_SYMBOL(class_export_put);
/*
 * NOTE(review): several lines of this function are missing from the listing
 * (declarations, branch conditions, labels and returns); the comments added
 * below describe only what the visible lines do.
 */
987 /* Creates a new export, adds it to the hash table, and returns a
988 * pointer to it. The refcount is 2: one for the hash reference, and
989 * one for the pointer returned by this function. */
990 struct obd_export *class_new_export(struct obd_device *obd,
991 struct obd_uuid *cluuid)
993 struct obd_export *export;
994 cfs_hash_t *hash = NULL;
998 OBD_ALLOC_PTR(export);
1000 return ERR_PTR(-ENOMEM);
/* Initialise counters, lists and locks before the export becomes visible. */
1002 export->exp_conn_cnt = 0;
1003 export->exp_lock_hash = NULL;
1004 cfs_atomic_set(&export->exp_refcount, 2);
1005 cfs_atomic_set(&export->exp_rpc_count, 0);
1006 cfs_atomic_set(&export->exp_cb_count, 0);
1007 cfs_atomic_set(&export->exp_locks_count, 0);
1008 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1009 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
1010 cfs_spin_lock_init(&export->exp_locks_list_guard);
1012 cfs_atomic_set(&export->exp_replay_count, 0);
1013 export->exp_obd = obd;
1014 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
1015 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
1016 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1017 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
1018 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
1019 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
/* Registers the export in the handle table; lookups go through export_handle_addref(). */
1020 class_handle_hash(&export->exp_handle, export_handle_addref);
1021 export->exp_last_request_time = cfs_time_current_sec();
1022 cfs_spin_lock_init(&export->exp_lock);
1023 cfs_spin_lock_init(&export->exp_rpc_lock);
1024 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
1025 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
1027 export->exp_sp_peer = LUSTRE_SP_ANY;
1028 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1029 export->exp_client_uuid = *cluuid;
1030 obd_init_export(export);
/* First pass under obd_dev_lock: bail out if the device is stopping and pin
 * the device's uuid hash so it can be used after the lock is dropped. */
1032 cfs_spin_lock(&obd->obd_dev_lock);
1033 /* shouldn't happen, but might race */
1034 if (obd->obd_stopping)
1035 GOTO(exit_unlock, rc = -ENODEV);
1037 hash = cfs_hash_getref(obd->obd_uuid_hash);
1039 GOTO(exit_unlock, rc = -ENODEV);
1040 cfs_spin_unlock(&obd->obd_dev_lock);
/* An export whose client uuid equals the device's own uuid is not hashed. */
1042 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1043 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1045 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1046 obd->obd_name, cluuid->uuid, rc);
1047 GOTO(exit_err, rc = -EALREADY);
/* Second pass: the device may have started stopping while the lock was
 * dropped, so re-check and undo the hash insertion in that case. */
1051 cfs_spin_lock(&obd->obd_dev_lock);
1052 if (obd->obd_stopping) {
1053 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1054 GOTO(exit_unlock, rc = -ENODEV);
1057 class_incref(obd, "export", export);
1058 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
1059 cfs_list_add_tail(&export->exp_obd_chain_timed,
1060 &export->exp_obd->obd_exports_timed);
1061 export->exp_obd->obd_num_exports++;
1062 cfs_spin_unlock(&obd->obd_dev_lock);
1063 cfs_hash_putref(hash);
/* Error unwinding: unlock, drop the hash reference, unhash the handle and
 * free the export. */
1067 cfs_spin_unlock(&obd->obd_dev_lock);
1070 cfs_hash_putref(hash);
1071 class_handle_unhash(&export->exp_handle);
1072 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
1073 obd_destroy_export(export);
1074 OBD_FREE_PTR(export);
1077 EXPORT_SYMBOL(class_new_export);
1079 void class_unlink_export(struct obd_export *exp)
1081 class_handle_unhash(&exp->exp_handle);
1083 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1084 /* delete an uuid-export hashitem from hashtables */
1085 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
1086 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1087 &exp->exp_client_uuid,
1088 &exp->exp_uuid_hash);
1090 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1091 cfs_list_del_init(&exp->exp_obd_chain_timed);
1092 exp->exp_obd->obd_num_exports--;
1093 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1094 class_export_put(exp);
1096 EXPORT_SYMBOL(class_unlink_export);
1098 /* Import management functions */
1099 void class_import_destroy(struct obd_import *imp)
1103 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1104 imp->imp_obd->obd_name);
1106 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1108 ptlrpc_put_connection_superhack(imp->imp_connection);
1110 while (!cfs_list_empty(&imp->imp_conn_list)) {
1111 struct obd_import_conn *imp_conn;
1113 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
1114 struct obd_import_conn, oic_item);
1115 cfs_list_del_init(&imp_conn->oic_item);
1116 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1117 OBD_FREE(imp_conn, sizeof(*imp_conn));
1120 LASSERT(imp->imp_sec == NULL);
1121 class_decref(imp->imp_obd, "import", imp);
1122 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/** Handle-table callback: take an import reference on lookup. */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
1131 struct obd_import *class_import_get(struct obd_import *import)
1133 cfs_atomic_inc(&import->imp_refcount);
1134 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1135 cfs_atomic_read(&import->imp_refcount),
1136 import->imp_obd->obd_name);
1139 EXPORT_SYMBOL(class_import_get);
1141 void class_import_put(struct obd_import *imp)
1145 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1146 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, 0x5a5a5a);
1148 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1149 cfs_atomic_read(&imp->imp_refcount) - 1,
1150 imp->imp_obd->obd_name);
1152 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1153 CDEBUG(D_INFO, "final put import %p\n", imp);
1154 obd_zombie_import_add(imp);
1159 EXPORT_SYMBOL(class_import_put);
1161 static void init_imp_at(struct imp_at *at) {
1163 at_init(&at->iat_net_latency, 0, 0);
1164 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1165 /* max service estimates are tracked on the server side, so
1166 don't use the AT history here, just use the last reported
1167 val. (But keep hist for proc histogram, worst_ever) */
1168 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1173 struct obd_import *class_new_import(struct obd_device *obd)
1175 struct obd_import *imp;
1177 OBD_ALLOC(imp, sizeof(*imp));
1181 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1182 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1183 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1184 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1185 cfs_spin_lock_init(&imp->imp_lock);
1186 imp->imp_last_success_conn = 0;
1187 imp->imp_state = LUSTRE_IMP_NEW;
1188 imp->imp_obd = class_incref(obd, "import", imp);
1189 cfs_sema_init(&imp->imp_sec_mutex, 1);
1190 cfs_waitq_init(&imp->imp_recovery_waitq);
1192 cfs_atomic_set(&imp->imp_refcount, 2);
1193 cfs_atomic_set(&imp->imp_unregistering, 0);
1194 cfs_atomic_set(&imp->imp_inflight, 0);
1195 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1196 cfs_atomic_set(&imp->imp_inval_count, 0);
1197 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1198 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1199 class_handle_hash(&imp->imp_handle, import_handle_addref);
1200 init_imp_at(&imp->imp_at);
1202 /* the default magic is V2, will be used in connect RPC, and
1203 * then adjusted according to the flags in request/reply. */
1204 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1208 EXPORT_SYMBOL(class_new_import);
1210 void class_destroy_import(struct obd_import *import)
1212 LASSERT(import != NULL);
1213 LASSERT(import != LP_POISON);
1215 class_handle_unhash(&import->imp_handle);
1217 cfs_spin_lock(&import->imp_lock);
1218 import->imp_generation++;
1219 cfs_spin_unlock(&import->imp_lock);
1220 class_import_put(import);
1222 EXPORT_SYMBOL(class_destroy_import);
1224 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1226 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1228 cfs_spin_lock(&exp->exp_locks_list_guard);
1230 LASSERT(lock->l_exp_refs_nr >= 0);
1232 if (lock->l_exp_refs_target != NULL &&
1233 lock->l_exp_refs_target != exp) {
1234 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1235 exp, lock, lock->l_exp_refs_target);
1237 if ((lock->l_exp_refs_nr ++) == 0) {
1238 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1239 lock->l_exp_refs_target = exp;
1241 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1242 lock, exp, lock->l_exp_refs_nr);
1243 cfs_spin_unlock(&exp->exp_locks_list_guard);
1245 EXPORT_SYMBOL(__class_export_add_lock_ref);
1247 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1249 cfs_spin_lock(&exp->exp_locks_list_guard);
1250 LASSERT(lock->l_exp_refs_nr > 0);
1251 if (lock->l_exp_refs_target != exp) {
1252 LCONSOLE_WARN("lock %p, "
1253 "mismatching export pointers: %p, %p\n",
1254 lock, lock->l_exp_refs_target, exp);
1256 if (-- lock->l_exp_refs_nr == 0) {
1257 cfs_list_del_init(&lock->l_exp_refs_link);
1258 lock->l_exp_refs_target = NULL;
1260 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1261 lock, exp, lock->l_exp_refs_nr);
1262 cfs_spin_unlock(&exp->exp_locks_list_guard);
1264 EXPORT_SYMBOL(__class_export_del_lock_ref);
1267 /* A connection defines an export context in which preallocation can
1268 be managed. This releases the export pointer reference, and returns
1269 the export handle, so the export refcount is 1 when this function
1271 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1272 struct obd_uuid *cluuid)
1274 struct obd_export *export;
1275 LASSERT(conn != NULL);
1276 LASSERT(obd != NULL);
1277 LASSERT(cluuid != NULL);
1280 export = class_new_export(obd, cluuid);
1282 RETURN(PTR_ERR(export));
1284 conn->cookie = export->exp_handle.h_cookie;
1285 class_export_put(export);
1287 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1288 cluuid->uuid, conn->cookie);
1291 EXPORT_SYMBOL(class_connect);
1293 /* if export is involved in recovery then clean up related things */
1294 void class_export_recovery_cleanup(struct obd_export *exp)
1296 struct obd_device *obd = exp->exp_obd;
1298 cfs_spin_lock(&obd->obd_recovery_task_lock);
1299 if (exp->exp_delayed)
1300 obd->obd_delayed_clients--;
1301 if (obd->obd_recovering && exp->exp_in_recovery) {
1302 cfs_spin_lock(&exp->exp_lock);
1303 exp->exp_in_recovery = 0;
1304 cfs_spin_unlock(&exp->exp_lock);
1305 LASSERT(obd->obd_connected_clients);
1306 obd->obd_connected_clients--;
1308 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1309 /** Cleanup req replay fields */
1310 if (exp->exp_req_replay_needed) {
1311 cfs_spin_lock(&exp->exp_lock);
1312 exp->exp_req_replay_needed = 0;
1313 cfs_spin_unlock(&exp->exp_lock);
1314 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1315 cfs_atomic_dec(&obd->obd_req_replay_clients);
1317 /** Cleanup lock replay data */
1318 if (exp->exp_lock_replay_needed) {
1319 cfs_spin_lock(&exp->exp_lock);
1320 exp->exp_lock_replay_needed = 0;
1321 cfs_spin_unlock(&exp->exp_lock);
1322 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1323 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1327 /* This function removes 1-3 references from the export:
1328 * 1 - for export pointer passed
1329 * and if disconnect really need
1330 * 2 - removing from hash
1331 * 3 - in client_unlink_export
1332 * The export pointer passed to this function can destroyed */
1333 int class_disconnect(struct obd_export *export)
1335 int already_disconnected;
1338 if (export == NULL) {
1340 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1344 cfs_spin_lock(&export->exp_lock);
1345 already_disconnected = export->exp_disconnected;
1346 export->exp_disconnected = 1;
1347 cfs_spin_unlock(&export->exp_lock);
1349 /* class_cleanup(), abort_recovery(), and class_fail_export()
1350 * all end up in here, and if any of them race we shouldn't
1351 * call extra class_export_puts(). */
1352 if (already_disconnected) {
1353 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1354 GOTO(no_disconn, already_disconnected);
1357 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1358 export->exp_handle.h_cookie);
1360 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1361 cfs_hash_del(export->exp_obd->obd_nid_hash,
1362 &export->exp_connection->c_peer.nid,
1363 &export->exp_nid_hash);
1365 class_export_recovery_cleanup(export);
1366 class_unlink_export(export);
1368 class_export_put(export);
1372 /* Return non-zero for a fully connected export */
1373 int class_connected_export(struct obd_export *exp)
1377 cfs_spin_lock(&exp->exp_lock);
1378 connected = (exp->exp_conn_cnt > 0);
1379 cfs_spin_unlock(&exp->exp_lock);
1384 EXPORT_SYMBOL(class_connected_export);
1386 static void class_disconnect_export_list(cfs_list_t *list,
1387 enum obd_option flags)
1390 struct obd_export *exp;
1393 /* It's possible that an export may disconnect itself, but
1394 * nothing else will be added to this list. */
1395 while (!cfs_list_empty(list)) {
1396 exp = cfs_list_entry(list->next, struct obd_export,
1398 /* need for safe call CDEBUG after obd_disconnect */
1399 class_export_get(exp);
1401 cfs_spin_lock(&exp->exp_lock);
1402 exp->exp_flags = flags;
1403 cfs_spin_unlock(&exp->exp_lock);
1405 if (obd_uuid_equals(&exp->exp_client_uuid,
1406 &exp->exp_obd->obd_uuid)) {
1408 "exp %p export uuid == obd uuid, don't discon\n",
1410 /* Need to delete this now so we don't end up pointing
1411 * to work_list later when this export is cleaned up. */
1412 cfs_list_del_init(&exp->exp_obd_chain);
1413 class_export_put(exp);
1417 class_export_get(exp);
1418 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1419 "last request at "CFS_TIME_T"\n",
1420 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1421 exp, exp->exp_last_request_time);
1422 /* release one export reference anyway */
1423 rc = obd_disconnect(exp);
1425 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1426 obd_export_nid2str(exp), exp, rc);
1427 class_export_put(exp);
1432 void class_disconnect_exports(struct obd_device *obd)
1434 cfs_list_t work_list;
1437 /* Move all of the exports from obd_exports to a work list, en masse. */
1438 CFS_INIT_LIST_HEAD(&work_list);
1439 cfs_spin_lock(&obd->obd_dev_lock);
1440 cfs_list_splice_init(&obd->obd_exports, &work_list);
1441 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1442 cfs_spin_unlock(&obd->obd_dev_lock);
1444 if (!cfs_list_empty(&work_list)) {
1445 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1446 "disconnecting them\n", obd->obd_minor, obd);
1447 class_disconnect_export_list(&work_list,
1448 exp_flags_from_obd(obd));
1450 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1451 obd->obd_minor, obd);
1454 EXPORT_SYMBOL(class_disconnect_exports);
1456 /* Remove exports that have not completed recovery.
1458 void class_disconnect_stale_exports(struct obd_device *obd,
1459 int (*test_export)(struct obd_export *))
1461 cfs_list_t work_list;
1462 cfs_list_t *pos, *n;
1463 struct obd_export *exp;
1467 CFS_INIT_LIST_HEAD(&work_list);
1468 cfs_spin_lock(&obd->obd_dev_lock);
1469 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1470 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1471 if (test_export(exp))
1474 /* don't count self-export as client */
1475 if (obd_uuid_equals(&exp->exp_client_uuid,
1476 &exp->exp_obd->obd_uuid))
1479 cfs_list_move(&exp->exp_obd_chain, &work_list);
1481 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1482 obd->obd_name, exp->exp_client_uuid.uuid,
1483 exp->exp_connection == NULL ? "<unknown>" :
1484 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1485 print_export_data(exp, "EVICTING", 0);
1487 cfs_spin_unlock(&obd->obd_dev_lock);
1490 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1491 obd->obd_name, evicted);
1492 obd->obd_stale_clients += evicted;
1494 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1495 OBD_OPT_ABORT_RECOV);
1498 EXPORT_SYMBOL(class_disconnect_stale_exports);
1500 void class_fail_export(struct obd_export *exp)
1502 int rc, already_failed;
1504 cfs_spin_lock(&exp->exp_lock);
1505 already_failed = exp->exp_failed;
1506 exp->exp_failed = 1;
1507 cfs_spin_unlock(&exp->exp_lock);
1509 if (already_failed) {
1510 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1511 exp, exp->exp_client_uuid.uuid);
1515 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1516 exp, exp->exp_client_uuid.uuid);
1518 if (obd_dump_on_timeout)
1519 libcfs_debug_dumplog();
1521 /* Most callers into obd_disconnect are removing their own reference
1522 * (request, for example) in addition to the one from the hash table.
1523 * We don't have such a reference here, so make one. */
1524 class_export_get(exp);
1525 rc = obd_disconnect(exp);
1527 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1529 CDEBUG(D_HA, "disconnected export %p/%s\n",
1530 exp, exp->exp_client_uuid.uuid);
1532 EXPORT_SYMBOL(class_fail_export);
1534 char *obd_export_nid2str(struct obd_export *exp)
1536 if (exp->exp_connection != NULL)
1537 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1541 EXPORT_SYMBOL(obd_export_nid2str);
1543 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1545 struct obd_export *doomed_exp = NULL;
1546 int exports_evicted = 0;
1548 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1551 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1552 if (doomed_exp == NULL)
1555 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1556 "nid %s found, wanted nid %s, requested nid %s\n",
1557 obd_export_nid2str(doomed_exp),
1558 libcfs_nid2str(nid_key), nid);
1559 LASSERTF(doomed_exp != obd->obd_self_export,
1560 "self-export is hashed by NID?\n");
1562 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1563 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1565 class_fail_export(doomed_exp);
1566 class_export_put(doomed_exp);
1569 if (!exports_evicted)
1570 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1571 obd->obd_name, nid);
1572 return exports_evicted;
1574 EXPORT_SYMBOL(obd_export_evict_by_nid);
1576 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1578 struct obd_export *doomed_exp = NULL;
1579 struct obd_uuid doomed_uuid;
1580 int exports_evicted = 0;
1582 obd_str2uuid(&doomed_uuid, uuid);
1583 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1584 CERROR("%s: can't evict myself\n", obd->obd_name);
1585 return exports_evicted;
1588 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1590 if (doomed_exp == NULL) {
1591 CERROR("%s: can't disconnect %s: no exports found\n",
1592 obd->obd_name, uuid);
1594 CWARN("%s: evicting %s at adminstrative request\n",
1595 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1596 class_fail_export(doomed_exp);
1597 class_export_put(doomed_exp);
1601 return exports_evicted;
1603 EXPORT_SYMBOL(obd_export_evict_by_uuid);
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked for each export by print_export_data() when lock
 * dumping is requested; NULL until some other code installs a hook. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1610 static void print_export_data(struct obd_export *exp, const char *status,
1613 struct ptlrpc_reply_state *rs;
1614 struct ptlrpc_reply_state *first_reply = NULL;
1617 cfs_spin_lock(&exp->exp_lock);
1618 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1624 cfs_spin_unlock(&exp->exp_lock);
1626 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1627 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1628 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1629 cfs_atomic_read(&exp->exp_rpc_count),
1630 cfs_atomic_read(&exp->exp_cb_count),
1631 cfs_atomic_read(&exp->exp_locks_count),
1632 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1633 nreplies, first_reply, nreplies > 3 ? "..." : "",
1634 exp->exp_last_committed);
1635 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1636 if (locks && class_export_dump_hook != NULL)
1637 class_export_dump_hook(exp);
1641 void dump_exports(struct obd_device *obd, int locks)
1643 struct obd_export *exp;
1645 cfs_spin_lock(&obd->obd_dev_lock);
1646 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1647 print_export_data(exp, "ACTIVE", locks);
1648 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1649 print_export_data(exp, "UNLINKED", locks);
1650 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1651 print_export_data(exp, "DELAYED", locks);
1652 cfs_spin_unlock(&obd->obd_dev_lock);
1653 cfs_spin_lock(&obd_zombie_impexp_lock);
1654 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1655 print_export_data(exp, "ZOMBIE", locks);
1656 cfs_spin_unlock(&obd_zombie_impexp_lock);
1658 EXPORT_SYMBOL(dump_exports);
1660 void obd_exports_barrier(struct obd_device *obd)
1663 LASSERT(cfs_list_empty(&obd->obd_exports));
1664 cfs_spin_lock(&obd->obd_dev_lock);
1665 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1666 cfs_spin_unlock(&obd->obd_dev_lock);
1667 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1668 cfs_time_seconds(waited));
1669 if (waited > 5 && IS_PO2(waited)) {
1670 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1671 "more than %d seconds. "
1672 "The obd refcount = %d. Is it stuck?\n",
1673 obd->obd_name, waited,
1674 cfs_atomic_read(&obd->obd_refcount));
1675 dump_exports(obd, 1);
1678 cfs_spin_lock(&obd->obd_dev_lock);
1680 cfs_spin_unlock(&obd->obd_dev_lock);
1682 EXPORT_SYMBOL(obd_exports_barrier);
1685 * kill zombie imports and exports
1687 void obd_zombie_impexp_cull(void)
1689 struct obd_import *import;
1690 struct obd_export *export;
1694 cfs_spin_lock(&obd_zombie_impexp_lock);
1697 if (!cfs_list_empty(&obd_zombie_imports)) {
1698 import = cfs_list_entry(obd_zombie_imports.next,
1701 cfs_list_del_init(&import->imp_zombie_chain);
1705 if (!cfs_list_empty(&obd_zombie_exports)) {
1706 export = cfs_list_entry(obd_zombie_exports.next,
1709 cfs_list_del_init(&export->exp_obd_chain);
1712 cfs_spin_unlock(&obd_zombie_impexp_lock);
1715 class_import_destroy(import);
1718 class_export_destroy(export);
1721 } while (import != NULL || export != NULL);
1725 static cfs_completion_t obd_zombie_start;
1726 static cfs_completion_t obd_zombie_stop;
1727 static unsigned long obd_zombie_flags;
1728 static cfs_waitq_t obd_zombie_waitq;
1729 static pid_t obd_zombie_pid;
1732 OBD_ZOMBIE_STOP = 1 << 1
1736 * check for work for kill zombie import/export thread.
1738 static int obd_zombie_impexp_check(void *arg)
1742 cfs_spin_lock(&obd_zombie_impexp_lock);
1743 rc = cfs_list_empty(&obd_zombie_imports) &&
1744 cfs_list_empty(&obd_zombie_exports) &&
1745 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1747 cfs_spin_unlock(&obd_zombie_impexp_lock);
1753 * Add export to the obd_zombe thread and notify it.
1755 static void obd_zombie_export_add(struct obd_export *exp) {
1756 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1757 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1758 cfs_list_del_init(&exp->exp_obd_chain);
1759 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1760 cfs_spin_lock(&obd_zombie_impexp_lock);
1761 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1762 cfs_spin_unlock(&obd_zombie_impexp_lock);
1764 if (obd_zombie_impexp_notify != NULL)
1765 obd_zombie_impexp_notify();
1769 * Add import to the obd_zombe thread and notify it.
1771 static void obd_zombie_import_add(struct obd_import *imp) {
1772 LASSERT(imp->imp_sec == NULL);
1773 cfs_spin_lock(&obd_zombie_impexp_lock);
1774 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1775 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1776 cfs_spin_unlock(&obd_zombie_impexp_lock);
1778 if (obd_zombie_impexp_notify != NULL)
1779 obd_zombie_impexp_notify();
1783 * notify import/export destroy thread about new zombie.
1785 static void obd_zombie_impexp_notify(void)
1787 cfs_waitq_signal(&obd_zombie_waitq);
1791 * check whether obd_zombie is idle
1793 static int obd_zombie_is_idle(void)
1797 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1798 cfs_spin_lock(&obd_zombie_impexp_lock);
1799 rc = cfs_list_empty(&obd_zombie_imports) &&
1800 cfs_list_empty(&obd_zombie_exports);
1801 cfs_spin_unlock(&obd_zombie_impexp_lock);
1806 * wait when obd_zombie import/export queues become empty
1808 void obd_zombie_barrier(void)
1810 struct l_wait_info lwi = { 0 };
1812 if (obd_zombie_pid == cfs_curproc_pid())
1813 /* don't wait for myself */
1815 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1817 EXPORT_SYMBOL(obd_zombie_barrier);
1822 * destroy zombie export/import thread.
1824 static int obd_zombie_impexp_thread(void *unused)
1828 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1829 cfs_complete(&obd_zombie_start);
1833 cfs_complete(&obd_zombie_start);
1835 obd_zombie_pid = cfs_curproc_pid();
1837 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1838 struct l_wait_info lwi = { 0 };
1840 l_wait_event(obd_zombie_waitq,
1841 !obd_zombie_impexp_check(NULL), &lwi);
1842 obd_zombie_impexp_cull();
1845 * Notify obd_zombie_barrier callers that queues
1848 cfs_waitq_signal(&obd_zombie_waitq);
1851 cfs_complete(&obd_zombie_stop);
1856 #else /* ! KERNEL */
1858 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1859 static void *obd_zombie_impexp_work_cb;
1860 static void *obd_zombie_impexp_idle_cb;
1862 int obd_zombie_impexp_kill(void *arg)
1866 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1867 obd_zombie_impexp_cull();
1870 cfs_atomic_dec(&zombie_recur);
1877 * start destroy zombie import/export thread
1879 int obd_zombie_impexp_init(void)
1883 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1884 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1885 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1886 cfs_init_completion(&obd_zombie_start);
1887 cfs_init_completion(&obd_zombie_stop);
1888 cfs_waitq_init(&obd_zombie_waitq);
1892 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1896 cfs_wait_for_completion(&obd_zombie_start);
1899 obd_zombie_impexp_work_cb =
1900 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1901 &obd_zombie_impexp_kill, NULL);
1903 obd_zombie_impexp_idle_cb =
1904 liblustre_register_idle_callback("obd_zombi_impexp_check",
1905 &obd_zombie_impexp_check, NULL);
1911 * stop destroy zombie import/export thread
1913 void obd_zombie_impexp_stop(void)
1915 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1916 obd_zombie_impexp_notify();
1918 cfs_wait_for_completion(&obd_zombie_stop);
1920 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1921 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1925 /***** Kernel-userspace comm helpers *******/
1927 /* Get length of entire message, including header */
1928 int kuc_len(int payload_len)
1930 return sizeof(struct kuc_hdr) + payload_len;
1932 EXPORT_SYMBOL(kuc_len);
1934 /* Get a pointer to kuc header, given a ptr to the payload
1935 * @param p Pointer to payload area
1936 * @returns Pointer to kuc header
1938 struct kuc_hdr * kuc_ptr(void *p)
1940 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1941 LASSERT(lh->kuc_magic == KUC_MAGIC);
1944 EXPORT_SYMBOL(kuc_ptr);
1946 /* Test if payload is part of kuc message
1947 * @param p Pointer to payload area
1950 int kuc_ispayload(void *p)
1952 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1954 if (kh->kuc_magic == KUC_MAGIC)
1959 EXPORT_SYMBOL(kuc_ispayload);
1961 /* Alloc space for a message, and fill in header
1962 * @return Pointer to payload area
1964 void *kuc_alloc(int payload_len, int transport, int type)
1967 int len = kuc_len(payload_len);
1971 return ERR_PTR(-ENOMEM);
1973 lh->kuc_magic = KUC_MAGIC;
1974 lh->kuc_transport = transport;
1975 lh->kuc_msgtype = type;
1976 lh->kuc_msglen = len;
1978 return (void *)(lh + 1);
1980 EXPORT_SYMBOL(kuc_alloc);
/* Takes pointer to payload area
 * @param p           Pointer to payload area, as returned by kuc_alloc()
 * @param payload_len Payload size; used to compute the size passed to
 *                    OBD_FREE
 */
inline void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);
        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);