1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/obdclass/genops.c
41 * These are the only exported functions, they provide some generic
42 * infrastructure for managing object devices
45 #define DEBUG_SUBSYSTEM S_CLASS
47 #include <liblustre.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
53 extern cfs_list_t obd_types;
54 cfs_spinlock_t obd_types_lock;
56 cfs_mem_cache_t *obd_device_cachep;
57 cfs_mem_cache_t *obdo_cachep;
58 EXPORT_SYMBOL(obdo_cachep);
59 cfs_mem_cache_t *import_cachep;
61 cfs_list_t obd_zombie_imports;
62 cfs_list_t obd_zombie_exports;
63 cfs_spinlock_t obd_zombie_impexp_lock;
64 static void obd_zombie_impexp_notify(void);
65 static void obd_zombie_export_add(struct obd_export *exp);
66 static void obd_zombie_import_add(struct obd_import *imp);
67 static void print_export_data(struct obd_export *exp,
68 const char *status, int locks);
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 * support functions: we could use inter-module communication, but this
74 * is more portable to other OS's
76 static struct obd_device *obd_device_alloc(void)
78 struct obd_device *obd;
80 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
82 obd->obd_magic = OBD_DEVICE_MAGIC;
87 static void obd_device_free(struct obd_device *obd)
90 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92 if (obd->obd_namespace != NULL) {
93 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94 obd, obd->obd_namespace, obd->obd_force);
97 lu_ref_fini(&obd->obd_reference);
98 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
101 struct obd_type *class_search_type(const char *name)
104 struct obd_type *type;
106 cfs_spin_lock(&obd_types_lock);
107 cfs_list_for_each(tmp, &obd_types) {
108 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109 if (strcmp(type->typ_name, name) == 0) {
110 cfs_spin_unlock(&obd_types_lock);
114 cfs_spin_unlock(&obd_types_lock);
118 struct obd_type *class_get_type(const char *name)
120 struct obd_type *type = class_search_type(name);
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
124 const char *modname = name;
125 if (!cfs_request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127 type = class_search_type(name);
129 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135 cfs_spin_lock(&type->obd_type_lock);
137 cfs_try_module_get(type->typ_dt_ops->o_owner);
138 cfs_spin_unlock(&type->obd_type_lock);
142 EXPORT_SYMBOL(class_get_type);
144 void class_put_type(struct obd_type *type)
147 cfs_spin_lock(&type->obd_type_lock);
149 cfs_module_put(type->typ_dt_ops->o_owner);
150 cfs_spin_unlock(&type->obd_type_lock);
152 EXPORT_SYMBOL(class_put_type);
154 #define CLASS_MAX_NAME 1024
156 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
157 struct lprocfs_vars *vars, const char *name,
158 struct lu_device_type *ldt)
160 struct obd_type *type;
165 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
167 if (class_search_type(name)) {
168 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
173 OBD_ALLOC(type, sizeof(*type));
177 OBD_ALLOC_PTR(type->typ_dt_ops);
178 OBD_ALLOC_PTR(type->typ_md_ops);
179 OBD_ALLOC(type->typ_name, strlen(name) + 1);
181 if (type->typ_dt_ops == NULL ||
182 type->typ_md_ops == NULL ||
183 type->typ_name == NULL)
186 *(type->typ_dt_ops) = *dt_ops;
187 /* md_ops is optional */
189 *(type->typ_md_ops) = *md_ops;
190 strcpy(type->typ_name, name);
191 cfs_spin_lock_init(&type->obd_type_lock);
194 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
196 if (IS_ERR(type->typ_procroot)) {
197 rc = PTR_ERR(type->typ_procroot);
198 type->typ_procroot = NULL;
204 rc = lu_device_type_init(ldt);
209 cfs_spin_lock(&obd_types_lock);
210 cfs_list_add(&type->typ_chain, &obd_types);
211 cfs_spin_unlock(&obd_types_lock);
216 if (type->typ_name != NULL)
217 OBD_FREE(type->typ_name, strlen(name) + 1);
218 if (type->typ_md_ops != NULL)
219 OBD_FREE_PTR(type->typ_md_ops);
220 if (type->typ_dt_ops != NULL)
221 OBD_FREE_PTR(type->typ_dt_ops);
222 OBD_FREE(type, sizeof(*type));
225 EXPORT_SYMBOL(class_register_type);
227 int class_unregister_type(const char *name)
229 struct obd_type *type = class_search_type(name);
233 CERROR("unknown obd type\n");
237 if (type->typ_refcnt) {
238 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239 /* This is a bad situation, let's make the best of it */
240 /* Remove ops, but leave the name for debugging */
241 OBD_FREE_PTR(type->typ_dt_ops);
242 OBD_FREE_PTR(type->typ_md_ops);
246 if (type->typ_procroot) {
247 lprocfs_remove(&type->typ_procroot);
251 lu_device_type_fini(type->typ_lu);
253 cfs_spin_lock(&obd_types_lock);
254 cfs_list_del(&type->typ_chain);
255 cfs_spin_unlock(&obd_types_lock);
256 OBD_FREE(type->typ_name, strlen(name) + 1);
257 if (type->typ_dt_ops != NULL)
258 OBD_FREE_PTR(type->typ_dt_ops);
259 if (type->typ_md_ops != NULL)
260 OBD_FREE_PTR(type->typ_md_ops);
261 OBD_FREE(type, sizeof(*type));
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
267 * Create a new obd device.
269 * Find an empty slot in ::obd_devs[], create a new obd device in it.
271 * \param[in] type_name obd device type string.
272 * \param[in] name obd device name.
274 * \retval NULL if create fails, otherwise return the obd device
277 struct obd_device *class_newdev(const char *type_name, const char *name)
279 struct obd_device *result = NULL;
280 struct obd_device *newdev;
281 struct obd_type *type = NULL;
283 int new_obd_minor = 0;
286 if (strlen(name) >= MAX_OBD_NAME) {
287 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
288 RETURN(ERR_PTR(-EINVAL));
291 type = class_get_type(type_name);
293 CERROR("OBD: unknown type: %s\n", type_name);
294 RETURN(ERR_PTR(-ENODEV));
297 newdev = obd_device_alloc();
298 if (newdev == NULL) {
299 class_put_type(type);
300 RETURN(ERR_PTR(-ENOMEM));
302 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
304 cfs_write_lock(&obd_dev_lock);
305 for (i = 0; i < class_devno_max(); i++) {
306 struct obd_device *obd = class_num2obd(i);
308 if (obd && obd->obd_name &&
309 (strcmp(name, obd->obd_name) == 0)) {
310 CERROR("Device %s already exists at %d, won't add\n",
313 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
314 "%p obd_magic %08x != %08x\n", result,
315 result->obd_magic, OBD_DEVICE_MAGIC);
316 LASSERTF(result->obd_minor == new_obd_minor,
317 "%p obd_minor %d != %d\n", result,
318 result->obd_minor, new_obd_minor);
320 obd_devs[result->obd_minor] = NULL;
321 result->obd_name[0]='\0';
323 result = ERR_PTR(-EEXIST);
326 if (!result && !obd) {
328 result->obd_minor = i;
330 result->obd_type = type;
331 strncpy(result->obd_name, name,
332 sizeof(result->obd_name) - 1);
333 obd_devs[i] = result;
336 cfs_write_unlock(&obd_dev_lock);
338 if (result == NULL && i >= class_devno_max()) {
339 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
341 RETURN(ERR_PTR(-EOVERFLOW));
344 if (IS_ERR(result)) {
345 obd_device_free(newdev);
346 class_put_type(type);
348 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
349 result->obd_name, result);
354 void class_release_dev(struct obd_device *obd)
356 struct obd_type *obd_type = obd->obd_type;
358 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
359 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
360 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
361 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
362 LASSERT(obd_type != NULL);
364 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
365 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
367 cfs_write_lock(&obd_dev_lock);
368 obd_devs[obd->obd_minor] = NULL;
369 cfs_write_unlock(&obd_dev_lock);
370 obd_device_free(obd);
372 class_put_type(obd_type);
375 int class_name2dev(const char *name)
382 cfs_read_lock(&obd_dev_lock);
383 for (i = 0; i < class_devno_max(); i++) {
384 struct obd_device *obd = class_num2obd(i);
386 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
387 /* Make sure we finished attaching before we give
388 out any references */
389 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
390 if (obd->obd_attached) {
391 cfs_read_unlock(&obd_dev_lock);
397 cfs_read_unlock(&obd_dev_lock);
401 EXPORT_SYMBOL(class_name2dev);
403 struct obd_device *class_name2obd(const char *name)
405 int dev = class_name2dev(name);
407 if (dev < 0 || dev > class_devno_max())
409 return class_num2obd(dev);
411 EXPORT_SYMBOL(class_name2obd);
413 int class_uuid2dev(struct obd_uuid *uuid)
417 cfs_read_lock(&obd_dev_lock);
418 for (i = 0; i < class_devno_max(); i++) {
419 struct obd_device *obd = class_num2obd(i);
421 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
422 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
423 cfs_read_unlock(&obd_dev_lock);
427 cfs_read_unlock(&obd_dev_lock);
431 EXPORT_SYMBOL(class_uuid2dev);
433 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
435 int dev = class_uuid2dev(uuid);
438 return class_num2obd(dev);
440 EXPORT_SYMBOL(class_uuid2obd);
443 * Get obd device from ::obd_devs[]
445 * \param num [in] array index
447 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
448 * otherwise return the obd device there.
450 struct obd_device *class_num2obd(int num)
452 struct obd_device *obd = NULL;
454 if (num < class_devno_max()) {
459 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
460 "%p obd_magic %08x != %08x\n",
461 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
462 LASSERTF(obd->obd_minor == num,
463 "%p obd_minor %0d != %0d\n",
464 obd, obd->obd_minor, num);
469 EXPORT_SYMBOL(class_num2obd);
471 void class_obd_list(void)
476 cfs_read_lock(&obd_dev_lock);
477 for (i = 0; i < class_devno_max(); i++) {
478 struct obd_device *obd = class_num2obd(i);
482 if (obd->obd_stopping)
484 else if (obd->obd_set_up)
486 else if (obd->obd_attached)
490 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
491 i, status, obd->obd_type->typ_name,
492 obd->obd_name, obd->obd_uuid.uuid,
493 cfs_atomic_read(&obd->obd_refcount));
495 cfs_read_unlock(&obd_dev_lock);
499 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
500 specified, then only the client with that uuid is returned,
501 otherwise any client connected to the tgt is returned. */
502 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
503 const char * typ_name,
504 struct obd_uuid *grp_uuid)
508 cfs_read_lock(&obd_dev_lock);
509 for (i = 0; i < class_devno_max(); i++) {
510 struct obd_device *obd = class_num2obd(i);
514 if ((strncmp(obd->obd_type->typ_name, typ_name,
515 strlen(typ_name)) == 0)) {
516 if (obd_uuid_equals(tgt_uuid,
517 &obd->u.cli.cl_target_uuid) &&
518 ((grp_uuid)? obd_uuid_equals(grp_uuid,
519 &obd->obd_uuid) : 1)) {
520 cfs_read_unlock(&obd_dev_lock);
525 cfs_read_unlock(&obd_dev_lock);
529 EXPORT_SYMBOL(class_find_client_obd);
531 /* Iterate the obd_device list looking devices have grp_uuid. Start
532 searching at *next, and if a device is found, the next index to look
533 at is saved in *next. If next is NULL, then the first matching device
534 will always be returned. */
535 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
541 else if (*next >= 0 && *next < class_devno_max())
546 cfs_read_lock(&obd_dev_lock);
547 for (; i < class_devno_max(); i++) {
548 struct obd_device *obd = class_num2obd(i);
552 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
555 cfs_read_unlock(&obd_dev_lock);
559 cfs_read_unlock(&obd_dev_lock);
563 EXPORT_SYMBOL(class_devices_in_group);
566 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
567 * adjust sptlrpc settings accordingly.
569 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
571 struct obd_device *obd;
575 LASSERT(namelen > 0);
577 cfs_read_lock(&obd_dev_lock);
578 for (i = 0; i < class_devno_max(); i++) {
579 obd = class_num2obd(i);
581 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
584 /* only notify mdc, osc, mdt, ost */
585 type = obd->obd_type->typ_name;
586 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
587 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
588 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
589 strcmp(type, LUSTRE_OST_NAME) != 0)
592 if (strncmp(obd->obd_name, fsname, namelen))
595 class_incref(obd, __FUNCTION__, obd);
596 cfs_read_unlock(&obd_dev_lock);
597 rc2 = obd_set_info_async(obd->obd_self_export,
598 sizeof(KEY_SPTLRPC_CONF),
599 KEY_SPTLRPC_CONF, 0, NULL, NULL);
601 class_decref(obd, __FUNCTION__, obd);
602 cfs_read_lock(&obd_dev_lock);
604 cfs_read_unlock(&obd_dev_lock);
607 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
609 void obd_cleanup_caches(void)
614 if (obd_device_cachep) {
615 rc = cfs_mem_cache_destroy(obd_device_cachep);
616 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
617 obd_device_cachep = NULL;
620 rc = cfs_mem_cache_destroy(obdo_cachep);
621 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
625 rc = cfs_mem_cache_destroy(import_cachep);
626 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
627 import_cachep = NULL;
630 rc = cfs_mem_cache_destroy(capa_cachep);
631 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
637 int obd_init_caches(void)
641 LASSERT(obd_device_cachep == NULL);
642 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
643 sizeof(struct obd_device),
645 if (!obd_device_cachep)
648 LASSERT(obdo_cachep == NULL);
649 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
654 LASSERT(import_cachep == NULL);
655 import_cachep = cfs_mem_cache_create("ll_import_cache",
656 sizeof(struct obd_import),
661 LASSERT(capa_cachep == NULL);
662 capa_cachep = cfs_mem_cache_create("capa_cache",
663 sizeof(struct obd_capa), 0, 0);
669 obd_cleanup_caches();
674 /* map connection to client */
675 struct obd_export *class_conn2export(struct lustre_handle *conn)
677 struct obd_export *export;
681 CDEBUG(D_CACHE, "looking for null handle\n");
685 if (conn->cookie == -1) { /* this means assign a new connection */
686 CDEBUG(D_CACHE, "want a new connection\n");
690 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
691 export = class_handle2object(conn->cookie);
694 EXPORT_SYMBOL(class_conn2export);
696 struct obd_device *class_exp2obd(struct obd_export *exp)
702 EXPORT_SYMBOL(class_exp2obd);
704 struct obd_device *class_conn2obd(struct lustre_handle *conn)
706 struct obd_export *export;
707 export = class_conn2export(conn);
709 struct obd_device *obd = export->exp_obd;
710 class_export_put(export);
715 EXPORT_SYMBOL(class_conn2obd);
717 struct obd_import *class_exp2cliimp(struct obd_export *exp)
719 struct obd_device *obd = exp->exp_obd;
722 return obd->u.cli.cl_import;
724 EXPORT_SYMBOL(class_exp2cliimp);
726 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
728 struct obd_device *obd = class_conn2obd(conn);
731 return obd->u.cli.cl_import;
733 EXPORT_SYMBOL(class_conn2cliimp);
735 /* Export management functions */
736 static void class_export_destroy(struct obd_export *exp)
738 struct obd_device *obd = exp->exp_obd;
741 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
743 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
744 exp->exp_client_uuid.uuid, obd->obd_name);
746 LASSERT(obd != NULL);
748 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
749 if (exp->exp_connection)
750 ptlrpc_put_connection_superhack(exp->exp_connection);
752 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
753 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
754 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
755 LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
756 obd_destroy_export(exp);
757 class_decref(obd, "export", exp);
759 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
763 static void export_handle_addref(void *export)
765 class_export_get(export);
768 struct obd_export *class_export_get(struct obd_export *exp)
770 cfs_atomic_inc(&exp->exp_refcount);
771 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
772 cfs_atomic_read(&exp->exp_refcount));
775 EXPORT_SYMBOL(class_export_get);
777 void class_export_put(struct obd_export *exp)
779 LASSERT(exp != NULL);
780 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, 0x5a5a5a);
781 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
782 cfs_atomic_read(&exp->exp_refcount) - 1);
784 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
785 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
786 CDEBUG(D_IOCTL, "final put %p/%s\n",
787 exp, exp->exp_client_uuid.uuid);
789 /* release nid stat refererence */
790 lprocfs_exp_cleanup(exp);
792 obd_zombie_export_add(exp);
795 EXPORT_SYMBOL(class_export_put);
797 /* Creates a new export, adds it to the hash table, and returns a
798 * pointer to it. The refcount is 2: one for the hash reference, and
799 * one for the pointer returned by this function. */
800 struct obd_export *class_new_export(struct obd_device *obd,
801 struct obd_uuid *cluuid)
803 struct obd_export *export;
804 cfs_hash_t *hash = NULL;
808 OBD_ALLOC_PTR(export);
810 return ERR_PTR(-ENOMEM);
812 export->exp_conn_cnt = 0;
813 export->exp_lock_hash = NULL;
814 cfs_atomic_set(&export->exp_refcount, 2);
815 cfs_atomic_set(&export->exp_rpc_count, 0);
816 cfs_atomic_set(&export->exp_cb_count, 0);
817 cfs_atomic_set(&export->exp_locks_count, 0);
818 #if LUSTRE_TRACKS_LOCK_EXP_REFS
819 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
820 cfs_spin_lock_init(&export->exp_locks_list_guard);
822 cfs_atomic_set(&export->exp_replay_count, 0);
823 export->exp_obd = obd;
824 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
825 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
826 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
827 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
828 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
829 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
830 class_handle_hash(&export->exp_handle, export_handle_addref);
831 export->exp_last_request_time = cfs_time_current_sec();
832 cfs_spin_lock_init(&export->exp_lock);
833 cfs_spin_lock_init(&export->exp_rpc_lock);
834 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
835 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
837 export->exp_sp_peer = LUSTRE_SP_ANY;
838 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
839 export->exp_client_uuid = *cluuid;
840 obd_init_export(export);
842 cfs_spin_lock(&obd->obd_dev_lock);
843 /* shouldn't happen, but might race */
844 if (obd->obd_stopping)
845 GOTO(exit_unlock, rc = -ENODEV);
847 hash = cfs_hash_getref(obd->obd_uuid_hash);
849 GOTO(exit_unlock, rc = -ENODEV);
850 cfs_spin_unlock(&obd->obd_dev_lock);
852 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
853 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
855 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
856 obd->obd_name, cluuid->uuid, rc);
857 GOTO(exit_err, rc = -EALREADY);
861 cfs_spin_lock(&obd->obd_dev_lock);
862 if (obd->obd_stopping) {
863 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
864 GOTO(exit_unlock, rc = -ENODEV);
867 class_incref(obd, "export", export);
868 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
869 cfs_list_add_tail(&export->exp_obd_chain_timed,
870 &export->exp_obd->obd_exports_timed);
871 export->exp_obd->obd_num_exports++;
872 cfs_spin_unlock(&obd->obd_dev_lock);
873 cfs_hash_putref(hash);
877 cfs_spin_unlock(&obd->obd_dev_lock);
880 cfs_hash_putref(hash);
881 class_handle_unhash(&export->exp_handle);
882 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
883 obd_destroy_export(export);
884 OBD_FREE_PTR(export);
887 EXPORT_SYMBOL(class_new_export);
889 void class_unlink_export(struct obd_export *exp)
891 class_handle_unhash(&exp->exp_handle);
893 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
894 /* delete an uuid-export hashitem from hashtables */
895 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
896 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
897 &exp->exp_client_uuid,
898 &exp->exp_uuid_hash);
900 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
901 cfs_list_del_init(&exp->exp_obd_chain_timed);
902 exp->exp_obd->obd_num_exports--;
903 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
904 class_export_put(exp);
906 EXPORT_SYMBOL(class_unlink_export);
908 /* Import management functions */
909 void class_import_destroy(struct obd_import *imp)
913 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
914 imp->imp_obd->obd_name);
916 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
918 ptlrpc_put_connection_superhack(imp->imp_connection);
920 while (!cfs_list_empty(&imp->imp_conn_list)) {
921 struct obd_import_conn *imp_conn;
923 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
924 struct obd_import_conn, oic_item);
925 cfs_list_del_init(&imp_conn->oic_item);
926 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
927 OBD_FREE(imp_conn, sizeof(*imp_conn));
930 LASSERT(imp->imp_sec == NULL);
931 class_decref(imp->imp_obd, "import", imp);
932 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
936 static void import_handle_addref(void *import)
938 class_import_get(import);
941 struct obd_import *class_import_get(struct obd_import *import)
943 cfs_atomic_inc(&import->imp_refcount);
944 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
945 cfs_atomic_read(&import->imp_refcount),
946 import->imp_obd->obd_name);
949 EXPORT_SYMBOL(class_import_get);
951 void class_import_put(struct obd_import *imp)
955 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
956 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, 0x5a5a5a);
958 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
959 cfs_atomic_read(&imp->imp_refcount) - 1,
960 imp->imp_obd->obd_name);
962 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
963 CDEBUG(D_INFO, "final put import %p\n", imp);
964 obd_zombie_import_add(imp);
969 EXPORT_SYMBOL(class_import_put);
971 static void init_imp_at(struct imp_at *at) {
973 at_init(&at->iat_net_latency, 0, 0);
974 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
975 /* max service estimates are tracked on the server side, so
976 don't use the AT history here, just use the last reported
977 val. (But keep hist for proc histogram, worst_ever) */
978 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
983 struct obd_import *class_new_import(struct obd_device *obd)
985 struct obd_import *imp;
987 OBD_ALLOC(imp, sizeof(*imp));
991 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
992 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
993 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
994 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
995 cfs_spin_lock_init(&imp->imp_lock);
996 imp->imp_last_success_conn = 0;
997 imp->imp_state = LUSTRE_IMP_NEW;
998 imp->imp_obd = class_incref(obd, "import", imp);
999 cfs_sema_init(&imp->imp_sec_mutex, 1);
1000 cfs_waitq_init(&imp->imp_recovery_waitq);
1002 cfs_atomic_set(&imp->imp_refcount, 2);
1003 cfs_atomic_set(&imp->imp_unregistering, 0);
1004 cfs_atomic_set(&imp->imp_inflight, 0);
1005 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1006 cfs_atomic_set(&imp->imp_inval_count, 0);
1007 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1008 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1009 class_handle_hash(&imp->imp_handle, import_handle_addref);
1010 init_imp_at(&imp->imp_at);
1012 /* the default magic is V2, will be used in connect RPC, and
1013 * then adjusted according to the flags in request/reply. */
1014 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1018 EXPORT_SYMBOL(class_new_import);
1020 void class_destroy_import(struct obd_import *import)
1022 LASSERT(import != NULL);
1023 LASSERT(import != LP_POISON);
1025 class_handle_unhash(&import->imp_handle);
1027 cfs_spin_lock(&import->imp_lock);
1028 import->imp_generation++;
1029 cfs_spin_unlock(&import->imp_lock);
1030 class_import_put(import);
1032 EXPORT_SYMBOL(class_destroy_import);
1034 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1036 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1038 cfs_spin_lock(&exp->exp_locks_list_guard);
1040 LASSERT(lock->l_exp_refs_nr >= 0);
1042 if (lock->l_exp_refs_target != NULL &&
1043 lock->l_exp_refs_target != exp) {
1044 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1045 exp, lock, lock->l_exp_refs_target);
1047 if ((lock->l_exp_refs_nr ++) == 0) {
1048 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1049 lock->l_exp_refs_target = exp;
1051 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1052 lock, exp, lock->l_exp_refs_nr);
1053 cfs_spin_unlock(&exp->exp_locks_list_guard);
1055 EXPORT_SYMBOL(__class_export_add_lock_ref);
1057 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1059 cfs_spin_lock(&exp->exp_locks_list_guard);
1060 LASSERT(lock->l_exp_refs_nr > 0);
1061 if (lock->l_exp_refs_target != exp) {
1062 LCONSOLE_WARN("lock %p, "
1063 "mismatching export pointers: %p, %p\n",
1064 lock, lock->l_exp_refs_target, exp);
1066 if (-- lock->l_exp_refs_nr == 0) {
1067 cfs_list_del_init(&lock->l_exp_refs_link);
1068 lock->l_exp_refs_target = NULL;
1070 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1071 lock, exp, lock->l_exp_refs_nr);
1072 cfs_spin_unlock(&exp->exp_locks_list_guard);
1074 EXPORT_SYMBOL(__class_export_del_lock_ref);
1077 /* A connection defines an export context in which preallocation can
1078 be managed. This releases the export pointer reference, and returns
1079 the export handle, so the export refcount is 1 when this function
1081 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1082 struct obd_uuid *cluuid)
1084 struct obd_export *export;
1085 LASSERT(conn != NULL);
1086 LASSERT(obd != NULL);
1087 LASSERT(cluuid != NULL);
1090 export = class_new_export(obd, cluuid);
1092 RETURN(PTR_ERR(export));
1094 conn->cookie = export->exp_handle.h_cookie;
1095 class_export_put(export);
1097 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1098 cluuid->uuid, conn->cookie);
1101 EXPORT_SYMBOL(class_connect);
1103 /* if export is involved in recovery then clean up related things */
1104 void class_export_recovery_cleanup(struct obd_export *exp)
1106 struct obd_device *obd = exp->exp_obd;
1108 cfs_spin_lock(&obd->obd_recovery_task_lock);
1109 if (exp->exp_delayed)
1110 obd->obd_delayed_clients--;
1111 if (obd->obd_recovering && exp->exp_in_recovery) {
1112 cfs_spin_lock(&exp->exp_lock);
1113 exp->exp_in_recovery = 0;
1114 cfs_spin_unlock(&exp->exp_lock);
1115 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1116 cfs_atomic_dec(&obd->obd_connected_clients);
1118 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1119 /** Cleanup req replay fields */
1120 if (exp->exp_req_replay_needed) {
1121 cfs_spin_lock(&exp->exp_lock);
1122 exp->exp_req_replay_needed = 0;
1123 cfs_spin_unlock(&exp->exp_lock);
1124 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1125 cfs_atomic_dec(&obd->obd_req_replay_clients);
1127 /** Cleanup lock replay data */
1128 if (exp->exp_lock_replay_needed) {
1129 cfs_spin_lock(&exp->exp_lock);
1130 exp->exp_lock_replay_needed = 0;
1131 cfs_spin_unlock(&exp->exp_lock);
1132 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1133 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1137 /* This function removes 1-3 references from the export:
1138 * 1 - for export pointer passed
1139 * and if disconnect really need
1140 * 2 - removing from hash
1141 * 3 - in client_unlink_export
1142 * The export pointer passed to this function can destroyed */
1143 int class_disconnect(struct obd_export *export)
1145 int already_disconnected;
1148 if (export == NULL) {
1149 CWARN("attempting to free NULL export %p\n", export);
1153 cfs_spin_lock(&export->exp_lock);
1154 already_disconnected = export->exp_disconnected;
1155 export->exp_disconnected = 1;
1156 cfs_spin_unlock(&export->exp_lock);
1158 /* class_cleanup(), abort_recovery(), and class_fail_export()
1159 * all end up in here, and if any of them race we shouldn't
1160 * call extra class_export_puts(). */
1161 if (already_disconnected) {
1162 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1163 GOTO(no_disconn, already_disconnected);
1166 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1167 export->exp_handle.h_cookie);
1169 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1170 cfs_hash_del(export->exp_obd->obd_nid_hash,
1171 &export->exp_connection->c_peer.nid,
1172 &export->exp_nid_hash);
1174 class_export_recovery_cleanup(export);
1175 class_unlink_export(export);
1177 class_export_put(export);
1180 EXPORT_SYMBOL(class_disconnect);
1182 /* Return non-zero for a fully connected export */
1183 int class_connected_export(struct obd_export *exp)
1187 cfs_spin_lock(&exp->exp_lock);
1188 connected = (exp->exp_conn_cnt > 0);
1189 cfs_spin_unlock(&exp->exp_lock);
1194 EXPORT_SYMBOL(class_connected_export);
1196 static void class_disconnect_export_list(cfs_list_t *list,
1197 enum obd_option flags)
1200 struct obd_export *exp;
1203 /* It's possible that an export may disconnect itself, but
1204 * nothing else will be added to this list. */
1205 while (!cfs_list_empty(list)) {
1206 exp = cfs_list_entry(list->next, struct obd_export,
1208 /* need for safe call CDEBUG after obd_disconnect */
1209 class_export_get(exp);
1211 cfs_spin_lock(&exp->exp_lock);
1212 exp->exp_flags = flags;
1213 cfs_spin_unlock(&exp->exp_lock);
1215 if (obd_uuid_equals(&exp->exp_client_uuid,
1216 &exp->exp_obd->obd_uuid)) {
1218 "exp %p export uuid == obd uuid, don't discon\n",
1220 /* Need to delete this now so we don't end up pointing
1221 * to work_list later when this export is cleaned up. */
1222 cfs_list_del_init(&exp->exp_obd_chain);
1223 class_export_put(exp);
1227 class_export_get(exp);
1228 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1229 "last request at "CFS_TIME_T"\n",
1230 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1231 exp, exp->exp_last_request_time);
1232 /* release one export reference anyway */
1233 rc = obd_disconnect(exp);
1235 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1236 obd_export_nid2str(exp), exp, rc);
1237 class_export_put(exp);
1242 void class_disconnect_exports(struct obd_device *obd)
1244 cfs_list_t work_list;
1247 /* Move all of the exports from obd_exports to a work list, en masse. */
1248 CFS_INIT_LIST_HEAD(&work_list);
1249 cfs_spin_lock(&obd->obd_dev_lock);
1250 cfs_list_splice_init(&obd->obd_exports, &work_list);
1251 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1252 cfs_spin_unlock(&obd->obd_dev_lock);
1254 if (!cfs_list_empty(&work_list)) {
1255 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1256 "disconnecting them\n", obd->obd_minor, obd);
1257 class_disconnect_export_list(&work_list,
1258 exp_flags_from_obd(obd));
1260 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1261 obd->obd_minor, obd);
1264 EXPORT_SYMBOL(class_disconnect_exports);
1266 /* Remove exports that have not completed recovery.
1268 void class_disconnect_stale_exports(struct obd_device *obd,
1269 int (*test_export)(struct obd_export *))
1271 cfs_list_t work_list;
1272 cfs_list_t *pos, *n;
1273 struct obd_export *exp;
1277 CFS_INIT_LIST_HEAD(&work_list);
1278 cfs_spin_lock(&obd->obd_dev_lock);
1279 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1280 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1281 if (test_export(exp))
1284 /* don't count self-export as client */
1285 if (obd_uuid_equals(&exp->exp_client_uuid,
1286 &exp->exp_obd->obd_uuid))
1289 cfs_list_move(&exp->exp_obd_chain, &work_list);
1291 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1292 obd->obd_name, exp->exp_client_uuid.uuid,
1293 exp->exp_connection == NULL ? "<unknown>" :
1294 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1295 print_export_data(exp, "EVICTING", 0);
1297 cfs_spin_unlock(&obd->obd_dev_lock);
1300 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1301 obd->obd_name, evicted);
1302 obd->obd_stale_clients += evicted;
1304 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1305 OBD_OPT_ABORT_RECOV);
1308 EXPORT_SYMBOL(class_disconnect_stale_exports);
1310 void class_fail_export(struct obd_export *exp)
1312 int rc, already_failed;
1314 cfs_spin_lock(&exp->exp_lock);
1315 already_failed = exp->exp_failed;
1316 exp->exp_failed = 1;
1317 cfs_spin_unlock(&exp->exp_lock);
1319 if (already_failed) {
1320 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1321 exp, exp->exp_client_uuid.uuid);
1325 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1326 exp, exp->exp_client_uuid.uuid);
1328 if (obd_dump_on_timeout)
1329 libcfs_debug_dumplog();
1331 /* Most callers into obd_disconnect are removing their own reference
1332 * (request, for example) in addition to the one from the hash table.
1333 * We don't have such a reference here, so make one. */
1334 class_export_get(exp);
1335 rc = obd_disconnect(exp);
1337 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1339 CDEBUG(D_HA, "disconnected export %p/%s\n",
1340 exp, exp->exp_client_uuid.uuid);
1342 EXPORT_SYMBOL(class_fail_export);
1344 char *obd_export_nid2str(struct obd_export *exp)
1346 if (exp->exp_connection != NULL)
1347 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1351 EXPORT_SYMBOL(obd_export_nid2str);
1353 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1355 struct obd_export *doomed_exp = NULL;
1356 int exports_evicted = 0;
1358 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1361 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1362 if (doomed_exp == NULL)
1365 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1366 "nid %s found, wanted nid %s, requested nid %s\n",
1367 obd_export_nid2str(doomed_exp),
1368 libcfs_nid2str(nid_key), nid);
1369 LASSERTF(doomed_exp != obd->obd_self_export,
1370 "self-export is hashed by NID?\n");
1372 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1373 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1375 class_fail_export(doomed_exp);
1376 class_export_put(doomed_exp);
1379 if (!exports_evicted)
1380 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1381 obd->obd_name, nid);
1382 return exports_evicted;
1384 EXPORT_SYMBOL(obd_export_evict_by_nid);
1386 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1388 struct obd_export *doomed_exp = NULL;
1389 struct obd_uuid doomed_uuid;
1390 int exports_evicted = 0;
1392 obd_str2uuid(&doomed_uuid, uuid);
1393 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1394 CERROR("%s: can't evict myself\n", obd->obd_name);
1395 return exports_evicted;
1398 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1400 if (doomed_exp == NULL) {
1401 CERROR("%s: can't disconnect %s: no exports found\n",
1402 obd->obd_name, uuid);
1404 CWARN("%s: evicting %s at adminstrative request\n",
1405 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1406 class_fail_export(doomed_exp);
1407 class_export_put(doomed_exp);
1411 return exports_evicted;
1413 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1415 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1416 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1417 EXPORT_SYMBOL(class_export_dump_hook);
1420 static void print_export_data(struct obd_export *exp, const char *status,
1423 struct ptlrpc_reply_state *rs;
1424 struct ptlrpc_reply_state *first_reply = NULL;
1427 cfs_spin_lock(&exp->exp_lock);
1428 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1434 cfs_spin_unlock(&exp->exp_lock);
1436 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1437 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1438 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1439 cfs_atomic_read(&exp->exp_rpc_count),
1440 cfs_atomic_read(&exp->exp_cb_count),
1441 cfs_atomic_read(&exp->exp_locks_count),
1442 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1443 nreplies, first_reply, nreplies > 3 ? "..." : "",
1444 exp->exp_last_committed);
1445 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1446 if (locks && class_export_dump_hook != NULL)
1447 class_export_dump_hook(exp);
1451 void dump_exports(struct obd_device *obd, int locks)
1453 struct obd_export *exp;
1455 cfs_spin_lock(&obd->obd_dev_lock);
1456 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1457 print_export_data(exp, "ACTIVE", locks);
1458 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1459 print_export_data(exp, "UNLINKED", locks);
1460 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1461 print_export_data(exp, "DELAYED", locks);
1462 cfs_spin_unlock(&obd->obd_dev_lock);
1463 cfs_spin_lock(&obd_zombie_impexp_lock);
1464 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1465 print_export_data(exp, "ZOMBIE", locks);
1466 cfs_spin_unlock(&obd_zombie_impexp_lock);
1468 EXPORT_SYMBOL(dump_exports);
1470 void obd_exports_barrier(struct obd_device *obd)
1473 LASSERT(cfs_list_empty(&obd->obd_exports));
1474 cfs_spin_lock(&obd->obd_dev_lock);
1475 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1476 cfs_spin_unlock(&obd->obd_dev_lock);
1477 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1478 cfs_time_seconds(waited));
1479 if (waited > 5 && IS_PO2(waited)) {
1480 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1481 "more than %d seconds. "
1482 "The obd refcount = %d. Is it stuck?\n",
1483 obd->obd_name, waited,
1484 cfs_atomic_read(&obd->obd_refcount));
1485 dump_exports(obd, 1);
1488 cfs_spin_lock(&obd->obd_dev_lock);
1490 cfs_spin_unlock(&obd->obd_dev_lock);
1492 EXPORT_SYMBOL(obd_exports_barrier);
1494 /* Total amount of zombies to be destroyed */
1495 static int zombies_count = 0;
1498 * kill zombie imports and exports
1500 void obd_zombie_impexp_cull(void)
1502 struct obd_import *import;
1503 struct obd_export *export;
1507 cfs_spin_lock(&obd_zombie_impexp_lock);
1510 if (!cfs_list_empty(&obd_zombie_imports)) {
1511 import = cfs_list_entry(obd_zombie_imports.next,
1514 cfs_list_del_init(&import->imp_zombie_chain);
1518 if (!cfs_list_empty(&obd_zombie_exports)) {
1519 export = cfs_list_entry(obd_zombie_exports.next,
1522 cfs_list_del_init(&export->exp_obd_chain);
1525 cfs_spin_unlock(&obd_zombie_impexp_lock);
1527 if (import != NULL) {
1528 class_import_destroy(import);
1529 cfs_spin_lock(&obd_zombie_impexp_lock);
1531 cfs_spin_unlock(&obd_zombie_impexp_lock);
1534 if (export != NULL) {
1535 class_export_destroy(export);
1536 cfs_spin_lock(&obd_zombie_impexp_lock);
1538 cfs_spin_unlock(&obd_zombie_impexp_lock);
1542 } while (import != NULL || export != NULL);
1546 static cfs_completion_t obd_zombie_start;
1547 static cfs_completion_t obd_zombie_stop;
1548 static unsigned long obd_zombie_flags;
1549 static cfs_waitq_t obd_zombie_waitq;
1550 static pid_t obd_zombie_pid;
1553 OBD_ZOMBIE_STOP = 1 << 1
1557 * check for work for kill zombie import/export thread.
1559 static int obd_zombie_impexp_check(void *arg)
1563 cfs_spin_lock(&obd_zombie_impexp_lock);
1564 rc = (zombies_count == 0) &&
1565 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1566 cfs_spin_unlock(&obd_zombie_impexp_lock);
1572 * Add export to the obd_zombe thread and notify it.
1574 static void obd_zombie_export_add(struct obd_export *exp) {
1575 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1576 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1577 cfs_list_del_init(&exp->exp_obd_chain);
1578 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1579 cfs_spin_lock(&obd_zombie_impexp_lock);
1581 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1582 cfs_spin_unlock(&obd_zombie_impexp_lock);
1584 obd_zombie_impexp_notify();
1588 * Add import to the obd_zombe thread and notify it.
1590 static void obd_zombie_import_add(struct obd_import *imp) {
1591 LASSERT(imp->imp_sec == NULL);
1592 cfs_spin_lock(&obd_zombie_impexp_lock);
1593 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1595 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1596 cfs_spin_unlock(&obd_zombie_impexp_lock);
1598 obd_zombie_impexp_notify();
1602 * notify import/export destroy thread about new zombie.
1604 static void obd_zombie_impexp_notify(void)
1607 * Make sure obd_zomebie_impexp_thread get this notification.
1608 * It is possible this signal only get by obd_zombie_barrier, and
1609 * barrier gulps this notification and sleeps away and hangs ensues
1611 cfs_waitq_broadcast(&obd_zombie_waitq);
1615 * check whether obd_zombie is idle
1617 static int obd_zombie_is_idle(void)
1621 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1622 cfs_spin_lock(&obd_zombie_impexp_lock);
1623 rc = (zombies_count == 0);
1624 cfs_spin_unlock(&obd_zombie_impexp_lock);
1629 * wait when obd_zombie import/export queues become empty
1631 void obd_zombie_barrier(void)
1633 struct l_wait_info lwi = { 0 };
1635 if (obd_zombie_pid == cfs_curproc_pid())
1636 /* don't wait for myself */
1638 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1640 EXPORT_SYMBOL(obd_zombie_barrier);
1645 * destroy zombie export/import thread.
1647 static int obd_zombie_impexp_thread(void *unused)
1651 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1652 cfs_complete(&obd_zombie_start);
1656 cfs_complete(&obd_zombie_start);
1658 obd_zombie_pid = cfs_curproc_pid();
1660 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1661 struct l_wait_info lwi = { 0 };
1663 l_wait_event(obd_zombie_waitq,
1664 !obd_zombie_impexp_check(NULL), &lwi);
1665 obd_zombie_impexp_cull();
1668 * Notify obd_zombie_barrier callers that queues
1671 cfs_waitq_signal(&obd_zombie_waitq);
1674 cfs_complete(&obd_zombie_stop);
1679 #else /* ! KERNEL */
1681 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1682 static void *obd_zombie_impexp_work_cb;
1683 static void *obd_zombie_impexp_idle_cb;
1685 int obd_zombie_impexp_kill(void *arg)
1689 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1690 obd_zombie_impexp_cull();
1693 cfs_atomic_dec(&zombie_recur);
1700 * start destroy zombie import/export thread
1702 int obd_zombie_impexp_init(void)
1706 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1707 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1708 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1709 cfs_init_completion(&obd_zombie_start);
1710 cfs_init_completion(&obd_zombie_stop);
1711 cfs_waitq_init(&obd_zombie_waitq);
1715 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1719 cfs_wait_for_completion(&obd_zombie_start);
1722 obd_zombie_impexp_work_cb =
1723 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1724 &obd_zombie_impexp_kill, NULL);
1726 obd_zombie_impexp_idle_cb =
1727 liblustre_register_idle_callback("obd_zombi_impexp_check",
1728 &obd_zombie_impexp_check, NULL);
1734 * stop destroy zombie import/export thread
1736 void obd_zombie_impexp_stop(void)
1738 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1739 obd_zombie_impexp_notify();
1741 cfs_wait_for_completion(&obd_zombie_stop);
1743 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1744 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1748 /***** Kernel-userspace comm helpers *******/
1750 /* Get length of entire message, including header */
1751 int kuc_len(int payload_len)
1753 return sizeof(struct kuc_hdr) + payload_len;
1755 EXPORT_SYMBOL(kuc_len);
1757 /* Get a pointer to kuc header, given a ptr to the payload
1758 * @param p Pointer to payload area
1759 * @returns Pointer to kuc header
1761 struct kuc_hdr * kuc_ptr(void *p)
1763 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1764 LASSERT(lh->kuc_magic == KUC_MAGIC);
1767 EXPORT_SYMBOL(kuc_ptr);
1769 /* Test if payload is part of kuc message
1770 * @param p Pointer to payload area
1773 int kuc_ispayload(void *p)
1775 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1777 if (kh->kuc_magic == KUC_MAGIC)
1782 EXPORT_SYMBOL(kuc_ispayload);
1784 /* Alloc space for a message, and fill in header
1785 * @return Pointer to payload area
1787 void *kuc_alloc(int payload_len, int transport, int type)
1790 int len = kuc_len(payload_len);
1794 return ERR_PTR(-ENOMEM);
1796 lh->kuc_magic = KUC_MAGIC;
1797 lh->kuc_transport = transport;
1798 lh->kuc_msgtype = type;
1799 lh->kuc_msglen = len;
1801 return (void *)(lh + 1);
1803 EXPORT_SYMBOL(kuc_alloc);
1805 /* Takes pointer to payload area */
1806 inline void kuc_free(void *p, int payload_len)
1808 struct kuc_hdr *lh = kuc_ptr(p);
1809 OBD_FREE(lh, kuc_len(payload_len));
1811 EXPORT_SYMBOL(kuc_free);