1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/obdclass/genops.c
41 * These are the only exported functions, they provide some generic
42 * infrastructure for managing object devices
45 #define DEBUG_SUBSYSTEM S_CLASS
47 #include <liblustre.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
53 extern cfs_list_t obd_types;
54 cfs_spinlock_t obd_types_lock;
56 cfs_mem_cache_t *obd_device_cachep;
57 cfs_mem_cache_t *obdo_cachep;
58 EXPORT_SYMBOL(obdo_cachep);
59 cfs_mem_cache_t *import_cachep;
61 cfs_list_t obd_zombie_imports;
62 cfs_list_t obd_zombie_exports;
63 cfs_spinlock_t obd_zombie_impexp_lock;
64 static void obd_zombie_impexp_notify(void);
65 static void obd_zombie_export_add(struct obd_export *exp);
66 static void obd_zombie_import_add(struct obd_import *imp);
67 static void print_export_data(struct obd_export *exp,
68 const char *status, int locks);
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 * support functions: we could use inter-module communication, but this
74 * is more portable to other OS's
76 static struct obd_device *obd_device_alloc(void)
78 struct obd_device *obd;
80 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
82 obd->obd_magic = OBD_DEVICE_MAGIC;
87 static void obd_device_free(struct obd_device *obd)
90 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92 if (obd->obd_namespace != NULL) {
93 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94 obd, obd->obd_namespace, obd->obd_force);
97 lu_ref_fini(&obd->obd_reference);
98 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
101 struct obd_type *class_search_type(const char *name)
104 struct obd_type *type;
106 cfs_spin_lock(&obd_types_lock);
107 cfs_list_for_each(tmp, &obd_types) {
108 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109 if (strcmp(type->typ_name, name) == 0) {
110 cfs_spin_unlock(&obd_types_lock);
114 cfs_spin_unlock(&obd_types_lock);
118 struct obd_type *class_get_type(const char *name)
120 struct obd_type *type = class_search_type(name);
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
124 const char *modname = name;
125 if (!cfs_request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127 type = class_search_type(name);
129 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135 cfs_spin_lock(&type->obd_type_lock);
137 cfs_try_module_get(type->typ_dt_ops->o_owner);
138 cfs_spin_unlock(&type->obd_type_lock);
142 EXPORT_SYMBOL(class_get_type);
144 void class_put_type(struct obd_type *type)
147 cfs_spin_lock(&type->obd_type_lock);
149 cfs_module_put(type->typ_dt_ops->o_owner);
150 cfs_spin_unlock(&type->obd_type_lock);
152 EXPORT_SYMBOL(class_put_type);
154 #define CLASS_MAX_NAME 1024
156 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
157 struct lprocfs_vars *vars, const char *name,
158 struct lu_device_type *ldt)
160 struct obd_type *type;
165 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
167 if (class_search_type(name)) {
168 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
173 OBD_ALLOC(type, sizeof(*type));
177 OBD_ALLOC_PTR(type->typ_dt_ops);
178 OBD_ALLOC_PTR(type->typ_md_ops);
179 OBD_ALLOC(type->typ_name, strlen(name) + 1);
181 if (type->typ_dt_ops == NULL ||
182 type->typ_md_ops == NULL ||
183 type->typ_name == NULL)
186 *(type->typ_dt_ops) = *dt_ops;
187 /* md_ops is optional */
189 *(type->typ_md_ops) = *md_ops;
190 strcpy(type->typ_name, name);
191 cfs_spin_lock_init(&type->obd_type_lock);
194 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
196 if (IS_ERR(type->typ_procroot)) {
197 rc = PTR_ERR(type->typ_procroot);
198 type->typ_procroot = NULL;
204 rc = lu_device_type_init(ldt);
209 cfs_spin_lock(&obd_types_lock);
210 cfs_list_add(&type->typ_chain, &obd_types);
211 cfs_spin_unlock(&obd_types_lock);
216 if (type->typ_name != NULL)
217 OBD_FREE(type->typ_name, strlen(name) + 1);
218 if (type->typ_md_ops != NULL)
219 OBD_FREE_PTR(type->typ_md_ops);
220 if (type->typ_dt_ops != NULL)
221 OBD_FREE_PTR(type->typ_dt_ops);
222 OBD_FREE(type, sizeof(*type));
225 EXPORT_SYMBOL(class_register_type);
227 int class_unregister_type(const char *name)
229 struct obd_type *type = class_search_type(name);
233 CERROR("unknown obd type\n");
237 if (type->typ_refcnt) {
238 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239 /* This is a bad situation, let's make the best of it */
240 /* Remove ops, but leave the name for debugging */
241 OBD_FREE_PTR(type->typ_dt_ops);
242 OBD_FREE_PTR(type->typ_md_ops);
246 if (type->typ_procroot) {
247 lprocfs_remove(&type->typ_procroot);
251 lu_device_type_fini(type->typ_lu);
253 cfs_spin_lock(&obd_types_lock);
254 cfs_list_del(&type->typ_chain);
255 cfs_spin_unlock(&obd_types_lock);
256 OBD_FREE(type->typ_name, strlen(name) + 1);
257 if (type->typ_dt_ops != NULL)
258 OBD_FREE_PTR(type->typ_dt_ops);
259 if (type->typ_md_ops != NULL)
260 OBD_FREE_PTR(type->typ_md_ops);
261 OBD_FREE(type, sizeof(*type));
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
267 * Create a new obd device.
269 * Find an empty slot in ::obd_devs[], create a new obd device in it.
271 * \param[in] type_name obd device type string.
272 * \param[in] name obd device name.
274 * \retval NULL if create fails, otherwise return the obd device
277 struct obd_device *class_newdev(const char *type_name, const char *name)
279 struct obd_device *result = NULL;
280 struct obd_device *newdev;
281 struct obd_type *type = NULL;
283 int new_obd_minor = 0;
285 if (strlen(name) >= MAX_OBD_NAME) {
286 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287 RETURN(ERR_PTR(-EINVAL));
290 type = class_get_type(type_name);
292 CERROR("OBD: unknown type: %s\n", type_name);
293 RETURN(ERR_PTR(-ENODEV));
296 newdev = obd_device_alloc();
297 if (newdev == NULL) {
298 class_put_type(type);
299 RETURN(ERR_PTR(-ENOMEM));
301 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303 cfs_write_lock(&obd_dev_lock);
304 for (i = 0; i < class_devno_max(); i++) {
305 struct obd_device *obd = class_num2obd(i);
307 if (obd && obd->obd_name &&
308 (strcmp(name, obd->obd_name) == 0)) {
309 CERROR("Device %s already exists, won't add\n", name);
311 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312 "%p obd_magic %08x != %08x\n", result,
313 result->obd_magic, OBD_DEVICE_MAGIC);
314 LASSERTF(result->obd_minor == new_obd_minor,
315 "%p obd_minor %d != %d\n", result,
316 result->obd_minor, new_obd_minor);
318 obd_devs[result->obd_minor] = NULL;
319 result->obd_name[0]='\0';
321 result = ERR_PTR(-EEXIST);
324 if (!result && !obd) {
326 result->obd_minor = i;
328 result->obd_type = type;
329 strncpy(result->obd_name, name,
330 sizeof(result->obd_name) - 1);
331 obd_devs[i] = result;
334 cfs_write_unlock(&obd_dev_lock);
336 if (result == NULL && i >= class_devno_max()) {
337 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339 result = ERR_PTR(-EOVERFLOW);
342 if (IS_ERR(result)) {
343 obd_device_free(newdev);
344 class_put_type(type);
346 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347 result->obd_name, result);
352 void class_release_dev(struct obd_device *obd)
354 struct obd_type *obd_type = obd->obd_type;
356 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
357 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
358 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
359 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
360 LASSERT(obd_type != NULL);
362 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
363 obd->obd_name,obd->obd_type->typ_name);
365 cfs_write_lock(&obd_dev_lock);
366 obd_devs[obd->obd_minor] = NULL;
367 cfs_write_unlock(&obd_dev_lock);
368 obd_device_free(obd);
370 class_put_type(obd_type);
373 int class_name2dev(const char *name)
380 cfs_read_lock(&obd_dev_lock);
381 for (i = 0; i < class_devno_max(); i++) {
382 struct obd_device *obd = class_num2obd(i);
384 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
385 /* Make sure we finished attaching before we give
386 out any references */
387 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
388 if (obd->obd_attached) {
389 cfs_read_unlock(&obd_dev_lock);
395 cfs_read_unlock(&obd_dev_lock);
399 EXPORT_SYMBOL(class_name2dev);
401 struct obd_device *class_name2obd(const char *name)
403 int dev = class_name2dev(name);
405 if (dev < 0 || dev > class_devno_max())
407 return class_num2obd(dev);
409 EXPORT_SYMBOL(class_name2obd);
411 int class_uuid2dev(struct obd_uuid *uuid)
415 cfs_read_lock(&obd_dev_lock);
416 for (i = 0; i < class_devno_max(); i++) {
417 struct obd_device *obd = class_num2obd(i);
419 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
420 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
421 cfs_read_unlock(&obd_dev_lock);
425 cfs_read_unlock(&obd_dev_lock);
429 EXPORT_SYMBOL(class_uuid2dev);
431 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
433 int dev = class_uuid2dev(uuid);
436 return class_num2obd(dev);
438 EXPORT_SYMBOL(class_uuid2obd);
441 * Get obd device from ::obd_devs[]
443 * \param num [in] array index
445 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
446 * otherwise return the obd device there.
448 struct obd_device *class_num2obd(int num)
450 struct obd_device *obd = NULL;
452 if (num < class_devno_max()) {
457 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
458 "%p obd_magic %08x != %08x\n",
459 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
460 LASSERTF(obd->obd_minor == num,
461 "%p obd_minor %0d != %0d\n",
462 obd, obd->obd_minor, num);
467 EXPORT_SYMBOL(class_num2obd);
469 void class_obd_list(void)
474 cfs_read_lock(&obd_dev_lock);
475 for (i = 0; i < class_devno_max(); i++) {
476 struct obd_device *obd = class_num2obd(i);
480 if (obd->obd_stopping)
482 else if (obd->obd_set_up)
484 else if (obd->obd_attached)
488 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
489 i, status, obd->obd_type->typ_name,
490 obd->obd_name, obd->obd_uuid.uuid,
491 cfs_atomic_read(&obd->obd_refcount));
493 cfs_read_unlock(&obd_dev_lock);
497 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
498 specified, then only the client with that uuid is returned,
499 otherwise any client connected to the tgt is returned. */
500 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
501 const char * typ_name,
502 struct obd_uuid *grp_uuid)
506 cfs_read_lock(&obd_dev_lock);
507 for (i = 0; i < class_devno_max(); i++) {
508 struct obd_device *obd = class_num2obd(i);
512 if ((strncmp(obd->obd_type->typ_name, typ_name,
513 strlen(typ_name)) == 0)) {
514 if (obd_uuid_equals(tgt_uuid,
515 &obd->u.cli.cl_target_uuid) &&
516 ((grp_uuid)? obd_uuid_equals(grp_uuid,
517 &obd->obd_uuid) : 1)) {
518 cfs_read_unlock(&obd_dev_lock);
523 cfs_read_unlock(&obd_dev_lock);
527 EXPORT_SYMBOL(class_find_client_obd);
529 /* Iterate the obd_device list looking devices have grp_uuid. Start
530 searching at *next, and if a device is found, the next index to look
531 at is saved in *next. If next is NULL, then the first matching device
532 will always be returned. */
533 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
539 else if (*next >= 0 && *next < class_devno_max())
544 cfs_read_lock(&obd_dev_lock);
545 for (; i < class_devno_max(); i++) {
546 struct obd_device *obd = class_num2obd(i);
550 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
553 cfs_read_unlock(&obd_dev_lock);
557 cfs_read_unlock(&obd_dev_lock);
561 EXPORT_SYMBOL(class_devices_in_group);
564 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
565 * adjust sptlrpc settings accordingly.
567 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
569 struct obd_device *obd;
573 LASSERT(namelen > 0);
575 cfs_read_lock(&obd_dev_lock);
576 for (i = 0; i < class_devno_max(); i++) {
577 obd = class_num2obd(i);
579 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
582 /* only notify mdc, osc, mdt, ost */
583 type = obd->obd_type->typ_name;
584 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
585 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
586 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
587 strcmp(type, LUSTRE_OST_NAME) != 0)
590 if (strncmp(obd->obd_name, fsname, namelen))
593 class_incref(obd, __FUNCTION__, obd);
594 cfs_read_unlock(&obd_dev_lock);
595 rc2 = obd_set_info_async(obd->obd_self_export,
596 sizeof(KEY_SPTLRPC_CONF),
597 KEY_SPTLRPC_CONF, 0, NULL, NULL);
599 class_decref(obd, __FUNCTION__, obd);
600 cfs_read_lock(&obd_dev_lock);
602 cfs_read_unlock(&obd_dev_lock);
605 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
607 void obd_cleanup_caches(void)
612 if (obd_device_cachep) {
613 rc = cfs_mem_cache_destroy(obd_device_cachep);
614 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
615 obd_device_cachep = NULL;
618 rc = cfs_mem_cache_destroy(obdo_cachep);
619 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
623 rc = cfs_mem_cache_destroy(import_cachep);
624 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
625 import_cachep = NULL;
628 rc = cfs_mem_cache_destroy(capa_cachep);
629 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
635 int obd_init_caches(void)
639 LASSERT(obd_device_cachep == NULL);
640 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
641 sizeof(struct obd_device),
643 if (!obd_device_cachep)
646 LASSERT(obdo_cachep == NULL);
647 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
652 LASSERT(import_cachep == NULL);
653 import_cachep = cfs_mem_cache_create("ll_import_cache",
654 sizeof(struct obd_import),
659 LASSERT(capa_cachep == NULL);
660 capa_cachep = cfs_mem_cache_create("capa_cache",
661 sizeof(struct obd_capa), 0, 0);
667 obd_cleanup_caches();
672 /* map connection to client */
673 struct obd_export *class_conn2export(struct lustre_handle *conn)
675 struct obd_export *export;
679 CDEBUG(D_CACHE, "looking for null handle\n");
683 if (conn->cookie == -1) { /* this means assign a new connection */
684 CDEBUG(D_CACHE, "want a new connection\n");
688 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
689 export = class_handle2object(conn->cookie);
692 EXPORT_SYMBOL(class_conn2export);
694 struct obd_device *class_exp2obd(struct obd_export *exp)
700 EXPORT_SYMBOL(class_exp2obd);
702 struct obd_device *class_conn2obd(struct lustre_handle *conn)
704 struct obd_export *export;
705 export = class_conn2export(conn);
707 struct obd_device *obd = export->exp_obd;
708 class_export_put(export);
713 EXPORT_SYMBOL(class_conn2obd);
715 struct obd_import *class_exp2cliimp(struct obd_export *exp)
717 struct obd_device *obd = exp->exp_obd;
720 return obd->u.cli.cl_import;
722 EXPORT_SYMBOL(class_exp2cliimp);
724 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
726 struct obd_device *obd = class_conn2obd(conn);
729 return obd->u.cli.cl_import;
731 EXPORT_SYMBOL(class_conn2cliimp);
733 /* Export management functions */
734 static void class_export_destroy(struct obd_export *exp)
736 struct obd_device *obd = exp->exp_obd;
739 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
741 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
742 exp->exp_client_uuid.uuid, obd->obd_name);
744 LASSERT(obd != NULL);
746 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
747 if (exp->exp_connection)
748 ptlrpc_put_connection_superhack(exp->exp_connection);
750 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
751 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
752 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
753 LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
754 obd_destroy_export(exp);
755 class_decref(obd, "export", exp);
757 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Addref callback handed to class_handle_hash() for exports: takes one
 * export reference on the object passed in. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
766 struct obd_export *class_export_get(struct obd_export *exp)
768 cfs_atomic_inc(&exp->exp_refcount);
769 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
770 cfs_atomic_read(&exp->exp_refcount));
773 EXPORT_SYMBOL(class_export_get);
775 void class_export_put(struct obd_export *exp)
777 LASSERT(exp != NULL);
778 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, 0x5a5a5a);
779 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
780 cfs_atomic_read(&exp->exp_refcount) - 1);
782 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
783 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
784 CDEBUG(D_IOCTL, "final put %p/%s\n",
785 exp, exp->exp_client_uuid.uuid);
787 /* release nid stat refererence */
788 lprocfs_exp_cleanup(exp);
790 obd_zombie_export_add(exp);
793 EXPORT_SYMBOL(class_export_put);
795 /* Creates a new export, adds it to the hash table, and returns a
796 * pointer to it. The refcount is 2: one for the hash reference, and
797 * one for the pointer returned by this function. */
798 struct obd_export *class_new_export(struct obd_device *obd,
799 struct obd_uuid *cluuid)
801 struct obd_export *export;
802 cfs_hash_t *hash = NULL;
806 OBD_ALLOC_PTR(export);
808 return ERR_PTR(-ENOMEM);
810 export->exp_conn_cnt = 0;
811 export->exp_lock_hash = NULL;
812 cfs_atomic_set(&export->exp_refcount, 2);
813 cfs_atomic_set(&export->exp_rpc_count, 0);
814 cfs_atomic_set(&export->exp_cb_count, 0);
815 cfs_atomic_set(&export->exp_locks_count, 0);
816 #if LUSTRE_TRACKS_LOCK_EXP_REFS
817 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
818 cfs_spin_lock_init(&export->exp_locks_list_guard);
820 cfs_atomic_set(&export->exp_replay_count, 0);
821 export->exp_obd = obd;
822 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
823 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
824 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
825 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
826 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
827 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
828 class_handle_hash(&export->exp_handle, export_handle_addref);
829 export->exp_last_request_time = cfs_time_current_sec();
830 cfs_spin_lock_init(&export->exp_lock);
831 cfs_spin_lock_init(&export->exp_rpc_lock);
832 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
833 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
835 export->exp_sp_peer = LUSTRE_SP_ANY;
836 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
837 export->exp_client_uuid = *cluuid;
838 obd_init_export(export);
840 cfs_spin_lock(&obd->obd_dev_lock);
841 /* shouldn't happen, but might race */
842 if (obd->obd_stopping)
843 GOTO(exit_unlock, rc = -ENODEV);
845 hash = cfs_hash_getref(obd->obd_uuid_hash);
847 GOTO(exit_unlock, rc = -ENODEV);
848 cfs_spin_unlock(&obd->obd_dev_lock);
850 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
851 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
853 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
854 obd->obd_name, cluuid->uuid, rc);
855 GOTO(exit_err, rc = -EALREADY);
859 cfs_spin_lock(&obd->obd_dev_lock);
860 if (obd->obd_stopping) {
861 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
862 GOTO(exit_unlock, rc = -ENODEV);
865 class_incref(obd, "export", export);
866 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
867 cfs_list_add_tail(&export->exp_obd_chain_timed,
868 &export->exp_obd->obd_exports_timed);
869 export->exp_obd->obd_num_exports++;
870 cfs_spin_unlock(&obd->obd_dev_lock);
871 cfs_hash_putref(hash);
875 cfs_spin_unlock(&obd->obd_dev_lock);
878 cfs_hash_putref(hash);
879 class_handle_unhash(&export->exp_handle);
880 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
881 obd_destroy_export(export);
882 OBD_FREE_PTR(export);
885 EXPORT_SYMBOL(class_new_export);
887 void class_unlink_export(struct obd_export *exp)
889 class_handle_unhash(&exp->exp_handle);
891 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
892 /* delete an uuid-export hashitem from hashtables */
893 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
894 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
895 &exp->exp_client_uuid,
896 &exp->exp_uuid_hash);
898 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
899 cfs_list_del_init(&exp->exp_obd_chain_timed);
900 exp->exp_obd->obd_num_exports--;
901 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
902 class_export_put(exp);
904 EXPORT_SYMBOL(class_unlink_export);
906 /* Import management functions */
907 void class_import_destroy(struct obd_import *imp)
911 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
912 imp->imp_obd->obd_name);
914 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
916 ptlrpc_put_connection_superhack(imp->imp_connection);
918 while (!cfs_list_empty(&imp->imp_conn_list)) {
919 struct obd_import_conn *imp_conn;
921 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
922 struct obd_import_conn, oic_item);
923 cfs_list_del_init(&imp_conn->oic_item);
924 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
925 OBD_FREE(imp_conn, sizeof(*imp_conn));
928 LASSERT(imp->imp_sec == NULL);
929 class_decref(imp->imp_obd, "import", imp);
930 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Addref callback handed to class_handle_hash() for imports: takes one
 * import reference on the object passed in. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
939 struct obd_import *class_import_get(struct obd_import *import)
941 cfs_atomic_inc(&import->imp_refcount);
942 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
943 cfs_atomic_read(&import->imp_refcount),
944 import->imp_obd->obd_name);
947 EXPORT_SYMBOL(class_import_get);
949 void class_import_put(struct obd_import *imp)
953 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
954 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, 0x5a5a5a);
956 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
957 cfs_atomic_read(&imp->imp_refcount) - 1,
958 imp->imp_obd->obd_name);
960 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
961 CDEBUG(D_INFO, "final put import %p\n", imp);
962 obd_zombie_import_add(imp);
967 EXPORT_SYMBOL(class_import_put);
969 static void init_imp_at(struct imp_at *at) {
971 at_init(&at->iat_net_latency, 0, 0);
972 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
973 /* max service estimates are tracked on the server side, so
974 don't use the AT history here, just use the last reported
975 val. (But keep hist for proc histogram, worst_ever) */
976 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
981 struct obd_import *class_new_import(struct obd_device *obd)
983 struct obd_import *imp;
985 OBD_ALLOC(imp, sizeof(*imp));
989 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
990 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
991 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
992 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
993 cfs_spin_lock_init(&imp->imp_lock);
994 imp->imp_last_success_conn = 0;
995 imp->imp_state = LUSTRE_IMP_NEW;
996 imp->imp_obd = class_incref(obd, "import", imp);
997 cfs_sema_init(&imp->imp_sec_mutex, 1);
998 cfs_waitq_init(&imp->imp_recovery_waitq);
1000 cfs_atomic_set(&imp->imp_refcount, 2);
1001 cfs_atomic_set(&imp->imp_unregistering, 0);
1002 cfs_atomic_set(&imp->imp_inflight, 0);
1003 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1004 cfs_atomic_set(&imp->imp_inval_count, 0);
1005 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1006 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1007 class_handle_hash(&imp->imp_handle, import_handle_addref);
1008 init_imp_at(&imp->imp_at);
1010 /* the default magic is V2, will be used in connect RPC, and
1011 * then adjusted according to the flags in request/reply. */
1012 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1016 EXPORT_SYMBOL(class_new_import);
1018 void class_destroy_import(struct obd_import *import)
1020 LASSERT(import != NULL);
1021 LASSERT(import != LP_POISON);
1023 class_handle_unhash(&import->imp_handle);
1025 cfs_spin_lock(&import->imp_lock);
1026 import->imp_generation++;
1027 cfs_spin_unlock(&import->imp_lock);
1028 class_import_put(import);
1030 EXPORT_SYMBOL(class_destroy_import);
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/*
 * Record that \a lock holds a reference on \a exp.
 *
 * The first such reference links the lock into the export's
 * exp_locks_list and remembers the export in l_exp_refs_target.  A lock
 * already tracked against a different export only triggers a console
 * warning.  Everything happens under exp_locks_list_guard.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
        cfs_spin_lock(&exp->exp_locks_list_guard);

        LASSERT(lock->l_exp_refs_nr >= 0);

        if (lock->l_exp_refs_target != NULL &&
            lock->l_exp_refs_target != exp) {
                LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
                              exp, lock, lock->l_exp_refs_target);
        }

        if (lock->l_exp_refs_nr == 0) {
                cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
                lock->l_exp_refs_target = exp;
        }
        lock->l_exp_refs_nr++;

        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
               lock, exp, lock->l_exp_refs_nr);
        cfs_spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/*
 * Drop one reference recorded by __class_export_add_lock_ref().
 *
 * When the count reaches zero the lock is unlinked from the export's list
 * and l_exp_refs_target is cleared.  A mismatch between \a exp and the
 * recorded export only triggers a console warning.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
        cfs_spin_lock(&exp->exp_locks_list_guard);

        LASSERT(lock->l_exp_refs_nr > 0);

        if (lock->l_exp_refs_target != exp) {
                LCONSOLE_WARN("lock %p, "
                              "mismatching export pointers: %p, %p\n",
                              lock, lock->l_exp_refs_target, exp);
        }

        lock->l_exp_refs_nr--;
        if (lock->l_exp_refs_nr == 0) {
                cfs_list_del_init(&lock->l_exp_refs_link);
                lock->l_exp_refs_target = NULL;
        }

        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
               lock, exp, lock->l_exp_refs_nr);
        cfs_spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif
1075 /* A connection defines an export context in which preallocation can
1076 be managed. This releases the export pointer reference, and returns
1077 the export handle, so the export refcount is 1 when this function
1079 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1080 struct obd_uuid *cluuid)
1082 struct obd_export *export;
1083 LASSERT(conn != NULL);
1084 LASSERT(obd != NULL);
1085 LASSERT(cluuid != NULL);
1088 export = class_new_export(obd, cluuid);
1090 RETURN(PTR_ERR(export));
1092 conn->cookie = export->exp_handle.h_cookie;
1093 class_export_put(export);
1095 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1096 cluuid->uuid, conn->cookie);
1099 EXPORT_SYMBOL(class_connect);
1101 /* if export is involved in recovery then clean up related things */
1102 void class_export_recovery_cleanup(struct obd_export *exp)
1104 struct obd_device *obd = exp->exp_obd;
1106 cfs_spin_lock(&obd->obd_recovery_task_lock);
1107 if (exp->exp_delayed)
1108 obd->obd_delayed_clients--;
1109 if (obd->obd_recovering && exp->exp_in_recovery) {
1110 cfs_spin_lock(&exp->exp_lock);
1111 exp->exp_in_recovery = 0;
1112 cfs_spin_unlock(&exp->exp_lock);
1113 LASSERT(obd->obd_connected_clients);
1114 obd->obd_connected_clients--;
1116 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1117 /** Cleanup req replay fields */
1118 if (exp->exp_req_replay_needed) {
1119 cfs_spin_lock(&exp->exp_lock);
1120 exp->exp_req_replay_needed = 0;
1121 cfs_spin_unlock(&exp->exp_lock);
1122 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1123 cfs_atomic_dec(&obd->obd_req_replay_clients);
1125 /** Cleanup lock replay data */
1126 if (exp->exp_lock_replay_needed) {
1127 cfs_spin_lock(&exp->exp_lock);
1128 exp->exp_lock_replay_needed = 0;
1129 cfs_spin_unlock(&exp->exp_lock);
1130 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1131 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1135 /* This function removes 1-3 references from the export:
1136 * 1 - for export pointer passed
1137 * and if disconnect really need
1138 * 2 - removing from hash
1139 * 3 - in client_unlink_export
1140 * The export pointer passed to this function can destroyed */
1141 int class_disconnect(struct obd_export *export)
1143 int already_disconnected;
1146 if (export == NULL) {
1148 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1152 cfs_spin_lock(&export->exp_lock);
1153 already_disconnected = export->exp_disconnected;
1154 export->exp_disconnected = 1;
1155 cfs_spin_unlock(&export->exp_lock);
1157 /* class_cleanup(), abort_recovery(), and class_fail_export()
1158 * all end up in here, and if any of them race we shouldn't
1159 * call extra class_export_puts(). */
1160 if (already_disconnected) {
1161 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1162 GOTO(no_disconn, already_disconnected);
1165 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1166 export->exp_handle.h_cookie);
1168 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1169 cfs_hash_del(export->exp_obd->obd_nid_hash,
1170 &export->exp_connection->c_peer.nid,
1171 &export->exp_nid_hash);
1173 class_export_recovery_cleanup(export);
1174 class_unlink_export(export);
1176 class_export_put(export);
1179 EXPORT_SYMBOL(class_disconnect);
1181 /* Return non-zero for a fully connected export */
1182 int class_connected_export(struct obd_export *exp)
1186 cfs_spin_lock(&exp->exp_lock);
1187 connected = (exp->exp_conn_cnt > 0);
1188 cfs_spin_unlock(&exp->exp_lock);
1193 EXPORT_SYMBOL(class_connected_export);
1195 static void class_disconnect_export_list(cfs_list_t *list,
1196 enum obd_option flags)
1199 struct obd_export *exp;
1202 /* It's possible that an export may disconnect itself, but
1203 * nothing else will be added to this list. */
1204 while (!cfs_list_empty(list)) {
1205 exp = cfs_list_entry(list->next, struct obd_export,
1207 /* need for safe call CDEBUG after obd_disconnect */
1208 class_export_get(exp);
1210 cfs_spin_lock(&exp->exp_lock);
1211 exp->exp_flags = flags;
1212 cfs_spin_unlock(&exp->exp_lock);
1214 if (obd_uuid_equals(&exp->exp_client_uuid,
1215 &exp->exp_obd->obd_uuid)) {
1217 "exp %p export uuid == obd uuid, don't discon\n",
1219 /* Need to delete this now so we don't end up pointing
1220 * to work_list later when this export is cleaned up. */
1221 cfs_list_del_init(&exp->exp_obd_chain);
1222 class_export_put(exp);
1226 class_export_get(exp);
1227 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1228 "last request at "CFS_TIME_T"\n",
1229 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1230 exp, exp->exp_last_request_time);
1231 /* release one export reference anyway */
1232 rc = obd_disconnect(exp);
1234 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1235 obd_export_nid2str(exp), exp, rc);
1236 class_export_put(exp);
1241 void class_disconnect_exports(struct obd_device *obd)
1243 cfs_list_t work_list;
1246 /* Move all of the exports from obd_exports to a work list, en masse. */
1247 CFS_INIT_LIST_HEAD(&work_list);
1248 cfs_spin_lock(&obd->obd_dev_lock);
1249 cfs_list_splice_init(&obd->obd_exports, &work_list);
1250 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1251 cfs_spin_unlock(&obd->obd_dev_lock);
1253 if (!cfs_list_empty(&work_list)) {
1254 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1255 "disconnecting them\n", obd->obd_minor, obd);
1256 class_disconnect_export_list(&work_list,
1257 exp_flags_from_obd(obd));
1259 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1260 obd->obd_minor, obd);
1263 EXPORT_SYMBOL(class_disconnect_exports);
1265 /* Remove exports that have not completed recovery. */
/*
 * Collect exports from obd->obd_exports onto a private work list (under
 * obd_dev_lock) and disconnect them with OBD_OPT_ABORT_RECOV added to
 * the flags returned by exp_flags_from_obd(obd).
 *
 * @param obd          device whose export list is scanned
 * @param test_export  predicate called on every export of the list
 *
 * NOTE(review): the statements controlled by the test_export() check and
 * by the self-export check, as well as the declaration and update of
 * `evicted`, are on lines not visible in this excerpt.  Presumably both
 * checks skip the export and `evicted` counts the moved ones -- confirm.
 */
1267 void class_disconnect_stale_exports(struct obd_device *obd,
1268 int (*test_export)(struct obd_export *))
1270 cfs_list_t work_list;
1271 cfs_list_t *pos, *n;
1272 struct obd_export *exp;
1276 CFS_INIT_LIST_HEAD(&work_list);
1277 cfs_spin_lock(&obd->obd_dev_lock);
/* the _safe iterator is needed: entries are moved off the list below */
1278 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1279 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1280 if (test_export(exp))
1283 /* don't count self-export as client */
1284 if (obd_uuid_equals(&exp->exp_client_uuid,
1285 &exp->exp_obd->obd_uuid))
1288 cfs_list_move(&exp->exp_obd_chain, &work_list);
/* logged through the D_ERROR mask; "<unknown>" stands in for the NID
 * when the export has no connection */
1290 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1291 obd->obd_name, exp->exp_client_uuid.uuid,
1292 exp->exp_connection == NULL ? "<unknown>" :
1293 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1294 print_export_data(exp, "EVICTING", 0);
1296 cfs_spin_unlock(&obd->obd_dev_lock);
1299 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1300 obd->obd_name, evicted);
/* accumulated after obd_dev_lock has been released */
1301 obd->obd_stale_clients += evicted;
1303 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1304 OBD_OPT_ABORT_RECOV);
1307 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is set to 1 under exp_lock and its previous value is kept
 * in already_failed, so a caller that finds the export already failed
 * can be told apart from the one that failed it first.
 */
1309 void class_fail_export(struct obd_export *exp)
1311 int rc, already_failed;
/* read-then-set of exp_failed, done entirely under exp_lock */
1313 cfs_spin_lock(&exp->exp_lock);
1314 already_failed = exp->exp_failed;
1315 exp->exp_failed = 1;
1316 cfs_spin_unlock(&exp->exp_lock);
/* NOTE(review): the message says "skipping"; the statement that leaves
 * the function is on a line not visible in this excerpt -- confirm. */
1318 if (already_failed) {
1319 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1320 exp, exp->exp_client_uuid.uuid);
1324 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1325 exp, exp->exp_client_uuid.uuid);
/* optionally dump the debug log before disconnecting */
1327 if (obd_dump_on_timeout)
1328 libcfs_debug_dumplog();
1330 /* Most callers into obd_disconnect are removing their own reference
1331 * (request, for example) in addition to the one from the hash table.
1332 * We don't have such a reference here, so make one. */
1333 class_export_get(exp);
/* rc is only used for the messages that follow */
1334 rc = obd_disconnect(exp);
1336 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1338 CDEBUG(D_HA, "disconnected export %p/%s\n",
1339 exp, exp->exp_client_uuid.uuid);
1341 EXPORT_SYMBOL(class_fail_export);
/*
 * Return the peer NID of @exp as a string.
 *
 * When the export has a connection, the result is
 * libcfs_nid2str() of exp_connection->c_peer.nid.
 *
 * NOTE(review): the value returned for an export without a connection is
 * on a line not visible in this excerpt.
 */
1343 char *obd_export_nid2str(struct obd_export *exp)
1345 if (exp->exp_connection != NULL)
1346 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1350 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict the export(s) of @obd whose peer NID matches the string @nid.
 *
 * The NID string is parsed with libcfs_str2nid() and used as the key for
 * a lookup in obd->obd_nid_hash.  A found export is failed through
 * class_fail_export() and then released with class_export_put().
 *
 * @param obd  device whose NID hash is searched
 * @param nid  textual NID of the client to evict
 * @returns    exports_evicted, the number of exports evicted
 *
 * NOTE(review): the loop construct and the increment of exports_evicted
 * are on lines not visible in this excerpt.
 */
1352 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1354 struct obd_export *doomed_exp = NULL;
1355 int exports_evicted = 0;
/* NOTE(review): the cast drops const from @nid, presumably because
 * libcfs_str2nid() takes a non-const pointer -- confirm it does not
 * modify the string. */
1357 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1360 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1361 if (doomed_exp == NULL)
/* sanity checks: the hash returned the requested NID, and the device's
 * self-export is never expected in the NID hash */
1364 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1365 "nid %s found, wanted nid %s, requested nid %s\n",
1366 obd_export_nid2str(doomed_exp),
1367 libcfs_nid2str(nid_key), nid);
1368 LASSERTF(doomed_exp != obd->obd_self_export,
1369 "self-export is hashed by NID?\n");
/* NOTE(review): "adminstrative" in the message below is misspelled; it
 * is left untouched here because log text is runtime output. */
1371 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1372 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1374 class_fail_export(doomed_exp);
/* presumably balances a reference taken by cfs_hash_lookup() */
1375 class_export_put(doomed_exp);
1378 if (!exports_evicted)
1379 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1380 obd->obd_name, nid);
1381 return exports_evicted;
1383 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client UUID matches the string @uuid.
 *
 * A request naming the device's own UUID is refused with an error
 * message and returns the still-zero exports_evicted.  Otherwise the
 * UUID is looked up in obd->obd_uuid_hash; a found export is failed with
 * class_fail_export() and released with class_export_put().
 *
 * @param obd   device whose UUID hash is searched
 * @param uuid  textual UUID of the client to evict
 * @returns     exports_evicted
 *
 * NOTE(review): the increment of exports_evicted is on a line not
 * visible in this excerpt.
 */
1385 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1387 struct obd_export *doomed_exp = NULL;
1388 struct obd_uuid doomed_uuid;
1389 int exports_evicted = 0;
1391 obd_str2uuid(&doomed_uuid, uuid);
/* never evict the device's own (self) UUID */
1392 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1393 CERROR("%s: can't evict myself\n", obd->obd_name);
1394 return exports_evicted;
1397 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1399 if (doomed_exp == NULL) {
1400 CERROR("%s: can't disconnect %s: no exports found\n",
1401 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" in the message below is misspelled; it
 * is left untouched here because log text is runtime output. */
1403 CWARN("%s: evicting %s at adminstrative request\n",
1404 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1405 class_fail_export(doomed_exp);
/* presumably balances a reference taken by cfs_hash_lookup() */
1406 class_export_put(doomed_exp);
1410 return exports_evicted;
1412 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1414 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump hook, NULL by default.  print_export_data()
 * calls it for an export when lock dumping is requested and the pointer
 * is non-NULL. */
1415 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1416 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one D_HA debug line describing @exp, tagged with @status.
 *
 * Fields, in the order of the format string: obd name, status, export
 * pointer, client UUID, peer NID string, exp_refcount, then in
 * parentheses exp_rpc_count, exp_cb_count and exp_locks_count, followed
 * by exp_disconnected, exp_delayed, exp_failed, nreplies, first_reply,
 * "..." when nreplies is greater than 3, and exp_last_committed.
 *
 * When LUSTRE_TRACKS_LOCK_EXP_REFS is enabled and `locks` is non-zero,
 * class_export_dump_hook is called as well, if it is set.
 *
 * NOTE(review): the body of the reply-list walk, which fills in nreplies
 * and first_reply, is on lines not visible in this excerpt.
 */
1419 static void print_export_data(struct obd_export *exp, const char *status,
1422 struct ptlrpc_reply_state *rs;
1423 struct ptlrpc_reply_state *first_reply = NULL;
/* exp_outstanding_replies is walked with exp_lock held */
1426 cfs_spin_lock(&exp->exp_lock);
1427 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1433 cfs_spin_unlock(&exp->exp_lock);
1435 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1436 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1437 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1438 cfs_atomic_read(&exp->exp_rpc_count),
1439 cfs_atomic_read(&exp->exp_cb_count),
1440 cfs_atomic_read(&exp->exp_locks_count),
1441 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1442 nreplies, first_reply, nreplies > 3 ? "..." : "",
1443 exp->exp_last_committed);
1444 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1445 if (locks && class_export_dump_hook != NULL)
1446 class_export_dump_hook(exp);
/*
 * Log every export associated with @obd via print_export_data().
 *
 * Under obd_dev_lock the device's obd_exports, obd_unlinked_exports and
 * obd_delayed_exports lists are printed with the tags "ACTIVE",
 * "UNLINKED" and "DELAYED".  Afterwards, under obd_zombie_impexp_lock,
 * the global obd_zombie_exports list is printed with the tag "ZOMBIE";
 * that list is not filtered by @obd.
 *
 * @param obd    device whose exports are dumped
 * @param locks  passed through unchanged to print_export_data()
 */
1450 void dump_exports(struct obd_device *obd, int locks)
1452 struct obd_export *exp;
1454 cfs_spin_lock(&obd->obd_dev_lock);
1455 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1456 print_export_data(exp, "ACTIVE", locks);
1457 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1458 print_export_data(exp, "UNLINKED", locks);
1459 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1460 print_export_data(exp, "DELAYED", locks);
1461 cfs_spin_unlock(&obd->obd_dev_lock);
/* the zombie list is global and has its own lock */
1462 cfs_spin_lock(&obd_zombie_impexp_lock);
1463 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1464 print_export_data(exp, "ZOMBIE", locks);
1465 cfs_spin_unlock(&obd_zombie_impexp_lock);
1467 EXPORT_SYMBOL(dump_exports);
/*
 * Block until obd->obd_unlinked_exports is empty.
 *
 * obd_exports must already be empty (asserted).  The list is tested with
 * obd_dev_lock held; the lock is dropped for each sleep of
 * cfs_time_seconds(waited) in CFS_TASK_UNINT state and re-taken before
 * the next test.  Once `waited` exceeds 5 and is a power of two, a
 * console warning is printed and all exports are dumped.
 *
 * NOTE(review): the declaration and the update of `waited` are on lines
 * not visible in this excerpt.
 */
1469 void obd_exports_barrier(struct obd_device *obd)
1472 LASSERT(cfs_list_empty(&obd->obd_exports));
1473 cfs_spin_lock(&obd->obd_dev_lock);
1474 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* never sleep with the spinlock held */
1475 cfs_spin_unlock(&obd->obd_dev_lock);
1476 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1477 cfs_time_seconds(waited));
/* the power-of-two test limits how often the warning is repeated */
1478 if (waited > 5 && IS_PO2(waited)) {
1479 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1480 "more than %d seconds. "
1481 "The obd refcount = %d. Is it stuck?\n",
1482 obd->obd_name, waited,
1483 cfs_atomic_read(&obd->obd_refcount));
1484 dump_exports(obd, 1);
1487 cfs_spin_lock(&obd->obd_dev_lock);
1489 cfs_spin_unlock(&obd->obd_dev_lock);
1491 EXPORT_SYMBOL(obd_exports_barrier);
1493 /* Total amount of zombies to be destroyed */
/* Read with obd_zombie_impexp_lock held by obd_zombie_impexp_check()
 * and obd_zombie_is_idle(). */
1494 static int zombies_count = 0;
1497 * kill zombie imports and exports
/*
 * Destroy queued zombie imports and exports.
 *
 * With obd_zombie_impexp_lock held, the first entry (if any) of
 * obd_zombie_imports and of obd_zombie_exports is detached.  The lock is
 * then released and the detached objects are destroyed with
 * class_import_destroy() / class_export_destroy().  This repeats for as
 * long as an import or an export was found in the last pass.
 *
 * NOTE(review): the statements that reset `import` / `export` at the top
 * of each pass, and the statement executed inside the short re-locked
 * sections after each destroy (presumably the zombies_count update), are
 * on lines not visible in this excerpt -- confirm.
 */
1499 void obd_zombie_impexp_cull(void)
1501 struct obd_import *import;
1502 struct obd_export *export;
1506 cfs_spin_lock(&obd_zombie_impexp_lock);
1509 if (!cfs_list_empty(&obd_zombie_imports)) {
1510 import = cfs_list_entry(obd_zombie_imports.next,
1513 cfs_list_del_init(&import->imp_zombie_chain);
1517 if (!cfs_list_empty(&obd_zombie_exports)) {
1518 export = cfs_list_entry(obd_zombie_exports.next,
1521 cfs_list_del_init(&export->exp_obd_chain);
/* destruction happens outside the spinlock */
1524 cfs_spin_unlock(&obd_zombie_impexp_lock);
1526 if (import != NULL) {
1527 class_import_destroy(import);
1528 cfs_spin_lock(&obd_zombie_impexp_lock);
1530 cfs_spin_unlock(&obd_zombie_impexp_lock);
1533 if (export != NULL) {
1534 class_export_destroy(export);
1535 cfs_spin_lock(&obd_zombie_impexp_lock);
1537 cfs_spin_unlock(&obd_zombie_impexp_lock);
1541 } while (import != NULL || export != NULL);
/* completed by obd_zombie_impexp_thread() once it has (or has failed to)
 * set itself up; obd_zombie_impexp_init() waits on it */
1545 static cfs_completion_t obd_zombie_start;
/* completed by obd_zombie_impexp_thread() when its loop ends;
 * obd_zombie_impexp_stop() waits on it */
1546 static cfs_completion_t obd_zombie_stop;
/* bit flags of the zombie thread; only OBD_ZOMBIE_STOP is used here */
1547 static unsigned long obd_zombie_flags;
/* wait queue shared by the zombie thread and obd_zombie_barrier() */
1548 static cfs_waitq_t obd_zombie_waitq;
/* pid recorded by the zombie thread; obd_zombie_barrier() compares it
 * with the caller's pid to avoid waiting for itself */
1549 static pid_t obd_zombie_pid;
/* NOTE(review): this constant is passed as the first argument of
 * cfs_set_bit() / cfs_test_bit().  If those follow the Linux
 * set_bit()/test_bit() convention, that argument is a bit number, so
 * "1 << 1" selects bit 2 rather than the mask 0x2.  Harmless while all
 * users go through the same constant -- confirm. */
1552 OBD_ZOMBIE_STOP = 1 << 1
1556 * check for work for kill zombie import/export thread.
/*
 * Compute, under obd_zombie_impexp_lock, whether the zombie thread has
 * nothing to do: rc is non-zero only when zombies_count is 0 and the
 * OBD_ZOMBIE_STOP bit is not set.  obd_zombie_impexp_thread() sleeps
 * until this yields 0.
 *
 * @param arg  not used by the visible statements
 *
 * NOTE(review): the return statement is on a line not visible in this
 * excerpt; presumably rc is returned.
 */
1558 static int obd_zombie_impexp_check(void *arg)
1562 cfs_spin_lock(&obd_zombie_impexp_lock);
1563 rc = (zombies_count == 0) &&
1564 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1565 cfs_spin_unlock(&obd_zombie_impexp_lock);
1571 * Add export to the obd_zombie thread and notify it.
/*
 * Move @exp from the list it is linked on to the global
 * obd_zombie_exports list and wake the zombie thread.
 *
 * The export must still be linked through exp_obd_chain (asserted); it
 * is unlinked under its obd's obd_dev_lock and then added to the zombie
 * list under obd_zombie_impexp_lock.
 *
 * NOTE(review): one statement between taking obd_zombie_impexp_lock and
 * the list add is not visible in this excerpt (presumably the
 * zombies_count increment -- confirm).
 */
1573 static void obd_zombie_export_add(struct obd_export *exp) {
1574 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1575 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1576 cfs_list_del_init(&exp->exp_obd_chain);
1577 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1578 cfs_spin_lock(&obd_zombie_impexp_lock);
1580 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1581 cfs_spin_unlock(&obd_zombie_impexp_lock);
/* NOTE(review): obd_zombie_impexp_notify is a function defined in this
 * file, so comparing its address with NULL is always true. */
1583 if (obd_zombie_impexp_notify != NULL)
1584 obd_zombie_impexp_notify();
1588 * Add import to the obd_zombie thread and notify it.
/*
 * Queue @imp on the global obd_zombie_imports list and wake the zombie
 * thread.
 *
 * Asserted preconditions: imp_sec is NULL and imp_zombie_chain is not
 * linked on any list yet.  The list add is done under
 * obd_zombie_impexp_lock.
 *
 * NOTE(review): one statement between the assertion and the list add is
 * not visible in this excerpt (presumably the zombies_count increment --
 * confirm).
 */
1590 static void obd_zombie_import_add(struct obd_import *imp) {
1591 LASSERT(imp->imp_sec == NULL);
1592 cfs_spin_lock(&obd_zombie_impexp_lock);
1593 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1595 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1596 cfs_spin_unlock(&obd_zombie_impexp_lock);
/* NOTE(review): obd_zombie_impexp_notify is a function defined in this
 * file, so comparing its address with NULL is always true. */
1598 if (obd_zombie_impexp_notify != NULL)
1599 obd_zombie_impexp_notify();
1603 * notify import/export destroy thread about new zombie.
/*
 * Wake all waiters on obd_zombie_waitq.  A broadcast is used so that the
 * zombie thread is woken even when obd_zombie_barrier() callers sleep on
 * the same queue (see the remark inside the function).
 */
1605 static void obd_zombie_impexp_notify(void)
1608 * Make sure obd_zomebie_impexp_thread get this notification.
1609 * It is possible this signal only get by obd_zombie_barrier, and
1610 * barrier gulps this notification and sleeps away and hangs ensues
1612 cfs_waitq_broadcast(&obd_zombie_waitq);
1616 * check whether obd_zombie is idle
/*
 * Compute, under obd_zombie_impexp_lock, whether zombies_count is 0.
 * Must not be called once OBD_ZOMBIE_STOP has been set (asserted).
 * Used as the wait condition of obd_zombie_barrier().
 *
 * NOTE(review): the return statement is on a line not visible in this
 * excerpt; presumably rc is returned.
 */
1618 static int obd_zombie_is_idle(void)
1622 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1623 cfs_spin_lock(&obd_zombie_impexp_lock);
1624 rc = (zombies_count == 0);
1625 cfs_spin_unlock(&obd_zombie_impexp_lock);
1630 * wait when obd_zombie import/export queues become empty
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() holds, i.e. until
 * no zombie import or export is left to destroy.
 *
 * When called from the zombie thread itself (the caller's pid equals
 * obd_zombie_pid) the wait is skipped.
 *
 * NOTE(review): the statement that skips the wait is on a line not
 * visible in this excerpt.  The zero-initialised l_wait_info presumably
 * means "no timeout" -- confirm against l_wait_event().
 */
1632 void obd_zombie_barrier(void)
1634 struct l_wait_info lwi = { 0 };
1636 if (obd_zombie_pid == cfs_curproc_pid())
1637 /* don't wait for myself */
1639 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1641 EXPORT_SYMBOL(obd_zombie_barrier);
1646 * destroy zombie export/import thread.
/*
 * Body of the thread that destroys zombie imports and exports.
 *
 * The thread daemonizes itself as "obd_zombid" and completes
 * obd_zombie_start both when that fails and when it succeeds, so the
 * creator is released either way.  It then records its pid and, until
 * OBD_ZOMBIE_STOP is set, repeatedly sleeps until
 * obd_zombie_impexp_check() returns 0, culls the zombie lists and
 * signals obd_zombie_waitq for obd_zombie_barrier() callers.  On leaving
 * the loop it completes obd_zombie_stop.
 *
 * @param unused  not used by the visible statements
 */
1648 static int obd_zombie_impexp_thread(void *unused)
1652 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1653 cfs_complete(&obd_zombie_start);
1657 cfs_complete(&obd_zombie_start);
/* lets obd_zombie_barrier() recognise calls made from this thread */
1659 obd_zombie_pid = cfs_curproc_pid();
1661 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1662 struct l_wait_info lwi = { 0 };
1664 l_wait_event(obd_zombie_waitq,
1665 !obd_zombie_impexp_check(NULL), &lwi);
1666 obd_zombie_impexp_cull();
1669 * Notify obd_zombie_barrier callers that queues
1672 cfs_waitq_signal(&obd_zombie_waitq);
1675 cfs_complete(&obd_zombie_stop);
1680 #else /* ! KERNEL */
/* entry counter used by obd_zombie_impexp_kill() so that only the first
 * concurrent or nested invocation performs the cull */
1682 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
/* handles returned by liblustre_register_wait_callback() and
 * liblustre_register_idle_callback() in obd_zombie_impexp_init(); passed
 * back to the matching deregister calls in obd_zombie_impexp_stop() */
1683 static void *obd_zombie_impexp_work_cb;
1684 static void *obd_zombie_impexp_idle_cb;
/*
 * Callback (registered as "obd_zombi_impexp_kill") that culls the zombie
 * lists.  obd_zombie_impexp_cull() is only called when incrementing
 * zombie_recur yields 1, i.e. when no other invocation is in progress;
 * the counter is decremented again afterwards.
 *
 * @param arg  not used by the visible statements
 *
 * NOTE(review): the return value is set on lines not visible in this
 * excerpt.
 */
1686 int obd_zombie_impexp_kill(void *arg)
1690 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1691 obd_zombie_impexp_cull();
1694 cfs_atomic_dec(&zombie_recur);
1701 * start destroy zombie import/export thread
/*
 * Set up the zombie import/export machinery.
 *
 * Initialises the two zombie lists, obd_zombie_impexp_lock, both
 * completions and the wait queue.  Two start-up variants follow: one
 * creates obd_zombie_impexp_thread() and waits for obd_zombie_start, the
 * other registers obd_zombie_impexp_kill() as a liblustre wait callback
 * and obd_zombie_impexp_check() as a liblustre idle callback.
 *
 * NOTE(review): the preprocessor conditionals selecting between the two
 * variants, the check of rc and the return statement are on lines not
 * visible in this excerpt.
 */
1703 int obd_zombie_impexp_init(void)
1707 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1708 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1709 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1710 cfs_init_completion(&obd_zombie_start);
1711 cfs_init_completion(&obd_zombie_stop);
1712 cfs_waitq_init(&obd_zombie_waitq);
1716 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
/* the thread completes obd_zombie_start on success and on failure */
1720 cfs_wait_for_completion(&obd_zombie_start);
1723 obd_zombie_impexp_work_cb =
1724 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1725 &obd_zombie_impexp_kill, NULL);
1727 obd_zombie_impexp_idle_cb =
1728 liblustre_register_idle_callback("obd_zombi_impexp_check",
1729 &obd_zombie_impexp_check, NULL);
1735 * stop destroy zombie import/export thread
/*
 * Shut the zombie import/export machinery down.
 *
 * Two variants are visible: one sets OBD_ZOMBIE_STOP, wakes the waiters
 * and waits for obd_zombie_stop to be completed by the thread; the other
 * deregisters the two liblustre callbacks.
 *
 * NOTE(review): the preprocessor conditionals selecting between the two
 * variants are on lines not visible in this excerpt.
 */
1737 void obd_zombie_impexp_stop(void)
/* set the flag first, then wake the thread so that it observes it */
1739 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1740 obd_zombie_impexp_notify();
1742 cfs_wait_for_completion(&obd_zombie_stop);
1744 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1745 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1749 /***** Kernel-userspace comm helpers *******/
1751 /* Get length of entire message, including header */
/* @param payload_len  size of the payload, in bytes
 * @returns sizeof(struct kuc_hdr) + payload_len, converted to int */
1752 int kuc_len(int payload_len)
1754 return sizeof(struct kuc_hdr) + payload_len;
1756 EXPORT_SYMBOL(kuc_len);
1758 /* Get a pointer to kuc header, given a ptr to the payload
1759 * @param p Pointer to payload area
1760 * @returns Pointer to kuc header
 */
/* The header is taken to sit immediately before the payload: the result
 * is @p stepped back by one struct kuc_hdr.  The header's kuc_magic must
 * equal KUC_MAGIC (asserted), so @p has to be a payload pointer such as
 * the one returned by kuc_alloc(). */
1762 struct kuc_hdr * kuc_ptr(void *p)
1764 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1765 LASSERT(lh->kuc_magic == KUC_MAGIC);
1768 EXPORT_SYMBOL(kuc_ptr);
1770 /* Test if payload is part of kuc message
1771 * @param p Pointer to payload area
 */
/* Checks whether the struct kuc_hdr located immediately before @p
 * carries KUC_MAGIC.  The memory in front of @p is read unconditionally,
 * so the caller must guarantee that sizeof(struct kuc_hdr) bytes before
 * @p are readable.
 * NOTE(review): the return statements are on lines not visible in this
 * excerpt. */
1774 int kuc_ispayload(void *p)
1776 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1778 if (kh->kuc_magic == KUC_MAGIC)
1783 EXPORT_SYMBOL(kuc_ispayload);
1785 /* Alloc space for a message, and fill in header
1786 * @return Pointer to payload area
 */
/* @param payload_len  size of the payload, in bytes
 * @param transport    stored in the header's kuc_transport field
 * @param type         stored in the header's kuc_msgtype field
 *
 * The header's kuc_msglen is set to the total length (header plus
 * payload, as computed by kuc_len()).  On failure ERR_PTR(-ENOMEM) is
 * returned, not NULL.
 * NOTE(review): the allocation call and its failure test are on lines
 * not visible in this excerpt. */
1788 void *kuc_alloc(int payload_len, int transport, int type)
1791 int len = kuc_len(payload_len);
1795 return ERR_PTR(-ENOMEM);
1797 lh->kuc_magic = KUC_MAGIC;
1798 lh->kuc_transport = transport;
1799 lh->kuc_msgtype = type;
1800 lh->kuc_msglen = len;
/* the payload starts right behind the header */
1802 return (void *)(lh + 1);
1804 EXPORT_SYMBOL(kuc_alloc);
1806 /* Takes pointer to payload area */
/* Frees a whole message: the header is located with kuc_ptr() (which
 * asserts the magic) and kuc_len(payload_len) bytes are released.
 * @param p            payload pointer, e.g. as returned by kuc_alloc()
 * @param payload_len  payload size; presumably must match the value
 *                     given to kuc_alloc() -- confirm
 *
 * NOTE(review): this function is declared `inline` without `static` and
 * is also exported.  Under C99 inline semantics such a definition does
 * not by itself provide an external definition; it does under GNU89
 * inline semantics.  Consider dropping `inline` -- confirm against the
 * build flags. */
1807 inline void kuc_free(void *p, int payload_len)
1809 struct kuc_hdr *lh = kuc_ptr(p);
1810 OBD_FREE(lh, kuc_len(payload_len));
1812 EXPORT_SYMBOL(kuc_free);