1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
83 EXPORT_SYMBOL(obd_device_alloc);
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 cfs_spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 cfs_spin_unlock(&obd_types_lock);
112 cfs_spin_unlock(&obd_types_lock);
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
123 if (!cfs_request_module("%s", modname)) {
124 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125 type = class_search_type(name);
127 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
133 cfs_spin_lock(&type->obd_type_lock);
135 cfs_try_module_get(type->typ_dt_ops->o_owner);
136 cfs_spin_unlock(&type->obd_type_lock);
141 void class_put_type(struct obd_type *type)
144 cfs_spin_lock(&type->obd_type_lock);
146 cfs_module_put(type->typ_dt_ops->o_owner);
147 cfs_spin_unlock(&type->obd_type_lock);
150 #define CLASS_MAX_NAME 1024
152 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
153 struct lprocfs_vars *vars, const char *name,
154 struct lu_device_type *ldt)
156 struct obd_type *type;
161 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
163 if (class_search_type(name)) {
164 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
169 OBD_ALLOC(type, sizeof(*type));
173 OBD_ALLOC_PTR(type->typ_dt_ops);
174 OBD_ALLOC_PTR(type->typ_md_ops);
175 OBD_ALLOC(type->typ_name, strlen(name) + 1);
177 if (type->typ_dt_ops == NULL ||
178 type->typ_md_ops == NULL ||
179 type->typ_name == NULL)
182 *(type->typ_dt_ops) = *dt_ops;
183 /* md_ops is optional */
185 *(type->typ_md_ops) = *md_ops;
186 strcpy(type->typ_name, name);
187 cfs_spin_lock_init(&type->obd_type_lock);
190 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
192 if (IS_ERR(type->typ_procroot)) {
193 rc = PTR_ERR(type->typ_procroot);
194 type->typ_procroot = NULL;
200 rc = lu_device_type_init(ldt);
205 cfs_spin_lock(&obd_types_lock);
206 cfs_list_add(&type->typ_chain, &obd_types);
207 cfs_spin_unlock(&obd_types_lock);
212 if (type->typ_name != NULL)
213 OBD_FREE(type->typ_name, strlen(name) + 1);
214 if (type->typ_md_ops != NULL)
215 OBD_FREE_PTR(type->typ_md_ops);
216 if (type->typ_dt_ops != NULL)
217 OBD_FREE_PTR(type->typ_dt_ops);
218 OBD_FREE(type, sizeof(*type));
222 int class_unregister_type(const char *name)
224 struct obd_type *type = class_search_type(name);
228 CERROR("unknown obd type\n");
232 if (type->typ_refcnt) {
233 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
234 /* This is a bad situation, let's make the best of it */
235 /* Remove ops, but leave the name for debugging */
236 OBD_FREE_PTR(type->typ_dt_ops);
237 OBD_FREE_PTR(type->typ_md_ops);
241 if (type->typ_procroot) {
242 lprocfs_remove(&type->typ_procroot);
246 lu_device_type_fini(type->typ_lu);
248 cfs_spin_lock(&obd_types_lock);
249 cfs_list_del(&type->typ_chain);
250 cfs_spin_unlock(&obd_types_lock);
251 OBD_FREE(type->typ_name, strlen(name) + 1);
252 if (type->typ_dt_ops != NULL)
253 OBD_FREE_PTR(type->typ_dt_ops);
254 if (type->typ_md_ops != NULL)
255 OBD_FREE_PTR(type->typ_md_ops);
256 OBD_FREE(type, sizeof(*type));
258 } /* class_unregister_type */
261 * Create a new obd device.
263 * Find an empty slot in ::obd_devs[], create a new obd device in it.
265 * \param[in] type_name obd device type string.
266 * \param[in] name obd device name.
268 * \retval NULL if create fails, otherwise return the obd device
271 struct obd_device *class_newdev(const char *type_name, const char *name)
273 struct obd_device *result = NULL;
274 struct obd_device *newdev;
275 struct obd_type *type = NULL;
277 int new_obd_minor = 0;
279 if (strlen(name) >= MAX_OBD_NAME) {
280 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
281 RETURN(ERR_PTR(-EINVAL));
284 type = class_get_type(type_name);
286 CERROR("OBD: unknown type: %s\n", type_name);
287 RETURN(ERR_PTR(-ENODEV));
290 newdev = obd_device_alloc();
291 if (newdev == NULL) {
292 class_put_type(type);
293 RETURN(ERR_PTR(-ENOMEM));
295 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
297 cfs_spin_lock(&obd_dev_lock);
298 for (i = 0; i < class_devno_max(); i++) {
299 struct obd_device *obd = class_num2obd(i);
300 if (obd && obd->obd_name &&
301 (strcmp(name, obd->obd_name) == 0)) {
302 CERROR("Device %s already exists, won't add\n", name);
304 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
305 "%p obd_magic %08x != %08x\n", result,
306 result->obd_magic, OBD_DEVICE_MAGIC);
307 LASSERTF(result->obd_minor == new_obd_minor,
308 "%p obd_minor %d != %d\n", result,
309 result->obd_minor, new_obd_minor);
311 obd_devs[result->obd_minor] = NULL;
312 result->obd_name[0]='\0';
314 result = ERR_PTR(-EEXIST);
317 if (!result && !obd) {
319 result->obd_minor = i;
321 result->obd_type = type;
322 strncpy(result->obd_name, name,
323 sizeof(result->obd_name) - 1);
324 obd_devs[i] = result;
327 cfs_spin_unlock(&obd_dev_lock);
329 if (result == NULL && i >= class_devno_max()) {
330 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
332 result = ERR_PTR(-EOVERFLOW);
335 if (IS_ERR(result)) {
336 obd_device_free(newdev);
337 class_put_type(type);
339 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
340 result->obd_name, result);
345 void class_release_dev(struct obd_device *obd)
347 struct obd_type *obd_type = obd->obd_type;
349 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
353 LASSERT(obd_type != NULL);
355 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
356 obd->obd_name,obd->obd_type->typ_name);
358 cfs_spin_lock(&obd_dev_lock);
359 obd_devs[obd->obd_minor] = NULL;
360 cfs_spin_unlock(&obd_dev_lock);
361 obd_device_free(obd);
363 class_put_type(obd_type);
366 int class_name2dev(const char *name)
373 cfs_spin_lock(&obd_dev_lock);
374 for (i = 0; i < class_devno_max(); i++) {
375 struct obd_device *obd = class_num2obd(i);
376 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
377 /* Make sure we finished attaching before we give
378 out any references */
379 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
380 if (obd->obd_attached) {
381 cfs_spin_unlock(&obd_dev_lock);
387 cfs_spin_unlock(&obd_dev_lock);
392 struct obd_device *class_name2obd(const char *name)
394 int dev = class_name2dev(name);
396 if (dev < 0 || dev > class_devno_max())
398 return class_num2obd(dev);
401 int class_uuid2dev(struct obd_uuid *uuid)
405 cfs_spin_lock(&obd_dev_lock);
406 for (i = 0; i < class_devno_max(); i++) {
407 struct obd_device *obd = class_num2obd(i);
408 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
409 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410 cfs_spin_unlock(&obd_dev_lock);
414 cfs_spin_unlock(&obd_dev_lock);
419 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
421 int dev = class_uuid2dev(uuid);
424 return class_num2obd(dev);
428 * Get obd device from ::obd_devs[]
430 * \param num [in] array index
432 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
433 * otherwise return the obd device there.
435 struct obd_device *class_num2obd(int num)
437 struct obd_device *obd = NULL;
439 if (num < class_devno_max()) {
444 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
445 "%p obd_magic %08x != %08x\n",
446 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447 LASSERTF(obd->obd_minor == num,
448 "%p obd_minor %0d != %0d\n",
449 obd, obd->obd_minor, num);
455 void class_obd_list(void)
460 cfs_spin_lock(&obd_dev_lock);
461 for (i = 0; i < class_devno_max(); i++) {
462 struct obd_device *obd = class_num2obd(i);
465 if (obd->obd_stopping)
467 else if (obd->obd_set_up)
469 else if (obd->obd_attached)
473 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
474 i, status, obd->obd_type->typ_name,
475 obd->obd_name, obd->obd_uuid.uuid,
476 cfs_atomic_read(&obd->obd_refcount));
478 cfs_spin_unlock(&obd_dev_lock);
482 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
483 specified, then only the client with that uuid is returned,
484 otherwise any client connected to the tgt is returned. */
485 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
486 const char * typ_name,
487 struct obd_uuid *grp_uuid)
491 cfs_spin_lock(&obd_dev_lock);
492 for (i = 0; i < class_devno_max(); i++) {
493 struct obd_device *obd = class_num2obd(i);
496 if ((strncmp(obd->obd_type->typ_name, typ_name,
497 strlen(typ_name)) == 0)) {
498 if (obd_uuid_equals(tgt_uuid,
499 &obd->u.cli.cl_target_uuid) &&
500 ((grp_uuid)? obd_uuid_equals(grp_uuid,
501 &obd->obd_uuid) : 1)) {
502 cfs_spin_unlock(&obd_dev_lock);
507 cfs_spin_unlock(&obd_dev_lock);
512 /* Iterate the obd_device list looking devices have grp_uuid. Start
513 searching at *next, and if a device is found, the next index to look
514 at is saved in *next. If next is NULL, then the first matching device
515 will always be returned. */
516 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
522 else if (*next >= 0 && *next < class_devno_max())
527 cfs_spin_lock(&obd_dev_lock);
528 for (; i < class_devno_max(); i++) {
529 struct obd_device *obd = class_num2obd(i);
532 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
535 cfs_spin_unlock(&obd_dev_lock);
539 cfs_spin_unlock(&obd_dev_lock);
545 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
546 * adjust sptlrpc settings accordingly.
548 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
550 struct obd_device *obd;
554 LASSERT(namelen > 0);
556 cfs_spin_lock(&obd_dev_lock);
557 for (i = 0; i < class_devno_max(); i++) {
558 obd = class_num2obd(i);
560 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
563 /* only notify mdc, osc, mdt, ost */
564 type = obd->obd_type->typ_name;
565 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
566 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
567 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
568 strcmp(type, LUSTRE_OST_NAME) != 0)
571 if (strncmp(obd->obd_name, fsname, namelen))
574 class_incref(obd, __FUNCTION__, obd);
575 cfs_spin_unlock(&obd_dev_lock);
576 rc2 = obd_set_info_async(obd->obd_self_export,
577 sizeof(KEY_SPTLRPC_CONF),
578 KEY_SPTLRPC_CONF, 0, NULL, NULL);
580 class_decref(obd, __FUNCTION__, obd);
581 cfs_spin_lock(&obd_dev_lock);
583 cfs_spin_unlock(&obd_dev_lock);
586 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
588 void obd_cleanup_caches(void)
593 if (obd_device_cachep) {
594 rc = cfs_mem_cache_destroy(obd_device_cachep);
595 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
596 obd_device_cachep = NULL;
599 rc = cfs_mem_cache_destroy(obdo_cachep);
600 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
604 rc = cfs_mem_cache_destroy(import_cachep);
605 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
606 import_cachep = NULL;
609 rc = cfs_mem_cache_destroy(capa_cachep);
610 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
616 int obd_init_caches(void)
620 LASSERT(obd_device_cachep == NULL);
621 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
622 sizeof(struct obd_device),
624 if (!obd_device_cachep)
627 LASSERT(obdo_cachep == NULL);
628 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
633 LASSERT(import_cachep == NULL);
634 import_cachep = cfs_mem_cache_create("ll_import_cache",
635 sizeof(struct obd_import),
640 LASSERT(capa_cachep == NULL);
641 capa_cachep = cfs_mem_cache_create("capa_cache",
642 sizeof(struct obd_capa), 0, 0);
648 obd_cleanup_caches();
653 /* map connection to client */
654 struct obd_export *class_conn2export(struct lustre_handle *conn)
656 struct obd_export *export;
660 CDEBUG(D_CACHE, "looking for null handle\n");
664 if (conn->cookie == -1) { /* this means assign a new connection */
665 CDEBUG(D_CACHE, "want a new connection\n");
669 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
670 export = class_handle2object(conn->cookie);
674 struct obd_device *class_exp2obd(struct obd_export *exp)
681 struct obd_device *class_conn2obd(struct lustre_handle *conn)
683 struct obd_export *export;
684 export = class_conn2export(conn);
686 struct obd_device *obd = export->exp_obd;
687 class_export_put(export);
693 struct obd_import *class_exp2cliimp(struct obd_export *exp)
695 struct obd_device *obd = exp->exp_obd;
698 return obd->u.cli.cl_import;
701 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
703 struct obd_device *obd = class_conn2obd(conn);
706 return obd->u.cli.cl_import;
709 /* Export management functions */
710 static void class_export_destroy(struct obd_export *exp)
712 struct obd_device *obd = exp->exp_obd;
715 LASSERT (cfs_atomic_read(&exp->exp_refcount) == 0);
717 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
718 exp->exp_client_uuid.uuid, obd->obd_name);
720 LASSERT(obd != NULL);
722 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
723 if (exp->exp_connection)
724 ptlrpc_put_connection_superhack(exp->exp_connection);
726 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
727 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
728 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
729 LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
730 obd_destroy_export(exp);
731 class_decref(obd, "export", exp);
733 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
737 static void export_handle_addref(void *export)
739 class_export_get(export);
742 struct obd_export *class_export_get(struct obd_export *exp)
744 cfs_atomic_inc(&exp->exp_refcount);
745 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
746 cfs_atomic_read(&exp->exp_refcount));
749 EXPORT_SYMBOL(class_export_get);
751 void class_export_put(struct obd_export *exp)
753 LASSERT(exp != NULL);
754 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
755 cfs_atomic_read(&exp->exp_refcount) - 1);
756 LASSERT(cfs_atomic_read(&exp->exp_refcount) > 0);
757 LASSERT(cfs_atomic_read(&exp->exp_refcount) < 0x5a5a5a);
759 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
760 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
761 CDEBUG(D_IOCTL, "final put %p/%s\n",
762 exp, exp->exp_client_uuid.uuid);
763 obd_zombie_export_add(exp);
766 EXPORT_SYMBOL(class_export_put);
768 /* Creates a new export, adds it to the hash table, and returns a
769 * pointer to it. The refcount is 2: one for the hash reference, and
770 * one for the pointer returned by this function. */
771 struct obd_export *class_new_export(struct obd_device *obd,
772 struct obd_uuid *cluuid)
774 struct obd_export *export;
775 cfs_hash_t *hash = NULL;
779 OBD_ALLOC_PTR(export);
781 return ERR_PTR(-ENOMEM);
783 export->exp_conn_cnt = 0;
784 export->exp_lock_hash = NULL;
785 cfs_atomic_set(&export->exp_refcount, 2);
786 cfs_atomic_set(&export->exp_rpc_count, 0);
787 cfs_atomic_set(&export->exp_cb_count, 0);
788 cfs_atomic_set(&export->exp_locks_count, 0);
789 #if LUSTRE_TRACKS_LOCK_EXP_REFS
790 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
791 cfs_spin_lock_init(&export->exp_locks_list_guard);
793 cfs_atomic_set(&export->exp_replay_count, 0);
794 export->exp_obd = obd;
795 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
796 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
797 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
798 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
799 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
800 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
801 class_handle_hash(&export->exp_handle, export_handle_addref);
802 export->exp_last_request_time = cfs_time_current_sec();
803 cfs_spin_lock_init(&export->exp_lock);
804 cfs_spin_lock_init(&export->exp_rpc_lock);
805 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
806 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
808 export->exp_sp_peer = LUSTRE_SP_ANY;
809 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
810 export->exp_client_uuid = *cluuid;
811 obd_init_export(export);
813 cfs_spin_lock(&obd->obd_dev_lock);
814 /* shouldn't happen, but might race */
815 if (obd->obd_stopping)
816 GOTO(exit_unlock, rc = -ENODEV);
818 hash = cfs_hash_getref(obd->obd_uuid_hash);
820 GOTO(exit_unlock, rc = -ENODEV);
821 cfs_spin_unlock(&obd->obd_dev_lock);
823 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
824 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
826 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
827 obd->obd_name, cluuid->uuid, rc);
828 GOTO(exit_err, rc = -EALREADY);
832 cfs_spin_lock(&obd->obd_dev_lock);
833 if (obd->obd_stopping) {
834 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
835 GOTO(exit_unlock, rc = -ENODEV);
838 class_incref(obd, "export", export);
839 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
840 cfs_list_add_tail(&export->exp_obd_chain_timed,
841 &export->exp_obd->obd_exports_timed);
842 export->exp_obd->obd_num_exports++;
843 cfs_spin_unlock(&obd->obd_dev_lock);
844 cfs_hash_putref(hash);
848 cfs_spin_unlock(&obd->obd_dev_lock);
851 cfs_hash_putref(hash);
852 class_handle_unhash(&export->exp_handle);
853 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
854 obd_destroy_export(export);
855 OBD_FREE_PTR(export);
858 EXPORT_SYMBOL(class_new_export);
860 void class_unlink_export(struct obd_export *exp)
862 class_handle_unhash(&exp->exp_handle);
864 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
865 /* delete an uuid-export hashitem from hashtables */
866 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
867 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
868 &exp->exp_client_uuid,
869 &exp->exp_uuid_hash);
871 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
872 cfs_list_del_init(&exp->exp_obd_chain_timed);
873 exp->exp_obd->obd_num_exports--;
874 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
875 class_export_put(exp);
877 EXPORT_SYMBOL(class_unlink_export);
879 /* Import management functions */
880 void class_import_destroy(struct obd_import *imp)
884 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
885 imp->imp_obd->obd_name);
887 LASSERT(cfs_atomic_read(&imp->imp_refcount) == 0);
889 ptlrpc_put_connection_superhack(imp->imp_connection);
891 while (!cfs_list_empty(&imp->imp_conn_list)) {
892 struct obd_import_conn *imp_conn;
894 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
895 struct obd_import_conn, oic_item);
896 cfs_list_del_init(&imp_conn->oic_item);
897 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
898 OBD_FREE(imp_conn, sizeof(*imp_conn));
901 LASSERT(imp->imp_sec == NULL);
902 class_decref(imp->imp_obd, "import", imp);
903 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
907 static void import_handle_addref(void *import)
909 class_import_get(import);
912 struct obd_import *class_import_get(struct obd_import *import)
914 LASSERT(cfs_atomic_read(&import->imp_refcount) >= 0);
915 LASSERT(cfs_atomic_read(&import->imp_refcount) < 0x5a5a5a);
916 cfs_atomic_inc(&import->imp_refcount);
917 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
918 cfs_atomic_read(&import->imp_refcount),
919 import->imp_obd->obd_name);
922 EXPORT_SYMBOL(class_import_get);
924 void class_import_put(struct obd_import *imp)
928 LASSERT(cfs_atomic_read(&imp->imp_refcount) > 0);
929 LASSERT(cfs_atomic_read(&imp->imp_refcount) < 0x5a5a5a);
930 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
932 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
933 cfs_atomic_read(&imp->imp_refcount) - 1,
934 imp->imp_obd->obd_name);
936 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
937 CDEBUG(D_INFO, "final put import %p\n", imp);
938 obd_zombie_import_add(imp);
943 EXPORT_SYMBOL(class_import_put);
945 static void init_imp_at(struct imp_at *at) {
947 at_init(&at->iat_net_latency, 0, 0);
948 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
949 /* max service estimates are tracked on the server side, so
950 don't use the AT history here, just use the last reported
951 val. (But keep hist for proc histogram, worst_ever) */
952 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
957 struct obd_import *class_new_import(struct obd_device *obd)
959 struct obd_import *imp;
961 OBD_ALLOC(imp, sizeof(*imp));
965 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
966 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
967 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
968 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
969 cfs_spin_lock_init(&imp->imp_lock);
970 imp->imp_last_success_conn = 0;
971 imp->imp_state = LUSTRE_IMP_NEW;
972 imp->imp_obd = class_incref(obd, "import", imp);
973 cfs_sema_init(&imp->imp_sec_mutex, 1);
974 cfs_waitq_init(&imp->imp_recovery_waitq);
976 cfs_atomic_set(&imp->imp_refcount, 2);
977 cfs_atomic_set(&imp->imp_unregistering, 0);
978 cfs_atomic_set(&imp->imp_inflight, 0);
979 cfs_atomic_set(&imp->imp_replay_inflight, 0);
980 cfs_atomic_set(&imp->imp_inval_count, 0);
981 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
982 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
983 class_handle_hash(&imp->imp_handle, import_handle_addref);
984 init_imp_at(&imp->imp_at);
986 /* the default magic is V2, will be used in connect RPC, and
987 * then adjusted according to the flags in request/reply. */
988 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
992 EXPORT_SYMBOL(class_new_import);
994 void class_destroy_import(struct obd_import *import)
996 LASSERT(import != NULL);
997 LASSERT(import != LP_POISON);
999 class_handle_unhash(&import->imp_handle);
1001 cfs_spin_lock(&import->imp_lock);
1002 import->imp_generation++;
1003 cfs_spin_unlock(&import->imp_lock);
1004 class_import_put(import);
1006 EXPORT_SYMBOL(class_destroy_import);
1008 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1010 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1012 cfs_spin_lock(&exp->exp_locks_list_guard);
1014 LASSERT(lock->l_exp_refs_nr >= 0);
1016 if (lock->l_exp_refs_target != NULL &&
1017 lock->l_exp_refs_target != exp) {
1018 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1019 exp, lock, lock->l_exp_refs_target);
1021 if ((lock->l_exp_refs_nr ++) == 0) {
1022 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1023 lock->l_exp_refs_target = exp;
1025 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1026 lock, exp, lock->l_exp_refs_nr);
1027 cfs_spin_unlock(&exp->exp_locks_list_guard);
1029 EXPORT_SYMBOL(__class_export_add_lock_ref);
1031 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1033 cfs_spin_lock(&exp->exp_locks_list_guard);
1034 LASSERT(lock->l_exp_refs_nr > 0);
1035 if (lock->l_exp_refs_target != exp) {
1036 LCONSOLE_WARN("lock %p, "
1037 "mismatching export pointers: %p, %p\n",
1038 lock, lock->l_exp_refs_target, exp);
1040 if (-- lock->l_exp_refs_nr == 0) {
1041 cfs_list_del_init(&lock->l_exp_refs_link);
1042 lock->l_exp_refs_target = NULL;
1044 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1045 lock, exp, lock->l_exp_refs_nr);
1046 cfs_spin_unlock(&exp->exp_locks_list_guard);
1048 EXPORT_SYMBOL(__class_export_del_lock_ref);
1051 /* A connection defines an export context in which preallocation can
1052 be managed. This releases the export pointer reference, and returns
1053 the export handle, so the export refcount is 1 when this function
1055 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1056 struct obd_uuid *cluuid)
1058 struct obd_export *export;
1059 LASSERT(conn != NULL);
1060 LASSERT(obd != NULL);
1061 LASSERT(cluuid != NULL);
1064 export = class_new_export(obd, cluuid);
1066 RETURN(PTR_ERR(export));
1068 conn->cookie = export->exp_handle.h_cookie;
1069 class_export_put(export);
1071 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1072 cluuid->uuid, conn->cookie);
1075 EXPORT_SYMBOL(class_connect);
1077 /* if export is involved in recovery then clean up related things */
1078 void class_export_recovery_cleanup(struct obd_export *exp)
1080 struct obd_device *obd = exp->exp_obd;
1082 cfs_spin_lock_bh(&obd->obd_processing_task_lock);
1083 if (exp->exp_delayed)
1084 obd->obd_delayed_clients--;
1085 if (obd->obd_recovering && exp->exp_in_recovery) {
1086 cfs_spin_lock(&exp->exp_lock);
1087 exp->exp_in_recovery = 0;
1088 cfs_spin_unlock(&exp->exp_lock);
1089 LASSERT(obd->obd_connected_clients);
1090 obd->obd_connected_clients--;
1092 /** Cleanup req replay fields */
1093 if (exp->exp_req_replay_needed) {
1094 cfs_spin_lock(&exp->exp_lock);
1095 exp->exp_req_replay_needed = 0;
1096 cfs_spin_unlock(&exp->exp_lock);
1097 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1098 cfs_atomic_dec(&obd->obd_req_replay_clients);
1100 /** Cleanup lock replay data */
1101 if (exp->exp_lock_replay_needed) {
1102 cfs_spin_lock(&exp->exp_lock);
1103 exp->exp_lock_replay_needed = 0;
1104 cfs_spin_unlock(&exp->exp_lock);
1105 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1106 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1108 cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
1111 /* This function removes 1-3 references from the export:
1112 * 1 - for export pointer passed
1113 * and if disconnect really need
1114 * 2 - removing from hash
1115 * 3 - in client_unlink_export
1116 * The export pointer passed to this function can destroyed */
1117 int class_disconnect(struct obd_export *export)
1119 int already_disconnected;
1122 if (export == NULL) {
1124 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1128 cfs_spin_lock(&export->exp_lock);
1129 already_disconnected = export->exp_disconnected;
1130 export->exp_disconnected = 1;
1131 cfs_spin_unlock(&export->exp_lock);
1133 /* class_cleanup(), abort_recovery(), and class_fail_export()
1134 * all end up in here, and if any of them race we shouldn't
1135 * call extra class_export_puts(). */
1136 if (already_disconnected) {
1137 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1138 GOTO(no_disconn, already_disconnected);
1141 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1142 export->exp_handle.h_cookie);
1144 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1145 cfs_hash_del(export->exp_obd->obd_nid_hash,
1146 &export->exp_connection->c_peer.nid,
1147 &export->exp_nid_hash);
1149 class_export_recovery_cleanup(export);
1150 class_unlink_export(export);
1152 class_export_put(export);
1156 /* Return non-zero for a fully connected export */
1157 int class_connected_export(struct obd_export *exp)
1161 cfs_spin_lock(&exp->exp_lock);
1162 connected = (exp->exp_conn_cnt > 0);
1163 cfs_spin_unlock(&exp->exp_lock);
1168 EXPORT_SYMBOL(class_connected_export);
1170 static void class_disconnect_export_list(cfs_list_t *list,
1171 enum obd_option flags)
1174 struct obd_export *exp;
1177 /* It's possible that an export may disconnect itself, but
1178 * nothing else will be added to this list. */
1179 while (!cfs_list_empty(list)) {
1180 exp = cfs_list_entry(list->next, struct obd_export,
1182 /* need for safe call CDEBUG after obd_disconnect */
1183 class_export_get(exp);
1185 cfs_spin_lock(&exp->exp_lock);
1186 exp->exp_flags = flags;
1187 cfs_spin_unlock(&exp->exp_lock);
1189 if (obd_uuid_equals(&exp->exp_client_uuid,
1190 &exp->exp_obd->obd_uuid)) {
1192 "exp %p export uuid == obd uuid, don't discon\n",
1194 /* Need to delete this now so we don't end up pointing
1195 * to work_list later when this export is cleaned up. */
1196 cfs_list_del_init(&exp->exp_obd_chain);
1197 class_export_put(exp);
1201 class_export_get(exp);
1202 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1203 "last request at "CFS_TIME_T"\n",
1204 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1205 exp, exp->exp_last_request_time);
1206 /* release one export reference anyway */
1207 rc = obd_disconnect(exp);
1209 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1210 obd_export_nid2str(exp), exp, rc);
1211 class_export_put(exp);
1216 void class_disconnect_exports(struct obd_device *obd)
1218 cfs_list_t work_list;
1221 /* Move all of the exports from obd_exports to a work list, en masse. */
1222 CFS_INIT_LIST_HEAD(&work_list);
1223 cfs_spin_lock(&obd->obd_dev_lock);
1224 cfs_list_splice_init(&obd->obd_exports, &work_list);
1225 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1226 cfs_spin_unlock(&obd->obd_dev_lock);
1228 if (!cfs_list_empty(&work_list)) {
1229 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1230 "disconnecting them\n", obd->obd_minor, obd);
1231 class_disconnect_export_list(&work_list,
1232 exp_flags_from_obd(obd));
1234 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1235 obd->obd_minor, obd);
1238 EXPORT_SYMBOL(class_disconnect_exports);
1240 /* Remove exports that have not completed recovery.
1242 void class_disconnect_stale_exports(struct obd_device *obd,
1243 int (*test_export)(struct obd_export *))
1245 cfs_list_t work_list;
1246 cfs_list_t *pos, *n;
1247 struct obd_export *exp;
1251 CFS_INIT_LIST_HEAD(&work_list);
1252 cfs_spin_lock(&obd->obd_dev_lock);
1253 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1254 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1255 if (test_export(exp))
1258 /* don't count self-export as client */
1259 if (obd_uuid_equals(&exp->exp_client_uuid,
1260 &exp->exp_obd->obd_uuid))
1263 cfs_list_move(&exp->exp_obd_chain, &work_list);
1265 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1266 obd->obd_name, exp->exp_client_uuid.uuid,
1267 exp->exp_connection == NULL ? "<unknown>" :
1268 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1269 print_export_data(exp, "EVICTING", 0);
1271 cfs_spin_unlock(&obd->obd_dev_lock);
1274 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1275 obd->obd_name, evicted);
1276 obd->obd_stale_clients += evicted;
1278 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1279 OBD_OPT_ABORT_RECOV);
1282 EXPORT_SYMBOL(class_disconnect_stale_exports);
1284 void class_fail_export(struct obd_export *exp)
1286 int rc, already_failed;
1288 cfs_spin_lock(&exp->exp_lock);
1289 already_failed = exp->exp_failed;
1290 exp->exp_failed = 1;
1291 cfs_spin_unlock(&exp->exp_lock);
1293 if (already_failed) {
1294 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1295 exp, exp->exp_client_uuid.uuid);
1299 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1300 exp, exp->exp_client_uuid.uuid);
1302 if (obd_dump_on_timeout)
1303 libcfs_debug_dumplog();
1305 /* Most callers into obd_disconnect are removing their own reference
1306 * (request, for example) in addition to the one from the hash table.
1307 * We don't have such a reference here, so make one. */
1308 class_export_get(exp);
1309 rc = obd_disconnect(exp);
1311 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1313 CDEBUG(D_HA, "disconnected export %p/%s\n",
1314 exp, exp->exp_client_uuid.uuid);
1316 EXPORT_SYMBOL(class_fail_export);
1318 char *obd_export_nid2str(struct obd_export *exp)
1320 if (exp->exp_connection != NULL)
1321 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1325 EXPORT_SYMBOL(obd_export_nid2str);
1327 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1329 struct obd_export *doomed_exp = NULL;
1330 int exports_evicted = 0;
1332 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1335 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1336 if (doomed_exp == NULL)
1339 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1340 "nid %s found, wanted nid %s, requested nid %s\n",
1341 obd_export_nid2str(doomed_exp),
1342 libcfs_nid2str(nid_key), nid);
1343 LASSERTF(doomed_exp != obd->obd_self_export,
1344 "self-export is hashed by NID?\n");
1346 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1347 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1349 class_fail_export(doomed_exp);
1350 class_export_put(doomed_exp);
1353 if (!exports_evicted)
1354 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1355 obd->obd_name, nid);
1356 return exports_evicted;
1358 EXPORT_SYMBOL(obd_export_evict_by_nid);
1360 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1362 struct obd_export *doomed_exp = NULL;
1363 struct obd_uuid doomed_uuid;
1364 int exports_evicted = 0;
1366 obd_str2uuid(&doomed_uuid, uuid);
1367 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1368 CERROR("%s: can't evict myself\n", obd->obd_name);
1369 return exports_evicted;
1372 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1374 if (doomed_exp == NULL) {
1375 CERROR("%s: can't disconnect %s: no exports found\n",
1376 obd->obd_name, uuid);
1378 CWARN("%s: evicting %s at adminstrative request\n",
1379 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1380 class_fail_export(doomed_exp);
1381 class_export_put(doomed_exp);
1385 return exports_evicted;
1387 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1389 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1390 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1391 EXPORT_SYMBOL(class_export_dump_hook);
1394 static void print_export_data(struct obd_export *exp, const char *status,
1397 struct ptlrpc_reply_state *rs;
1398 struct ptlrpc_reply_state *first_reply = NULL;
1401 cfs_spin_lock(&exp->exp_lock);
1402 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1408 cfs_spin_unlock(&exp->exp_lock);
1410 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1411 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1412 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1413 cfs_atomic_read(&exp->exp_rpc_count),
1414 cfs_atomic_read(&exp->exp_cb_count),
1415 cfs_atomic_read(&exp->exp_locks_count),
1416 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1417 nreplies, first_reply, nreplies > 3 ? "..." : "",
1418 exp->exp_last_committed);
1419 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1420 if (locks && class_export_dump_hook != NULL)
1421 class_export_dump_hook(exp);
1425 void dump_exports(struct obd_device *obd, int locks)
1427 struct obd_export *exp;
1429 cfs_spin_lock(&obd->obd_dev_lock);
1430 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1431 print_export_data(exp, "ACTIVE", locks);
1432 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1433 print_export_data(exp, "UNLINKED", locks);
1434 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1435 print_export_data(exp, "DELAYED", locks);
1436 cfs_spin_unlock(&obd->obd_dev_lock);
1437 cfs_spin_lock(&obd_zombie_impexp_lock);
1438 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1439 print_export_data(exp, "ZOMBIE", locks);
1440 cfs_spin_unlock(&obd_zombie_impexp_lock);
1442 EXPORT_SYMBOL(dump_exports);
1444 void obd_exports_barrier(struct obd_device *obd)
1447 LASSERT(cfs_list_empty(&obd->obd_exports));
1448 cfs_spin_lock(&obd->obd_dev_lock);
1449 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1450 cfs_spin_unlock(&obd->obd_dev_lock);
1451 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1452 cfs_time_seconds(waited));
1453 if (waited > 5 && IS_PO2(waited)) {
1454 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1455 "more than %d seconds. "
1456 "The obd refcount = %d. Is it stuck?\n",
1457 obd->obd_name, waited,
1458 cfs_atomic_read(&obd->obd_refcount));
1459 dump_exports(obd, 1);
1462 cfs_spin_lock(&obd->obd_dev_lock);
1464 cfs_spin_unlock(&obd->obd_dev_lock);
1466 EXPORT_SYMBOL(obd_exports_barrier);
1469 * kill zombie imports and exports
1471 void obd_zombie_impexp_cull(void)
1473 struct obd_import *import;
1474 struct obd_export *export;
1478 cfs_spin_lock(&obd_zombie_impexp_lock);
1481 if (!cfs_list_empty(&obd_zombie_imports)) {
1482 import = cfs_list_entry(obd_zombie_imports.next,
1485 cfs_list_del_init(&import->imp_zombie_chain);
1489 if (!cfs_list_empty(&obd_zombie_exports)) {
1490 export = cfs_list_entry(obd_zombie_exports.next,
1493 cfs_list_del_init(&export->exp_obd_chain);
1496 cfs_spin_unlock(&obd_zombie_impexp_lock);
1499 class_import_destroy(import);
1502 class_export_destroy(export);
1505 } while (import != NULL || export != NULL);
1509 static cfs_completion_t obd_zombie_start;
1510 static cfs_completion_t obd_zombie_stop;
1511 static unsigned long obd_zombie_flags;
1512 static cfs_waitq_t obd_zombie_waitq;
1513 static pid_t obd_zombie_pid;
1516 OBD_ZOMBIE_STOP = 1 << 1
1520 * check for work for kill zombie import/export thread.
1522 static int obd_zombie_impexp_check(void *arg)
1526 cfs_spin_lock(&obd_zombie_impexp_lock);
1527 rc = cfs_list_empty(&obd_zombie_imports) &&
1528 cfs_list_empty(&obd_zombie_exports) &&
1529 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1531 cfs_spin_unlock(&obd_zombie_impexp_lock);
1537 * Add export to the obd_zombe thread and notify it.
1539 static void obd_zombie_export_add(struct obd_export *exp) {
1540 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1541 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1542 cfs_list_del_init(&exp->exp_obd_chain);
1543 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1544 cfs_spin_lock(&obd_zombie_impexp_lock);
1545 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1546 cfs_spin_unlock(&obd_zombie_impexp_lock);
1548 if (obd_zombie_impexp_notify != NULL)
1549 obd_zombie_impexp_notify();
1553 * Add import to the obd_zombe thread and notify it.
1555 static void obd_zombie_import_add(struct obd_import *imp) {
1556 LASSERT(imp->imp_sec == NULL);
1557 cfs_spin_lock(&obd_zombie_impexp_lock);
1558 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1559 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1560 cfs_spin_unlock(&obd_zombie_impexp_lock);
1562 if (obd_zombie_impexp_notify != NULL)
1563 obd_zombie_impexp_notify();
1567 * notify import/export destroy thread about new zombie.
1569 static void obd_zombie_impexp_notify(void)
1571 cfs_waitq_signal(&obd_zombie_waitq);
1575 * check whether obd_zombie is idle
1577 static int obd_zombie_is_idle(void)
1581 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1582 cfs_spin_lock(&obd_zombie_impexp_lock);
1583 rc = cfs_list_empty(&obd_zombie_imports) &&
1584 cfs_list_empty(&obd_zombie_exports);
1585 cfs_spin_unlock(&obd_zombie_impexp_lock);
1590 * wait when obd_zombie import/export queues become empty
1592 void obd_zombie_barrier(void)
1594 struct l_wait_info lwi = { 0 };
1596 if (obd_zombie_pid == cfs_curproc_pid())
1597 /* don't wait for myself */
1599 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1601 EXPORT_SYMBOL(obd_zombie_barrier);
1606 * destroy zombie export/import thread.
1608 static int obd_zombie_impexp_thread(void *unused)
1612 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1613 cfs_complete(&obd_zombie_start);
1617 cfs_complete(&obd_zombie_start);
1619 obd_zombie_pid = cfs_curproc_pid();
1621 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1622 struct l_wait_info lwi = { 0 };
1624 l_wait_event(obd_zombie_waitq,
1625 !obd_zombie_impexp_check(NULL), &lwi);
1626 obd_zombie_impexp_cull();
1629 * Notify obd_zombie_barrier callers that queues
1632 cfs_waitq_signal(&obd_zombie_waitq);
1635 cfs_complete(&obd_zombie_stop);
1640 #else /* ! KERNEL */
1642 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1643 static void *obd_zombie_impexp_work_cb;
1644 static void *obd_zombie_impexp_idle_cb;
1646 int obd_zombie_impexp_kill(void *arg)
1650 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1651 obd_zombie_impexp_cull();
1654 cfs_atomic_dec(&zombie_recur);
1661 * start destroy zombie import/export thread
1663 int obd_zombie_impexp_init(void)
1667 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1668 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1669 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1670 cfs_init_completion(&obd_zombie_start);
1671 cfs_init_completion(&obd_zombie_stop);
1672 cfs_waitq_init(&obd_zombie_waitq);
1676 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1680 cfs_wait_for_completion(&obd_zombie_start);
1683 obd_zombie_impexp_work_cb =
1684 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1685 &obd_zombie_impexp_kill, NULL);
1687 obd_zombie_impexp_idle_cb =
1688 liblustre_register_idle_callback("obd_zombi_impexp_check",
1689 &obd_zombie_impexp_check, NULL);
1695 * stop destroy zombie import/export thread
1697 void obd_zombie_impexp_stop(void)
1699 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1700 obd_zombie_impexp_notify();
1702 cfs_wait_for_completion(&obd_zombie_stop);
1704 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1705 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);