1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/obdclass/genops.c
41 * These are the only exported functions, they provide some generic
42 * infrastructure for managing object devices
45 #define DEBUG_SUBSYSTEM S_CLASS
47 #include <liblustre.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
/* List of registered obd types (defined elsewhere) and the spinlock that
 * class_search_type()/class_register_type() take around it. */
53 extern cfs_list_t obd_types;
54 cfs_spinlock_t obd_types_lock;
/* Memory caches created by obd_init_caches() and torn down by
 * obd_cleanup_caches(). */
56 cfs_mem_cache_t *obd_device_cachep;
57 cfs_mem_cache_t *obdo_cachep;
58 EXPORT_SYMBOL(obdo_cachep);
59 cfs_mem_cache_t *import_cachep;
/* Zombie import/export lists and their lock.
 * NOTE(review): the code that fills and drains these lists is not in this
 * part of the file - confirm locking rules there. */
61 cfs_list_t obd_zombie_imports;
62 cfs_list_t obd_zombie_exports;
63 cfs_spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of helpers defined later in the file.
 * class_export_put()/class_import_put() hand objects to the *_add helpers
 * when the last reference is dropped. */
64 static void obd_zombie_impexp_notify(void);
65 static void obd_zombie_export_add(struct obd_export *exp);
66 static void obd_zombie_import_add(struct obd_import *imp);
67 static void print_export_data(struct obd_export *exp,
68 const char *status, int locks);
/* Function pointer used below to release ptlrpc connections held by exports
 * and imports; it is assigned outside this section. */
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 * support functions: we could use inter-module communication, but this
74 * is more portable to other OS's
/*
 * Allocate one obd_device from obd_device_cachep (CFS_ALLOC_IO) and stamp it
 * with OBD_DEVICE_MAGIC. class_newdev() treats a NULL result as -ENOMEM.
 */
76 static struct obd_device *obd_device_alloc(void)
78 struct obd_device *obd;
80 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
82 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts the magic is intact, reports a namespace that is still attached
 * (obd_namespace != NULL) via CERROR, finalises obd_reference and frees the
 * slab object.
 */
87 static void obd_device_free(struct obd_device *obd)
90 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A leftover namespace means cleanup was incomplete; log it with obd_force. */
92 if (obd->obd_namespace != NULL) {
93 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94 obd, obd->obd_namespace, obd->obd_force);
97 lu_ref_fini(&obd->obd_reference);
98 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by exact name.
 *
 * Walks obd_types under obd_types_lock, comparing typ_name with strcmp().
 * The lock is released both on a match and after an unsuccessful walk.
 * Callers treat a non-NULL result as "type exists"; no reference is taken
 * in the code visible here.
 */
101 struct obd_type *class_search_type(const char *name)
104 struct obd_type *type;
106 cfs_spin_lock(&obd_types_lock);
107 cfs_list_for_each(tmp, &obd_types) {
108 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109 if (strcmp(type->typ_name, name) == 0) {
110 cfs_spin_unlock(&obd_types_lock);
114 cfs_spin_unlock(&obd_types_lock);
/*
 * Find an obd type by name and pin its owning module.
 *
 * When built with HAVE_MODULE_LOADING_SUPPORT the module named after the type
 * is requested and the lookup is retried; a failed load is reported on the
 * console. With a type in hand, the module owning typ_dt_ops is pinned under
 * type->obd_type_lock. class_put_type() is the matching release.
 */
118 struct obd_type *class_get_type(const char *name)
120 struct obd_type *type = class_search_type(name);
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
124 const char *modname = name;
/* cfs_request_module() returning 0 is treated as success here. */
125 if (!cfs_request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127 type = class_search_type(name);
129 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135 cfs_spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of cfs_try_module_get() is discarded, so a failed
 * module pin goes unnoticed - confirm this is intended. */
137 cfs_try_module_get(type->typ_dt_ops->o_owner);
138 cfs_spin_unlock(&type->obd_type_lock);
142 EXPORT_SYMBOL(class_get_type);
/*
 * Release a type obtained from class_get_type(): drops the module reference
 * on the owner of typ_dt_ops while holding type->obd_type_lock.
 */
144 void class_put_type(struct obd_type *type)
147 cfs_spin_lock(&type->obd_type_lock);
149 cfs_module_put(type->typ_dt_ops->o_owner);
150 cfs_spin_unlock(&type->obd_type_lock);
152 EXPORT_SYMBOL(class_put_type);
/* Upper bound (exclusive) on the length of an obd type name. */
154 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type under \a name.
 *
 * Allocates an obd_type with private copies of the dt/md operation tables and
 * of the name, registers a proc directory under proc_lustre_root, initialises
 * \a ldt via lu_device_type_init() and finally links the type into obd_types
 * under obd_types_lock. On failure the partially built type is freed again.
 */
156 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
157 struct lprocfs_vars *vars, const char *name,
158 struct lu_device_type *ldt)
160 struct obd_type *type;
/* The name must be NUL-terminated within CLASS_MAX_NAME bytes. */
165 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
167 if (class_search_type(name)) {
168 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
173 OBD_ALLOC(type, sizeof(*type));
/* All three sub-allocations are attempted first and checked together below. */
177 OBD_ALLOC_PTR(type->typ_dt_ops);
178 OBD_ALLOC_PTR(type->typ_md_ops);
179 OBD_ALLOC(type->typ_name, strlen(name) + 1);
181 if (type->typ_dt_ops == NULL ||
182 type->typ_md_ops == NULL ||
183 type->typ_name == NULL)
/* Operation tables are copied by value, so the caller's tables may go away. */
186 *(type->typ_dt_ops) = *dt_ops;
187 /* md_ops is optional */
189 *(type->typ_md_ops) = *md_ops;
/* Safe: typ_name was allocated with strlen(name) + 1 bytes above. */
190 strcpy(type->typ_name, name);
191 cfs_spin_lock_init(&type->obd_type_lock);
194 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
/* Reset typ_procroot so it does not keep holding an error pointer. */
196 if (IS_ERR(type->typ_procroot)) {
197 rc = PTR_ERR(type->typ_procroot);
198 type->typ_procroot = NULL;
204 rc = lu_device_type_init(ldt);
/* Publish the fully initialised type. */
209 cfs_spin_lock(&obd_types_lock);
210 cfs_list_add(&type->typ_chain, &obd_types);
211 cfs_spin_unlock(&obd_types_lock);
/* Failure path: free whatever was allocated; the name buffer size is
 * recomputed from \a name, which matches the allocation size above. */
216 if (type->typ_name != NULL)
217 OBD_FREE(type->typ_name, strlen(name) + 1);
218 if (type->typ_md_ops != NULL)
219 OBD_FREE_PTR(type->typ_md_ops);
220 if (type->typ_dt_ops != NULL)
221 OBD_FREE_PTR(type->typ_dt_ops);
222 OBD_FREE(type, sizeof(*type));
225 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is reported with CERROR. A type that still has references
 * (typ_refcnt != 0) only has its operation tables freed; the name is kept for
 * debugging. Otherwise the proc entry is removed, the lu device type is
 * finalised, the type is unlinked from obd_types under obd_types_lock and all
 * of its memory is freed.
 */
227 int class_unregister_type(const char *name)
229 struct obd_type *type = class_search_type(name);
233 CERROR("unknown obd type\n");
237 if (type->typ_refcnt) {
238 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239 /* This is a bad situation, let's make the best of it */
240 /* Remove ops, but leave the name for debugging */
241 OBD_FREE_PTR(type->typ_dt_ops);
242 OBD_FREE_PTR(type->typ_md_ops);
246 if (type->typ_procroot) {
247 lprocfs_remove(&type->typ_procroot);
251 lu_device_type_fini(type->typ_lu);
253 cfs_spin_lock(&obd_types_lock);
254 cfs_list_del(&type->typ_chain);
255 cfs_spin_unlock(&obd_types_lock);
/* strlen(name) equals strlen(type->typ_name) because the lookup matched with
 * strcmp(), so this is the size the name was allocated with. */
256 OBD_FREE(type->typ_name, strlen(name) + 1);
257 if (type->typ_dt_ops != NULL)
258 OBD_FREE_PTR(type->typ_dt_ops);
259 if (type->typ_md_ops != NULL)
260 OBD_FREE_PTR(type->typ_md_ops);
261 OBD_FREE(type, sizeof(*type));
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
267 * Create a new obd device.
269 * Find an empty slot in ::obd_devs[], create a new obd device in it.
271 * \param[in] type_name obd device type string.
272 * \param[in] name obd device name.
274 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
/*
 * Implementation notes:
 *  - every failure visible here is reported as an ERR_PTR(): -EINVAL (name
 *    too long), -ENODEV (unknown type), -ENOMEM (allocation), -EEXIST
 *    (duplicate name) or -EOVERFLOW (no free slot);
 *  - the slot scan and the insertion into obd_devs[] happen under the
 *    obd_dev_lock write lock;
 *  - on failure both the new device and the type reference are released.
 */
277 struct obd_device *class_newdev(const char *type_name, const char *name)
279 struct obd_device *result = NULL;
280 struct obd_device *newdev;
281 struct obd_type *type = NULL;
283 int new_obd_minor = 0;
285 if (strlen(name) >= MAX_OBD_NAME) {
286 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287 RETURN(ERR_PTR(-EINVAL));
/* Takes a module reference; dropped again on every failure path below. */
290 type = class_get_type(type_name);
292 CERROR("OBD: unknown type: %s\n", type_name);
293 RETURN(ERR_PTR(-ENODEV));
296 newdev = obd_device_alloc();
297 if (newdev == NULL) {
298 class_put_type(type);
299 RETURN(ERR_PTR(-ENOMEM));
301 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303 cfs_write_lock(&obd_dev_lock);
304 for (i = 0; i < class_devno_max(); i++) {
305 struct obd_device *obd = class_num2obd(i);
/* Duplicate name: undo a slot that may already have been claimed earlier in
 * this scan, then fail with -EEXIST. */
307 if (obd && obd->obd_name &&
308 (strcmp(name, obd->obd_name) == 0)) {
309 CERROR("Device %s already exists, won't add\n", name);
311 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312 "%p obd_magic %08x != %08x\n", result,
313 result->obd_magic, OBD_DEVICE_MAGIC);
314 LASSERTF(result->obd_minor == new_obd_minor,
315 "%p obd_minor %d != %d\n", result,
316 result->obd_minor, new_obd_minor);
318 obd_devs[result->obd_minor] = NULL;
319 result->obd_name[0]='\0';
321 result = ERR_PTR(-EEXIST);
/* First empty slot seen while nothing has been claimed yet. */
324 if (!result && !obd) {
326 result->obd_minor = i;
328 result->obd_type = type;
/* The length was checked against MAX_OBD_NAME above.
 * NOTE(review): termination relies on the last byte of obd_name already
 * being zero - confirm the slab allocation is zero-filled. */
329 strncpy(result->obd_name, name,
330 sizeof(result->obd_name) - 1);
331 obd_devs[i] = result;
334 cfs_write_unlock(&obd_dev_lock);
336 if (result == NULL && i >= class_devno_max()) {
337 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339 result = ERR_PTR(-EOVERFLOW);
342 if (IS_ERR(result)) {
343 obd_device_free(newdev);
344 class_put_type(type);
346 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347 result->obd_name, result);
/*
 * Remove \a obd from obd_devs[] and free it.
 *
 * Asserts the device magic, that the device occupies its own minor slot and
 * that it has a type. The slot is cleared under the obd_dev_lock write lock,
 * the device is freed, and the type reference is dropped last.
 */
352 void class_release_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type is put. */
354 struct obd_type *obd_type = obd->obd_type;
356 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
357 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
358 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
359 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
360 LASSERT(obd_type != NULL);
362 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
363 obd->obd_name,obd->obd_type->typ_name);
365 cfs_write_lock(&obd_dev_lock);
366 obd_devs[obd->obd_minor] = NULL;
367 cfs_write_unlock(&obd_dev_lock);
368 obd_device_free(obd);
370 class_put_type(obd_type);
/*
 * Map a device name to its index in obd_devs[].
 *
 * Scans all slots under the obd_dev_lock read lock for a device whose
 * obd_name equals \a name; a match is only honoured once obd_attached is set.
 * class_name2obd() treats a negative result as "not found".
 */
373 int class_name2dev(const char *name)
380 cfs_read_lock(&obd_dev_lock);
381 for (i = 0; i < class_devno_max(); i++) {
382 struct obd_device *obd = class_num2obd(i);
384 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
385 /* Make sure we finished attaching before we give
386 out any references */
387 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
388 if (obd->obd_attached) {
389 cfs_read_unlock(&obd_dev_lock);
395 cfs_read_unlock(&obd_dev_lock);
399 EXPORT_SYMBOL(class_name2dev);
/*
 * Look up a device by name: resolve the name to an index with
 * class_name2dev() and fetch the device with class_num2obd().
 */
401 struct obd_device *class_name2obd(const char *name)
403 int dev = class_name2dev(name);
/* NOTE(review): the upper bound uses '>' so dev == class_devno_max() passes
 * this test; class_num2obd() re-checks num < class_devno_max(), so the
 * lookup still does not index past the table. '>=' would be the
 * consistent bound. */
405 if (dev < 0 || dev > class_devno_max())
407 return class_num2obd(dev);
409 EXPORT_SYMBOL(class_name2obd);
/*
 * Map a device uuid to its index in obd_devs[].
 *
 * Scans all slots under the obd_dev_lock read lock and compares \a uuid with
 * each device's obd_uuid using obd_uuid_equals(). Unlike class_name2dev(),
 * the visible code does not test obd_attached.
 */
411 int class_uuid2dev(struct obd_uuid *uuid)
415 cfs_read_lock(&obd_dev_lock);
416 for (i = 0; i < class_devno_max(); i++) {
417 struct obd_device *obd = class_num2obd(i);
419 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
420 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
421 cfs_read_unlock(&obd_dev_lock);
425 cfs_read_unlock(&obd_dev_lock);
429 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up a device by uuid: resolve the uuid to an index with
 * class_uuid2dev() and fetch the device with class_num2obd().
 */
431 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
433 int dev = class_uuid2dev(uuid);
436 return class_num2obd(dev);
438 EXPORT_SYMBOL(class_uuid2obd);
441 * Get obd device from ::obd_devs[]
443 * \param num [in] array index
445 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
446 * otherwise return the obd device there.
/*
 * Implementation notes: the result defaults to NULL, and a device found in
 * the slot is sanity-checked (magic value, and obd_minor equal to \a num).
 * No locking is done here; callers in this file take obd_dev_lock themselves.
 * NOTE(review): only the upper bound of \a num is checked in the visible
 * code - confirm that callers never pass a negative index.
 */
448 struct obd_device *class_num2obd(int num)
450 struct obd_device *obd = NULL;
452 if (num < class_devno_max()) {
457 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
458 "%p obd_magic %08x != %08x\n",
459 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
460 LASSERTF(obd->obd_minor == num,
461 "%p obd_minor %0d != %0d\n",
462 obd, obd->obd_minor, num);
467 EXPORT_SYMBOL(class_num2obd);
/*
 * Print one console line per device in obd_devs[]: index, status, type name,
 * device name, uuid and current reference count. The table is walked under
 * the obd_dev_lock read lock.
 */
469 void class_obd_list(void)
474 cfs_read_lock(&obd_dev_lock);
475 for (i = 0; i < class_devno_max(); i++) {
476 struct obd_device *obd = class_num2obd(i);
/* Status is chosen by the first flag that is set, in the order
 * stopping, set up, attached. */
480 if (obd->obd_stopping)
482 else if (obd->obd_set_up)
484 else if (obd->obd_attached)
488 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
489 i, status, obd->obd_type->typ_name,
490 obd->obd_name, obd->obd_uuid.uuid,
491 cfs_atomic_read(&obd->obd_refcount));
493 cfs_read_unlock(&obd_dev_lock);
497 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
498 specified, then only the client with that uuid is returned,
499 otherwise any client connected to the tgt is returned. */
/*
 * Implementation notes: the device table is scanned under the obd_dev_lock
 * read lock, which is released before a match is handed back.
 * NOTE(review): the type comparison is strncmp() limited to strlen(typ_name),
 * i.e. a prefix match - a device type whose name merely starts with
 * \a typ_name also qualifies. Confirm this is intended.
 */
500 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
501 const char * typ_name,
502 struct obd_uuid *grp_uuid)
506 cfs_read_lock(&obd_dev_lock);
507 for (i = 0; i < class_devno_max(); i++) {
508 struct obd_device *obd = class_num2obd(i);
512 if ((strncmp(obd->obd_type->typ_name, typ_name,
513 strlen(typ_name)) == 0)) {
/* The target uuid must match; grp_uuid is only compared when non-NULL. */
514 if (obd_uuid_equals(tgt_uuid,
515 &obd->u.cli.cl_target_uuid) &&
516 ((grp_uuid)? obd_uuid_equals(grp_uuid,
517 &obd->obd_uuid) : 1)) {
518 cfs_read_unlock(&obd_dev_lock);
523 cfs_read_unlock(&obd_dev_lock);
527 EXPORT_SYMBOL(class_find_client_obd);
529 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
530 searching at *next, and if a device is found, the next index to look
531 at is saved in *next. If next is NULL, then the first matching device
532 will always be returned. */
/*
 * Implementation notes: a caller-supplied *next is only used as the starting
 * index when it lies in [0, class_devno_max()). The scan runs under the
 * obd_dev_lock read lock and compares each device's obd_uuid with
 * \a grp_uuid.
 */
533 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
539 else if (*next >= 0 && *next < class_devno_max())
544 cfs_read_lock(&obd_dev_lock);
545 for (; i < class_devno_max(); i++) {
546 struct obd_device *obd = class_num2obd(i);
550 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
553 cfs_read_unlock(&obd_dev_lock);
557 cfs_read_unlock(&obd_dev_lock);
561 EXPORT_SYMBOL(class_devices_in_group);
564 * Notify that the sptlrpc log for \a fsname has changed, and let every relevant
565 * OBD adjust its sptlrpc settings accordingly.
/*
 * Implementation notes: walks obd_devs[] under the obd_dev_lock read lock and
 * sends KEY_SPTLRPC_CONF via obd_set_info_async() to every set-up,
 * non-stopping mdc/osc/mdt/ost device whose name begins with the first
 * \a namelen bytes of \a fsname.
 */
567 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
569 struct obd_device *obd;
573 LASSERT(namelen > 0);
575 cfs_read_lock(&obd_dev_lock);
576 for (i = 0; i < class_devno_max(); i++) {
577 obd = class_num2obd(i);
579 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
582 /* only notify mdc, osc, mdt, ost */
583 type = obd->obd_type->typ_name;
584 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
585 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
586 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
587 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Prefix comparison: only the first namelen bytes of the device name count. */
590 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the device, then drop the table lock around the notification call;
 * the lock is re-taken afterwards before the scan continues. */
593 class_incref(obd, __FUNCTION__, obd);
594 cfs_read_unlock(&obd_dev_lock);
595 rc2 = obd_set_info_async(obd->obd_self_export,
596 sizeof(KEY_SPTLRPC_CONF),
597 KEY_SPTLRPC_CONF, 0, NULL, NULL);
599 class_decref(obd, __FUNCTION__, obd);
600 cfs_read_lock(&obd_dev_lock);
602 cfs_read_unlock(&obd_dev_lock);
605 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the memory caches created by obd_init_caches() (device, obdo,
 * import and capa caches). Each destroy is asserted to succeed.
 *
 * NOTE(review): the assertion messages contain typos ("destropy", "destory")
 * and name the device cache "ll_obd_device_cache" although it is created as
 * "ll_obd_dev_cache"; they are runtime strings and are left untouched here.
 */
607 void obd_cleanup_caches(void)
612 if (obd_device_cachep) {
613 rc = cfs_mem_cache_destroy(obd_device_cachep);
614 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
615 obd_device_cachep = NULL;
618 rc = cfs_mem_cache_destroy(obdo_cachep);
619 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
623 rc = cfs_mem_cache_destroy(import_cachep);
624 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
625 import_cachep = NULL;
628 rc = cfs_mem_cache_destroy(capa_cachep);
629 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the memory caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache" and "capa_cache". Each cache pointer is
 * asserted to be NULL before creation. The failure path calls
 * obd_cleanup_caches() to undo whatever was created.
 */
635 int obd_init_caches(void)
639 LASSERT(obd_device_cachep == NULL);
640 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
641 sizeof(struct obd_device),
643 if (!obd_device_cachep)
646 LASSERT(obdo_cachep == NULL);
647 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
652 LASSERT(import_cachep == NULL);
653 import_cachep = cfs_mem_cache_create("ll_import_cache",
654 sizeof(struct obd_import),
659 LASSERT(capa_cachep == NULL);
660 capa_cachep = cfs_mem_cache_create("capa_cache",
661 sizeof(struct obd_capa), 0, 0);
667 obd_cleanup_caches();
672 /* map connection to client */
/*
 * Resolve a connection handle to its export via class_handle2object() on
 * conn->cookie. A null handle and the special cookie -1 ("assign a new
 * connection") are only logged in the visible code.
 * NOTE(review): class_conn2obd() calls class_export_put() on the result, so
 * the lookup presumably returns a referenced export - confirm.
 */
673 struct obd_export *class_conn2export(struct lustre_handle *conn)
675 struct obd_export *export;
679 CDEBUG(D_CACHE, "looking for null handle\n");
683 if (conn->cookie == -1) { /* this means assign a new connection */
684 CDEBUG(D_CACHE, "want a new connection\n");
688 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
689 export = class_handle2object(conn->cookie);
692 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): presumably returns exp->exp_obd (or NULL for a NULL export) -
 * confirm against the function body.
 */
694 struct obd_device *class_exp2obd(struct obd_export *exp)
700 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve a connection handle to the obd device of its export.
 * The export found by class_conn2export() is only needed to read exp_obd, so
 * it is released again with class_export_put() immediately afterwards.
 */
702 struct obd_device *class_conn2obd(struct lustre_handle *conn)
704 struct obd_export *export;
705 export = class_conn2export(conn);
707 struct obd_device *obd = export->exp_obd;
708 class_export_put(export);
713 EXPORT_SYMBOL(class_conn2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * \a exp is dereferenced unconditionally to read exp_obd.
 */
715 struct obd_import *class_exp2cliimp(struct obd_export *exp)
717 struct obd_device *obd = exp->exp_obd;
720 return obd->u.cli.cl_import;
722 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that the
 * connection handle \a conn resolves to via class_conn2obd().
 */
724 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
726 struct obd_device *obd = class_conn2obd(conn);
729 return obd->u.cli.cl_import;
731 EXPORT_SYMBOL(class_conn2cliimp);
733 /* Export management functions */
/*
 * Final teardown of an export whose reference count has reached zero.
 *
 * Releases the ptlrpc connection (if any), asserts that all reply/request
 * lists are empty, lets the device destroy its per-export state, drops the
 * "export" reference on the device and frees the export via OBD_FREE_RCU.
 */
734 static void class_export_destroy(struct obd_export *exp)
736 struct obd_device *obd = exp->exp_obd;
739 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
/* NOTE(review): obd->obd_name is dereferenced here, before the
 * LASSERT(obd != NULL) below - the assertion cannot catch a NULL obd. */
741 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
742 exp->exp_client_uuid.uuid, obd->obd_name);
744 LASSERT(obd != NULL);
746 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
747 if (exp->exp_connection)
748 ptlrpc_put_connection_superhack(exp->exp_connection);
750 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
751 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
752 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
753 LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
754 obd_destroy_export(exp);
/* Pairs with class_incref(obd, "export", export) in class_new_export(). */
755 class_decref(obd, "export", exp);
757 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table callback (registered via class_handle_hash() in
 * class_new_export()): takes a reference on the export behind the handle. */
761 static void export_handle_addref(void *export)
763 class_export_get(export);
/* Take one reference on \a exp (atomic increment of exp_refcount) and log the
 * new count. Paired with class_export_put(). */
766 struct obd_export *class_export_get(struct obd_export *exp)
768 cfs_atomic_inc(&exp->exp_refcount);
769 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
770 cfs_atomic_read(&exp->exp_refcount));
773 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 *
 * When the count reaches zero the export is not freed inline: its nid stat
 * reference is released and the export is queued with
 * obd_zombie_export_add().
 */
775 void class_export_put(struct obd_export *exp)
777 LASSERT(exp != NULL);
/* The bounds are meant to catch a put on an already-released or poisoned
 * export. */
778 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, 0x5a5a5a);
779 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
780 cfs_atomic_read(&exp->exp_refcount) - 1);
782 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
783 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
784 CDEBUG(D_IOCTL, "final put %p/%s\n",
785 exp, exp->exp_client_uuid.uuid);
787 /* release nid stat reference */
788 lprocfs_exp_cleanup(exp);
790 obd_zombie_export_add(exp);
793 EXPORT_SYMBOL(class_export_put);
795 /* Creates a new export, adds it to the hash table, and returns a
796 * pointer to it. The refcount is 2: one for the hash reference, and
797 * one for the pointer returned by this function. */
/*
 * Implementation notes:
 *  - allocation failure yields ERR_PTR(-ENOMEM);
 *  - obd_stopping is checked twice under obd->obd_dev_lock, before and after
 *    the uuid-hash insertion, because the lock is dropped in between;
 *  - a duplicate client uuid is rejected with -EALREADY;
 *  - the error paths unhash the handle, destroy the per-export state and
 *    free the export.
 */
798 struct obd_export *class_new_export(struct obd_device *obd,
799 struct obd_uuid *cluuid)
801 struct obd_export *export;
802 cfs_hash_t *hash = NULL;
806 OBD_ALLOC_PTR(export);
808 return ERR_PTR(-ENOMEM);
810 export->exp_conn_cnt = 0;
811 export->exp_lock_hash = NULL;
/* Two references, as described in the comment above this function. */
812 cfs_atomic_set(&export->exp_refcount, 2);
813 cfs_atomic_set(&export->exp_rpc_count, 0);
814 cfs_atomic_set(&export->exp_cb_count, 0);
815 cfs_atomic_set(&export->exp_locks_count, 0);
816 #if LUSTRE_TRACKS_LOCK_EXP_REFS
817 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
818 cfs_spin_lock_init(&export->exp_locks_list_guard);
820 cfs_atomic_set(&export->exp_replay_count, 0);
821 export->exp_obd = obd;
822 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
823 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
824 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
825 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
826 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
827 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
/* Publish the handle; export_handle_addref() is the addref callback. */
828 class_handle_hash(&export->exp_handle, export_handle_addref);
829 export->exp_last_request_time = cfs_time_current_sec();
830 cfs_spin_lock_init(&export->exp_lock);
831 cfs_spin_lock_init(&export->exp_rpc_lock);
832 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
833 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
835 export->exp_sp_peer = LUSTRE_SP_ANY;
836 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
837 export->exp_client_uuid = *cluuid;
838 obd_init_export(export);
840 cfs_spin_lock(&obd->obd_dev_lock);
841 /* shouldn't happen, but might race */
842 if (obd->obd_stopping)
843 GOTO(exit_unlock, rc = -ENODEV);
/* Hold a reference on the uuid hash so it stays valid after the device lock
 * is dropped. */
845 hash = cfs_hash_getref(obd->obd_uuid_hash);
847 GOTO(exit_unlock, rc = -ENODEV);
848 cfs_spin_unlock(&obd->obd_dev_lock);
/* The device's own (self) export is not entered into the uuid hash. */
850 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
851 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
853 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
854 obd->obd_name, cluuid->uuid, rc);
855 GOTO(exit_err, rc = -EALREADY);
/* Re-check under the lock: the device may have started stopping while the
 * lock was released; if so, undo the hash insertion. */
859 cfs_spin_lock(&obd->obd_dev_lock);
860 if (obd->obd_stopping) {
861 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
862 GOTO(exit_unlock, rc = -ENODEV);
/* Success: pin the device and link the export into its lists. */
865 class_incref(obd, "export", export);
866 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
867 cfs_list_add_tail(&export->exp_obd_chain_timed,
868 &export->exp_obd->obd_exports_timed);
869 export->exp_obd->obd_num_exports++;
870 cfs_spin_unlock(&obd->obd_dev_lock);
871 cfs_hash_putref(hash);
/* Error paths. */
875 cfs_spin_unlock(&obd->obd_dev_lock);
878 cfs_hash_putref(hash);
879 class_handle_unhash(&export->exp_handle);
880 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
881 obd_destroy_export(export);
882 OBD_FREE_PTR(export);
885 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its device.
 *
 * Removes the handle from the handle table, then under obd_dev_lock removes
 * the export from the uuid hash (if hashed), moves it to
 * obd_unlinked_exports, takes it off the timed list and decrements
 * obd_num_exports. Finally drops one export reference.
 */
887 void class_unlink_export(struct obd_export *exp)
889 class_handle_unhash(&exp->exp_handle);
891 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
892 /* delete an uuid-export hashitem from hashtables */
893 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
894 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
895 &exp->exp_client_uuid,
896 &exp->exp_uuid_hash);
898 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
899 cfs_list_del_init(&exp->exp_obd_chain_timed);
900 exp->exp_obd->obd_num_exports--;
901 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
902 class_export_put(exp);
904 EXPORT_SYMBOL(class_unlink_export);
906 /* Import management functions */
/*
 * Final teardown of an import whose reference count has reached zero.
 *
 * Releases the import's main connection and every entry on imp_conn_list,
 * asserts that no security context is attached (imp_sec == NULL), drops the
 * "import" reference on the device and frees the import via OBD_FREE_RCU.
 */
907 void class_import_destroy(struct obd_import *imp)
911 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
912 imp->imp_obd->obd_name);
914 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
916 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Drain the connection list, releasing each connection and its list node. */
918 while (!cfs_list_empty(&imp->imp_conn_list)) {
919 struct obd_import_conn *imp_conn;
921 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
922 struct obd_import_conn, oic_item);
923 cfs_list_del_init(&imp_conn->oic_item);
924 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
925 OBD_FREE(imp_conn, sizeof(*imp_conn));
928 LASSERT(imp->imp_sec == NULL);
/* Pairs with class_incref(obd, "import", imp) in class_new_import(). */
929 class_decref(imp->imp_obd, "import", imp);
930 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle-table callback (registered via class_handle_hash() in
 * class_new_import()): takes a reference on the import behind the handle. */
934 static void import_handle_addref(void *import)
936 class_import_get(import);
/* Take one reference on \a import (atomic increment of imp_refcount) and log
 * the new count with the owning device's name. Paired with
 * class_import_put(). */
939 struct obd_import *class_import_get(struct obd_import *import)
941 cfs_atomic_inc(&import->imp_refcount);
942 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
943 cfs_atomic_read(&import->imp_refcount),
944 import->imp_obd->obd_name);
947 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp. When the count reaches zero the import is
 * queued with obd_zombie_import_add() instead of being freed inline.
 * The import must not already be on the zombie list.
 */
949 void class_import_put(struct obd_import *imp)
953 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
/* NOTE(review): this uses the GE_LT variant while class_export_put() uses
 * GT_LT for the same kind of check - confirm whether a count of 0 should be
 * accepted before the decrement. */
954 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, 0x5a5a5a);
956 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
957 cfs_atomic_read(&imp->imp_refcount) - 1,
958 imp->imp_obd->obd_name);
960 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
961 CDEBUG(D_INFO, "final put import %p\n", imp);
962 obd_zombie_import_add(imp);
967 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 */
969 static void init_imp_at(struct imp_at *at) {
971 at_init(&at->iat_net_latency, 0, 0);
972 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
973 /* max service estimates are tracked on the server side, so
974 don't use the AT history here, just use the last reported
975 val. (But keep hist for proc histogram, worst_ever) */
976 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise a new import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a reference count of 2,
 * holds an "import" reference on the device, is entered into the handle
 * table and uses LUSTRE_MSG_MAGIC_V2 as its default message magic.
 * NOTE(review): by analogy with class_new_export() the two references are
 * presumably one for the handle table and one for the caller - confirm.
 */
981 struct obd_import *class_new_import(struct obd_device *obd)
983 struct obd_import *imp;
985 OBD_ALLOC(imp, sizeof(*imp));
989 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
990 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
991 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
992 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
993 cfs_spin_lock_init(&imp->imp_lock);
994 imp->imp_last_success_conn = 0;
995 imp->imp_state = LUSTRE_IMP_NEW;
/* Released again by class_decref() in class_import_destroy(). */
996 imp->imp_obd = class_incref(obd, "import", imp);
997 cfs_sema_init(&imp->imp_sec_mutex, 1);
998 cfs_waitq_init(&imp->imp_recovery_waitq);
1000 cfs_atomic_set(&imp->imp_refcount, 2);
1001 cfs_atomic_set(&imp->imp_unregistering, 0);
1002 cfs_atomic_set(&imp->imp_inflight, 0);
1003 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1004 cfs_atomic_set(&imp->imp_inval_count, 0);
1005 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1006 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1007 class_handle_hash(&imp->imp_handle, import_handle_addref);
1008 init_imp_at(&imp->imp_at);
1010 /* the default magic is V2, will be used in connect RPC, and
1011 * then adjusted according to the flags in request/reply. */
1012 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1016 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down an import: remove it from the handle table, bump
 * imp_generation under imp_lock and drop one reference. The import is only
 * freed once the last reference is gone (see class_import_put()).
 */
1018 void class_destroy_import(struct obd_import *import)
1020 LASSERT(import != NULL);
1021 LASSERT(import != LP_POISON);
1023 class_handle_unhash(&import->imp_handle);
1025 cfs_spin_lock(&import->imp_lock);
1026 import->imp_generation++;
1027 cfs_spin_unlock(&import->imp_lock);
1028 class_import_put(import);
1030 EXPORT_SYMBOL(class_destroy_import);
1032 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS only): record that \a lock
 * holds a reference on \a exp.
 *
 * Under exp_locks_list_guard the lock's per-export counter is incremented;
 * on the first reference the lock is linked into exp_locks_list and \a exp
 * becomes its target. A lock that already targets a different export is
 * reported with a console warning.
 */
1034 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1036 cfs_spin_lock(&exp->exp_locks_list_guard);
1038 LASSERT(lock->l_exp_refs_nr >= 0);
1040 if (lock->l_exp_refs_target != NULL &&
1041 lock->l_exp_refs_target != exp) {
1042 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1043 exp, lock, lock->l_exp_refs_target);
/* Post-increment: the test sees the count before this reference. */
1045 if ((lock->l_exp_refs_nr ++) == 0) {
1046 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1047 lock->l_exp_refs_target = exp;
/* NOTE(review): the count is printed with %u although the assertion above
 * (l_exp_refs_nr >= 0) suggests a signed field - confirm the type. */
1049 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1050 lock, exp, lock->l_exp_refs_nr);
1051 cfs_spin_unlock(&exp->exp_locks_list_guard);
1053 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS only): undo one
 * __class_export_add_lock_ref().
 *
 * Under exp_locks_list_guard the lock's per-export counter is decremented;
 * when it reaches zero the lock is unlinked from the export's list and its
 * target is cleared. A target mismatch is reported with a console warning.
 */
1055 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1057 cfs_spin_lock(&exp->exp_locks_list_guard);
1058 LASSERT(lock->l_exp_refs_nr > 0);
1059 if (lock->l_exp_refs_target != exp) {
1060 LCONSOLE_WARN("lock %p, "
1061 "mismatching export pointers: %p, %p\n",
1062 lock, lock->l_exp_refs_target, exp);
/* Pre-decrement: the test sees the count after this reference is dropped. */
1064 if (-- lock->l_exp_refs_nr == 0) {
1065 cfs_list_del_init(&lock->l_exp_refs_link);
1066 lock->l_exp_refs_target = NULL;
1068 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1069 lock, exp, lock->l_exp_refs_nr);
1070 cfs_spin_unlock(&exp->exp_locks_list_guard);
1072 EXPORT_SYMBOL(__class_export_del_lock_ref);
1075 /* A connection defines an export context in which preallocation can
1076 be managed. This releases the export pointer reference, and returns
1077 the export handle, so the export refcount is 1 when this function
/*
 * Implementation notes: creates the export with class_new_export(), stores
 * the export's handle cookie in \a conn and drops the pointer reference with
 * class_export_put(). A failed export creation is returned as
 * PTR_ERR(export).
 */
1079 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1080 struct obd_uuid *cluuid)
1082 struct obd_export *export;
1083 LASSERT(conn != NULL);
1084 LASSERT(obd != NULL);
1085 LASSERT(cluuid != NULL);
1088 export = class_new_export(obd, cluuid);
1090 RETURN(PTR_ERR(export));
/* The caller keeps only the cookie; the export pointer is released here. */
1092 conn->cookie = export->exp_handle.h_cookie;
1093 class_export_put(export);
1095 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1096 cluuid->uuid, conn->cookie);
1099 EXPORT_SYMBOL(class_connect);
1101 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting held by \a exp on its device.
 *
 * Under obd_recovery_task_lock: a delayed export decrements
 * obd_delayed_clients, and an export that is in recovery while the device is
 * recovering leaves recovery and decrements obd_connected_clients. Outside
 * that lock, pending request-replay and lock-replay flags are cleared and
 * the matching device counters decremented.
 *
 * Each flag is tested without exp_lock and cleared under it.
 */
1102 void class_export_recovery_cleanup(struct obd_export *exp)
1104 struct obd_device *obd = exp->exp_obd;
1106 cfs_spin_lock(&obd->obd_recovery_task_lock);
1107 if (exp->exp_delayed)
1108 obd->obd_delayed_clients--;
1109 if (obd->obd_recovering && exp->exp_in_recovery) {
1110 cfs_spin_lock(&exp->exp_lock);
1111 exp->exp_in_recovery = 0;
1112 cfs_spin_unlock(&exp->exp_lock);
1113 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1114 cfs_atomic_dec(&obd->obd_connected_clients);
1116 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1117 /** Cleanup req replay fields */
1118 if (exp->exp_req_replay_needed) {
1119 cfs_spin_lock(&exp->exp_lock);
1120 exp->exp_req_replay_needed = 0;
1121 cfs_spin_unlock(&exp->exp_lock);
1122 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1123 cfs_atomic_dec(&obd->obd_req_replay_clients);
1125 /** Cleanup lock replay data */
1126 if (exp->exp_lock_replay_needed) {
1127 cfs_spin_lock(&exp->exp_lock);
1128 exp->exp_lock_replay_needed = 0;
1129 cfs_spin_unlock(&exp->exp_lock);
1130 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1131 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1135 /* This function removes 1-3 references from the export:
1136 * 1 - for the export pointer passed in,
1137 * and, if a disconnect is really needed,
1138 * 2 - for removal from the hash,
1139 * 3 - in class_unlink_export().
1140 * The export passed to this function may be destroyed by the time it returns. */
/*
 * Implementation notes: exp_disconnected is tested and set atomically under
 * exp_lock, so only the first caller performs the teardown (nid-hash
 * removal, recovery cleanup, unlink). The final class_export_put() releases
 * the caller's own reference; per the race comment below, a repeated
 * disconnect skips the teardown.
 */
1141 int class_disconnect(struct obd_export *export)
1143 int already_disconnected;
1146 if (export == NULL) {
1147 CWARN("attempting to free NULL export %p\n", export);
1151 cfs_spin_lock(&export->exp_lock);
1152 already_disconnected = export->exp_disconnected;
1153 export->exp_disconnected = 1;
1154 cfs_spin_unlock(&export->exp_lock);
1156 /* class_cleanup(), abort_recovery(), and class_fail_export()
1157 * all end up in here, and if any of them race we shouldn't
1158 * call extra class_export_puts(). */
1159 if (already_disconnected) {
1160 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1161 GOTO(no_disconn, already_disconnected);
1164 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1165 export->exp_handle.h_cookie);
/* Only exports that were entered into the nid hash need removing from it. */
1167 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1168 cfs_hash_del(export->exp_obd->obd_nid_hash,
1169 &export->exp_connection->c_peer.nid,
1170 &export->exp_nid_hash);
1172 class_export_recovery_cleanup(export);
1173 class_unlink_export(export);
1175 class_export_put(export);
1178 EXPORT_SYMBOL(class_disconnect);
1180 /* Return non-zero for a fully connected export */
/* An export counts as connected when exp_conn_cnt > 0; the counter is read
 * under exp_lock. */
1181 int class_connected_export(struct obd_export *exp)
1185 cfs_spin_lock(&exp->exp_lock);
1186 connected = (exp->exp_conn_cnt > 0);
1187 cfs_spin_unlock(&exp->exp_lock);
1192 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, tagging each with \a flags first.
 *
 * The list is consumed from the head until empty. An export whose client
 * uuid equals its device's uuid (the self export) is not disconnected; it is
 * only unlinked from the list. Every other export is passed to
 * obd_disconnect().
 */
1194 static void class_disconnect_export_list(cfs_list_t *list,
1195 enum obd_option flags)
1198 struct obd_export *exp;
1201 /* It's possible that an export may disconnect itself, but
1202 * nothing else will be added to this list. */
1203 while (!cfs_list_empty(list)) {
1204 exp = cfs_list_entry(list->next, struct obd_export,
1206 /* need for safe call CDEBUG after obd_disconnect */
1207 class_export_get(exp);
1209 cfs_spin_lock(&exp->exp_lock);
1210 exp->exp_flags = flags;
1211 cfs_spin_unlock(&exp->exp_lock);
1213 if (obd_uuid_equals(&exp->exp_client_uuid,
1214 &exp->exp_obd->obd_uuid)) {
1216 "exp %p export uuid == obd uuid, don't discon\n",
1218 /* Need to delete this now so we don't end up pointing
1219 * to work_list later when this export is cleaned up. */
1220 cfs_list_del_init(&exp->exp_obd_chain);
1221 class_export_put(exp);
/* Second reference: this is the one consumed by obd_disconnect() below. */
1225 class_export_get(exp);
1226 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1227 "last request at "CFS_TIME_T"\n",
1228 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1229 exp, exp->exp_last_request_time);
1230 /* release one export reference anyway */
1231 rc = obd_disconnect(exp);
1233 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1234 obd_export_nid2str(exp), exp, rc);
/* Drops the reference taken at the top of the loop iteration. */
1235 class_export_put(exp);
/*
 * Disconnect all exports of \a obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private work
 * list under obd_dev_lock; the work list is then processed without the lock
 * by class_disconnect_export_list(), using flags derived from the device.
 */
1240 void class_disconnect_exports(struct obd_device *obd)
1242 cfs_list_t work_list;
1245 /* Move all of the exports from obd_exports to a work list, en masse. */
1246 CFS_INIT_LIST_HEAD(&work_list);
1247 cfs_spin_lock(&obd->obd_dev_lock);
1248 cfs_list_splice_init(&obd->obd_exports, &work_list);
1249 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1250 cfs_spin_unlock(&obd->obd_dev_lock);
1252 if (!cfs_list_empty(&work_list)) {
1253 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1254 "disconnecting them\n", obd->obd_minor, obd);
1255 class_disconnect_export_list(&work_list,
1256 exp_flags_from_obd(obd));
1258 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1259 obd->obd_minor, obd);
1262 EXPORT_SYMBOL(class_disconnect_exports);
1264 /* Remove exports that have not completed recovery.
/*
 * Disconnect the stale exports of @obd, as selected by @test_export.
 *
 * obd_exports is walked with obd_dev_lock held and @test_export is called on
 * every entry.  Exports that are moved to the local work list are logged
 * ("disconnect stale client") and, after the lock is dropped, passed to
 * class_disconnect_export_list() with
 * exp_flags_from_obd(obd) | OBD_OPT_ABORT_RECOV.  The number of evicted
 * clients is added to obd_stale_clients.
 *
 * @obd:         device whose obd_exports list is scanned
 * @test_export: predicate called for each export.  NOTE(review): presumably
 *               a non-zero result keeps the export, and the self-export
 *               (client uuid == obd uuid) is kept as well -- confirm.
 */
1266 void class_disconnect_stale_exports(struct obd_device *obd,
1267 int (*test_export)(struct obd_export *))
1269 cfs_list_t work_list;
1270 cfs_list_t *pos, *n;
1271 struct obd_export *exp;
1275 CFS_INIT_LIST_HEAD(&work_list);
1276 cfs_spin_lock(&obd->obd_dev_lock);
/* the _safe iterator is needed because entries are moved off the list while walking it */
1277 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1278 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1279 if (test_export(exp))
1282 /* don't count self-export as client */
1283 if (obd_uuid_equals(&exp->exp_client_uuid,
1284 &exp->exp_obd->obd_uuid))
1287 cfs_list_move(&exp->exp_obd_chain, &work_list);
/* exp_connection may be NULL here, in which case "<unknown>" is printed */
1289 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1290 obd->obd_name, exp->exp_client_uuid.uuid,
1291 exp->exp_connection == NULL ? "<unknown>" :
1292 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1293 print_export_data(exp, "EVICTING", 0);
1295 cfs_spin_unlock(&obd->obd_dev_lock);
1298 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1299 obd->obd_name, evicted);
/* running total of stale clients evicted from this device */
1300 obd->obd_stale_clients += evicted;
1302 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1303 OBD_OPT_ABORT_RECOV);
1306 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is sampled and set to 1 under exp_lock, so the previous value
 * tells whether another caller already failed this export; for such an
 * export a "skipping" debug message is logged.  Otherwise the debug log is
 * dumped when obd_dump_on_timeout is set, a reference is taken and
 * obd_disconnect() is called.  The outcome is logged (CERROR for a failed
 * disconnect, CDEBUG otherwise -- presumably selected by rc).
 */
1308 void class_fail_export(struct obd_export *exp)
1310 int rc, already_failed;
/* test-and-set of exp_failed, made atomic by exp_lock */
1312 cfs_spin_lock(&exp->exp_lock);
1313 already_failed = exp->exp_failed;
1314 exp->exp_failed = 1;
1315 cfs_spin_unlock(&exp->exp_lock);
1317 if (already_failed) {
1318 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1319 exp, exp->exp_client_uuid.uuid);
1323 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1324 exp, exp->exp_client_uuid.uuid);
1326 if (obd_dump_on_timeout)
1327 libcfs_debug_dumplog();
1329 /* Most callers into obd_disconnect are removing their own reference
1330 * (request, for example) in addition to the one from the hash table.
1331 * We don't have such a reference here, so make one. */
1332 class_export_get(exp);
1333 rc = obd_disconnect(exp);
1335 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1337 CDEBUG(D_HA, "disconnected export %p/%s\n",
1338 exp, exp->exp_client_uuid.uuid);
1340 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable NID string for the peer of @exp.
 *
 * When the export has a connection the string is produced by
 * libcfs_nid2str() from the connection's peer NID.
 * NOTE(review): confirm what is returned for exp_connection == NULL;
 * callers in this file pass the result directly to a "%s" conversion.
 */
1342 char *obd_export_nid2str(struct obd_export *exp)
1344 if (exp->exp_connection != NULL)
1345 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1349 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict exports of @obd by client NID.
 *
 * @nid is converted with libcfs_str2nid() and used as the key for a lookup
 * in obd_nid_hash.  An export that is found is asserted to carry that NID
 * and not to be the obd's self export; it is then failed with
 * class_fail_export() and released with class_export_put() (presumably
 * dropping the reference obtained by the lookup).
 *
 * @obd: device whose NID hash is searched
 * @nid: NID in string form
 *
 * Returns exports_evicted; a D_HA message is logged when that is zero.
 */
1351 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1353 struct obd_export *doomed_exp = NULL;
1354 int exports_evicted = 0;
/*
 * NOTE(review): the cast drops the const qualifier of @nid; presumably
 * libcfs_str2nid() takes a non-const pointer but does not modify the
 * string -- confirm.
 */
1356 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1359 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1360 if (doomed_exp == NULL)
/*
 * NOTE(review): exp_connection is dereferenced without a NULL check;
 * presumably every export hashed by NID has a connection -- confirm.
 */
1363 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1364 "nid %s found, wanted nid %s, requested nid %s\n",
1365 obd_export_nid2str(doomed_exp),
1366 libcfs_nid2str(nid_key), nid);
1367 LASSERTF(doomed_exp != obd->obd_self_export,
1368 "self-export is hashed by NID?\n");
1370 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1371 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1373 class_fail_export(doomed_exp);
1374 class_export_put(doomed_exp);
1377 if (!exports_evicted)
1378 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1379 obd->obd_name, nid);
1380 return exports_evicted;
1382 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client uuid is @uuid.
 *
 * A request to evict the obd's own uuid is rejected with CERROR and
 * exports_evicted (still 0 at that point) is returned.  Otherwise @uuid is
 * looked up in obd_uuid_hash: a missing export is reported with CERROR, a
 * found one is failed with class_fail_export() and released with
 * class_export_put().
 *
 * @obd:  device whose uuid hash is searched
 * @uuid: client uuid in string form
 *
 * Returns exports_evicted.
 */
1384 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1386 struct obd_export *doomed_exp = NULL;
1387 struct obd_uuid doomed_uuid;
1388 int exports_evicted = 0;
1390 obd_str2uuid(&doomed_uuid, uuid);
/* never evict the device's own (self) uuid */
1391 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1392 CERROR("%s: can't evict myself\n", obd->obd_name);
1393 return exports_evicted;
1396 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1398 if (doomed_exp == NULL) {
1399 CERROR("%s: can't disconnect %s: no exports found\n",
1400 obd->obd_name, uuid);
1402 CWARN("%s: evicting %s at adminstrative request\n",
1403 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1404 class_fail_export(doomed_exp);
1405 class_export_put(doomed_exp);
1409 return exports_evicted;
1411 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1413 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional per-export dump callback, NULL by default.  print_export_data()
 * calls it when its locks argument is non-zero and the hook has been set.
 */
1414 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1415 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Log one D_HA line describing the state of @exp.
 *
 * The line contains the obd name, @status, the export pointer, the client
 * uuid, the peer NID string, the refcount / rpc / callback / lock counters,
 * the disconnected, delayed and failed flags, reply information gathered
 * from exp_outstanding_replies under exp_lock, and exp_last_committed.
 *
 * When LUSTRE_TRACKS_LOCK_EXP_REFS is enabled, class_export_dump_hook is
 * additionally invoked if locks is non-zero and the hook is set.
 *
 * @exp:    export to describe
 * @status: free-form tag printed after the obd name (callers in this file
 *          pass "EVICTING", "ACTIVE", "UNLINKED", "DELAYED" and "ZOMBIE")
 */
1418 static void print_export_data(struct obd_export *exp, const char *status,
1421 struct ptlrpc_reply_state *rs;
1422 struct ptlrpc_reply_state *first_reply = NULL;
/* the outstanding-replies list is only walked with exp_lock held */
1425 cfs_spin_lock(&exp->exp_lock);
1426 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1432 cfs_spin_unlock(&exp->exp_lock);
/* "..." is appended when more than three replies were counted */
1434 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1435 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1436 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1437 cfs_atomic_read(&exp->exp_rpc_count),
1438 cfs_atomic_read(&exp->exp_cb_count),
1439 cfs_atomic_read(&exp->exp_locks_count),
1440 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1441 nreplies, first_reply, nreplies > 3 ? "..." : "",
1442 exp->exp_last_committed);
1443 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1444 if (locks && class_export_dump_hook != NULL)
1445 class_export_dump_hook(exp);
/*
 * Log the exports related to @obd through print_export_data().
 *
 * With obd_dev_lock held: obd_exports (tagged "ACTIVE"),
 * obd_unlinked_exports ("UNLINKED") and obd_delayed_exports ("DELAYED").
 * Then, with obd_zombie_impexp_lock held: obd_zombie_exports ("ZOMBIE").
 *
 * NOTE(review): obd_zombie_exports is a file-level list that is not
 * filtered by @obd here, so zombie exports of other devices may be printed
 * as well -- confirm this is intended.
 *
 * @obd:   device whose export lists are dumped
 * @locks: passed through unchanged to print_export_data()
 */
1449 void dump_exports(struct obd_device *obd, int locks)
1451 struct obd_export *exp;
1453 cfs_spin_lock(&obd->obd_dev_lock);
1454 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1455 print_export_data(exp, "ACTIVE", locks);
1456 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1457 print_export_data(exp, "UNLINKED", locks);
1458 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1459 print_export_data(exp, "DELAYED", locks);
1460 cfs_spin_unlock(&obd->obd_dev_lock);
/* the zombie list has its own lock, separate from the per-device lock */
1461 cfs_spin_lock(&obd_zombie_impexp_lock);
1462 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1463 print_export_data(exp, "ZOMBIE", locks);
1464 cfs_spin_unlock(&obd_zombie_impexp_lock);
1466 EXPORT_SYMBOL(dump_exports);
/*
 * Wait until the obd_unlinked_exports list of @obd is empty.
 *
 * obd_exports is asserted to be empty on entry.  While unlinked exports
 * remain, obd_dev_lock is dropped, the caller sleeps in CFS_TASK_UNINT
 * state for `waited` seconds, and the list is checked again with the lock
 * re-taken.  When `waited` is greater than 5 and satisfies IS_PO2(), a
 * console warning with the obd refcount is printed and dump_exports() is
 * called with locks set to 1.
 */
1468 void obd_exports_barrier(struct obd_device *obd)
1471 LASSERT(cfs_list_empty(&obd->obd_exports));
1472 cfs_spin_lock(&obd->obd_dev_lock);
1473 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* the spinlock must not be held across the sleep below */
1474 cfs_spin_unlock(&obd->obd_dev_lock);
1475 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1476 cfs_time_seconds(waited));
1477 if (waited > 5 && IS_PO2(waited)) {
1478 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1479 "more than %d seconds. "
1480 "The obd refcount = %d. Is it stuck?\n",
1481 obd->obd_name, waited,
1482 cfs_atomic_read(&obd->obd_refcount));
1483 dump_exports(obd, 1);
1486 cfs_spin_lock(&obd->obd_dev_lock);
1488 cfs_spin_unlock(&obd->obd_dev_lock);
1490 EXPORT_SYMBOL(obd_exports_barrier);
1492 /* Total amount of zombies to be destroyed */
/*
 * Compared against zero, with obd_zombie_impexp_lock held, by
 * obd_zombie_impexp_check() and obd_zombie_is_idle().
 */
1493 static int zombies_count = 0;
1496 * kill zombie imports and exports
/*
 * Destroy the queued zombie imports and exports.
 *
 * In each pass obd_zombie_impexp_lock is taken, the first entry (if any) of
 * obd_zombie_imports and of obd_zombie_exports is unlinked, and the lock is
 * released before class_import_destroy() / class_export_destroy() are
 * called, so the destroy functions run without the spinlock.  The loop
 * repeats for as long as a pass found an import or an export.
 */
1498 void obd_zombie_impexp_cull(void)
1500 struct obd_import *import;
1501 struct obd_export *export;
1505 cfs_spin_lock(&obd_zombie_impexp_lock);
1508 if (!cfs_list_empty(&obd_zombie_imports)) {
1509 import = cfs_list_entry(obd_zombie_imports.next,
1512 cfs_list_del_init(&import->imp_zombie_chain);
1516 if (!cfs_list_empty(&obd_zombie_exports)) {
1517 export = cfs_list_entry(obd_zombie_exports.next,
1520 cfs_list_del_init(&export->exp_obd_chain);
1523 cfs_spin_unlock(&obd_zombie_impexp_lock);
1525 if (import != NULL) {
1526 class_import_destroy(import);
/* NOTE(review): lock re-taken after the destroy, presumably to update the zombie accounting -- confirm */
1527 cfs_spin_lock(&obd_zombie_impexp_lock);
1529 cfs_spin_unlock(&obd_zombie_impexp_lock);
1532 if (export != NULL) {
1533 class_export_destroy(export);
/* NOTE(review): same pattern as for imports above */
1534 cfs_spin_lock(&obd_zombie_impexp_lock);
1536 cfs_spin_unlock(&obd_zombie_impexp_lock);
1540 } while (import != NULL || export != NULL);
/*
 * State of the zombie destroy thread.
 *
 * obd_zombie_start is completed by obd_zombie_impexp_thread() during its
 * start-up and waited for in obd_zombie_impexp_init(); obd_zombie_stop is
 * completed when the thread leaves its loop and waited for in
 * obd_zombie_impexp_stop().
 */
1544 static cfs_completion_t obd_zombie_start;
1545 static cfs_completion_t obd_zombie_stop;
/* flag word accessed with cfs_set_bit() / cfs_test_bit() */
1546 static unsigned long obd_zombie_flags;
/* wait queue shared by the zombie thread and obd_zombie_barrier() callers */
1547 static cfs_waitq_t obd_zombie_waitq;
/* pid of the zombie thread; lets obd_zombie_barrier() avoid waiting for itself */
1548 static pid_t obd_zombie_pid;
/*
 * NOTE(review): the value is written like a mask (1 << 1) but is passed as
 * the first argument of cfs_set_bit() / cfs_test_bit(), which presumably
 * take a bit number.  All users in this file agree with each other, so it
 * works either way -- confirm which was intended.
 */
1551 OBD_ZOMBIE_STOP = 1 << 1
1555 * check whether there is work for the zombie import/export kill thread.
/*
 * Condition helper for the zombie thread.
 *
 * With obd_zombie_impexp_lock held, rc is computed as "zombies_count is
 * zero and OBD_ZOMBIE_STOP is not set", i.e. non-zero means there is
 * nothing to do.  obd_zombie_impexp_thread() sleeps until the negation of
 * this function becomes true.
 *
 * @arg: not referenced in the computation of rc
 */
1557 static int obd_zombie_impexp_check(void *arg)
1561 cfs_spin_lock(&obd_zombie_impexp_lock);
1562 rc = (zombies_count == 0) &&
1563 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1564 cfs_spin_unlock(&obd_zombie_impexp_lock);
1570 * Add export to the obd_zombie thread and notify it.
/*
 * Queue @exp for destruction by the zombie thread.
 *
 * The export is asserted to be linked through exp_obd_chain and is
 * unlinked with the obd_dev_lock of its obd held.  It is then added to
 * obd_zombie_exports with obd_zombie_impexp_lock held, and the zombie
 * thread is notified.
 */
1572 static void obd_zombie_export_add(struct obd_export *exp) {
1573 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1574 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1575 cfs_list_del_init(&exp->exp_obd_chain);
1576 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
/* exp_obd_chain is reused as the linkage on the zombie list */
1577 cfs_spin_lock(&obd_zombie_impexp_lock);
1579 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1580 cfs_spin_unlock(&obd_zombie_impexp_lock);
/*
 * NOTE(review): obd_zombie_impexp_notify is a static function of this
 * file, so comparing its address with NULL is always true.
 */
1582 if (obd_zombie_impexp_notify != NULL)
1583 obd_zombie_impexp_notify();
1587 * Add import to the obd_zombie thread and notify it.
/*
 * Queue @imp for destruction by the zombie thread.
 *
 * Asserts that imp_sec is NULL and, with obd_zombie_impexp_lock held, that
 * the import is not already linked through imp_zombie_chain; then adds it
 * to obd_zombie_imports and notifies the zombie thread.
 */
1589 static void obd_zombie_import_add(struct obd_import *imp) {
1590 LASSERT(imp->imp_sec == NULL);
1591 cfs_spin_lock(&obd_zombie_impexp_lock);
1592 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1594 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1595 cfs_spin_unlock(&obd_zombie_impexp_lock);
/*
 * NOTE(review): obd_zombie_impexp_notify is a static function of this
 * file, so comparing its address with NULL is always true.
 */
1597 if (obd_zombie_impexp_notify != NULL)
1598 obd_zombie_impexp_notify();
1602 * notify import/export destroy thread about new zombie.
/*
 * Wake every waiter on obd_zombie_waitq.
 *
 * A broadcast is used because both the zombie thread and callers of
 * obd_zombie_barrier() sleep on the same queue; waking only one waiter
 * could deliver the event to a barrier caller instead of the thread.
 */
1604 static void obd_zombie_impexp_notify(void)
1607 * Make sure obd_zomebie_impexp_thread get this notification.
1608 * It is possible this signal only get by obd_zombie_barrier, and
1609 * barrier gulps this notification and sleeps away and hangs ensues
1611 cfs_waitq_broadcast(&obd_zombie_waitq);
1615 * check whether obd_zombie is idle
/*
 * Wait condition used by obd_zombie_barrier().
 *
 * Asserts that OBD_ZOMBIE_STOP is not set, then computes rc as
 * "zombies_count is zero" with obd_zombie_impexp_lock held.
 */
1617 static int obd_zombie_is_idle(void)
1621 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1622 cfs_spin_lock(&obd_zombie_impexp_lock);
1623 rc = (zombies_count == 0);
1624 cfs_spin_unlock(&obd_zombie_impexp_lock);
1629 * wait until the obd_zombie import/export queues become empty
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies are pending.
 *
 * The pid of the caller is compared with obd_zombie_pid first so that the
 * zombie thread does not wait for itself.  The wait uses a zero-initialised
 * l_wait_info.
 */
1631 void obd_zombie_barrier(void)
1633 struct l_wait_info lwi = { 0 };
1635 if (obd_zombie_pid == cfs_curproc_pid())
1636 /* don't wait for myself */
1638 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1640 EXPORT_SYMBOL(obd_zombie_barrier);
1645 * destroy zombie export/import thread.
/*
 * Main function of the zombie destroy thread.
 *
 * Calls cfs_daemonize_ctxt("obd_zombid") and completes obd_zombie_start,
 * on the daemonize failure path as well, so the creator is always woken.
 * It then records its pid in obd_zombie_pid and loops until
 * OBD_ZOMBIE_STOP is set: sleep on obd_zombie_waitq until
 * obd_zombie_impexp_check() returns zero, run obd_zombie_impexp_cull(),
 * then signal the wait queue for obd_zombie_barrier() callers.  After the
 * loop obd_zombie_stop is completed.
 *
 * @unused: not referenced
 */
1647 static int obd_zombie_impexp_thread(void *unused)
1651 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1652 cfs_complete(&obd_zombie_start);
1656 cfs_complete(&obd_zombie_start);
1658 obd_zombie_pid = cfs_curproc_pid();
1660 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1661 struct l_wait_info lwi = { 0 };
/* check() is non-zero when there is nothing to do, hence the negation */
1663 l_wait_event(obd_zombie_waitq,
1664 !obd_zombie_impexp_check(NULL), &lwi);
1665 obd_zombie_impexp_cull();
1668 * Notify obd_zombie_barrier callers that queues
1671 cfs_waitq_signal(&obd_zombie_waitq);
1674 cfs_complete(&obd_zombie_stop);
1679 #else /* ! KERNEL */
/*
 * Counter used by obd_zombie_impexp_kill(): only the call that raises it
 * to 1 runs the cull.
 */
1681 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
/*
 * Handles returned by liblustre_register_wait_callback() and
 * liblustre_register_idle_callback() in obd_zombie_impexp_init(), kept so
 * that obd_zombie_impexp_stop() can deregister the callbacks.
 */
1682 static void *obd_zombie_impexp_work_cb;
1683 static void *obd_zombie_impexp_idle_cb;
/*
 * Callback registered with liblustre_register_wait_callback() in
 * obd_zombie_impexp_init().
 *
 * zombie_recur is incremented atomically; obd_zombie_impexp_cull() is run
 * only when the incremented value is 1, i.e. when no other invocation is
 * in progress.  The counter is decremented again afterwards.
 *
 * @arg: not referenced
 */
1685 int obd_zombie_impexp_kill(void *arg)
1689 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1690 obd_zombie_impexp_cull();
1693 cfs_atomic_dec(&zombie_recur);
1700 * start destroy zombie import/export thread
/*
 * Initialise the zombie import/export machinery.
 *
 * Sets up the two zombie lists, obd_zombie_impexp_lock, both completions
 * and the wait queue.  The worker is then started in one of two ways:
 * by creating obd_zombie_impexp_thread() and waiting for obd_zombie_start,
 * or by registering obd_zombie_impexp_kill() as a liblustre wait callback
 * and obd_zombie_impexp_check() as a liblustre idle callback.
 * NOTE(review): presumably a preprocessor condition (kernel vs. liblustre
 * build) selects between the two variants -- confirm.
 */
1702 int obd_zombie_impexp_init(void)
1706 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1707 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1708 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1709 cfs_init_completion(&obd_zombie_start);
1710 cfs_init_completion(&obd_zombie_stop);
1711 cfs_waitq_init(&obd_zombie_waitq);
1715 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
/* the thread completes obd_zombie_start once it has daemonized (or failed to) */
1719 cfs_wait_for_completion(&obd_zombie_start);
1722 obd_zombie_impexp_work_cb =
1723 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1724 &obd_zombie_impexp_kill, NULL);
1726 obd_zombie_impexp_idle_cb =
1727 liblustre_register_idle_callback("obd_zombi_impexp_check",
1728 &obd_zombie_impexp_check, NULL);
1734 * stop destroy zombie import/export thread
/*
 * Shut the zombie import/export machinery down.
 *
 * Sets OBD_ZOMBIE_STOP in obd_zombie_flags and wakes the waiters with
 * obd_zombie_impexp_notify().  The thread variant then waits for
 * obd_zombie_stop; the liblustre variant deregisters the wait and idle
 * callbacks registered by obd_zombie_impexp_init().
 */
1736 void obd_zombie_impexp_stop(void)
1738 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1739 obd_zombie_impexp_notify();
1741 cfs_wait_for_completion(&obd_zombie_stop);
1743 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1744 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1748 /***** Kernel-userspace comm helpers *******/
1750 /* Get length of entire message, including header */
/*
 * @payload_len: size of the payload
 *
 * Returns sizeof(struct kuc_hdr) + @payload_len.
 * NOTE(review): the sum is converted to int on return and @payload_len is
 * not range-checked here.
 */
1751 int kuc_len(int payload_len)
1753 return sizeof(struct kuc_hdr) + payload_len;
1755 EXPORT_SYMBOL(kuc_len);
1757 /* Get a pointer to kuc header, given a ptr to the payload
1758 * @param p Pointer to payload area
1759 * @returns Pointer to kuc header
/*
 * Locate the kuc header that belongs to payload @p.
 *
 * The header is taken to sit immediately in front of the payload: the
 * pointer is @p stepped back by one struct kuc_hdr.  The kuc_magic field
 * found there is asserted to equal KUC_MAGIC.
 */
1761 struct kuc_hdr * kuc_ptr(void *p)
1763 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1764 LASSERT(lh->kuc_magic == KUC_MAGIC);
1767 EXPORT_SYMBOL(kuc_ptr);
1769 /* Test if payload is part of kuc message
1770 * @param p Pointer to payload area
/*
 * Check whether @p is preceded by a kuc header.
 *
 * The struct kuc_hdr immediately in front of @p is inspected and its
 * kuc_magic field compared with KUC_MAGIC.
 * NOTE(review): this reads memory before @p even when @p is not a kuc
 * payload, so the caller must guarantee that this memory is readable.
 */
1773 int kuc_ispayload(void *p)
1775 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1777 if (kh->kuc_magic == KUC_MAGIC)
1782 EXPORT_SYMBOL(kuc_ispayload);
1784 /* Alloc space for a message, and fill in header
1785 * @return Pointer to payload area
/*
 * Allocate a kuc message and fill in its header.
 *
 * @payload_len: size of the payload area
 * @transport:   stored in kuc_transport
 * @type:        stored in kuc_msgtype
 *
 * The total length is kuc_len(@payload_len) and is also stored in
 * kuc_msglen, so that field counts the header too.  The returned pointer
 * is the address just past the header.  On failure ERR_PTR(-ENOMEM) is
 * returned rather than NULL (presumably when the allocation fails).
 */
1787 void *kuc_alloc(int payload_len, int transport, int type)
1790 int len = kuc_len(payload_len);
1794 return ERR_PTR(-ENOMEM);
1796 lh->kuc_magic = KUC_MAGIC;
1797 lh->kuc_transport = transport;
1798 lh->kuc_msgtype = type;
1799 lh->kuc_msglen = len;
/* hand out the payload area, which starts right after the header */
1801 return (void *)(lh + 1);
1803 EXPORT_SYMBOL(kuc_alloc);
1805 /* Takes pointer to payload area */
/*
 * Free a kuc message given its payload pointer.
 *
 * @p:           payload pointer; the header is located with kuc_ptr(),
 *               which asserts the magic value
 * @payload_len: payload size; the size handed to OBD_FREE is
 *               kuc_len(@payload_len), so it presumably has to match the
 *               value used for the allocation
 *
 * NOTE(review): the function is exported but declared plain `inline`.
 * With C99 inline semantics that form does not emit an external
 * definition; confirm the build relies on gnu89 inline semantics, or drop
 * the keyword.
 */
1806 inline void kuc_free(void *p, int payload_len)
1808 struct kuc_hdr *lh = kuc_ptr(p);
1809 OBD_FREE(lh, kuc_len(payload_len));
1811 EXPORT_SYMBOL(kuc_free);