1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
83 EXPORT_SYMBOL(obd_device_alloc);
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 cfs_spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 cfs_spin_unlock(&obd_types_lock);
112 cfs_spin_unlock(&obd_types_lock);
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
123 if (!cfs_request_module("%s", modname)) {
124 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125 type = class_search_type(name);
127 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
133 cfs_spin_lock(&type->obd_type_lock);
135 cfs_try_module_get(type->typ_dt_ops->o_owner);
136 cfs_spin_unlock(&type->obd_type_lock);
141 void class_put_type(struct obd_type *type)
144 cfs_spin_lock(&type->obd_type_lock);
146 cfs_module_put(type->typ_dt_ops->o_owner);
147 cfs_spin_unlock(&type->obd_type_lock);
150 #define CLASS_MAX_NAME 1024
152 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
153 struct lprocfs_vars *vars, const char *name,
154 struct lu_device_type *ldt)
156 struct obd_type *type;
161 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
163 if (class_search_type(name)) {
164 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
169 OBD_ALLOC(type, sizeof(*type));
173 OBD_ALLOC_PTR(type->typ_dt_ops);
174 OBD_ALLOC_PTR(type->typ_md_ops);
175 OBD_ALLOC(type->typ_name, strlen(name) + 1);
177 if (type->typ_dt_ops == NULL ||
178 type->typ_md_ops == NULL ||
179 type->typ_name == NULL)
182 *(type->typ_dt_ops) = *dt_ops;
183 /* md_ops is optional */
185 *(type->typ_md_ops) = *md_ops;
186 strcpy(type->typ_name, name);
187 cfs_spin_lock_init(&type->obd_type_lock);
190 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
192 if (IS_ERR(type->typ_procroot)) {
193 rc = PTR_ERR(type->typ_procroot);
194 type->typ_procroot = NULL;
200 rc = lu_device_type_init(ldt);
205 cfs_spin_lock(&obd_types_lock);
206 cfs_list_add(&type->typ_chain, &obd_types);
207 cfs_spin_unlock(&obd_types_lock);
212 if (type->typ_name != NULL)
213 OBD_FREE(type->typ_name, strlen(name) + 1);
214 if (type->typ_md_ops != NULL)
215 OBD_FREE_PTR(type->typ_md_ops);
216 if (type->typ_dt_ops != NULL)
217 OBD_FREE_PTR(type->typ_dt_ops);
218 OBD_FREE(type, sizeof(*type));
222 int class_unregister_type(const char *name)
224 struct obd_type *type = class_search_type(name);
228 CERROR("unknown obd type\n");
232 if (type->typ_refcnt) {
233 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
234 /* This is a bad situation, let's make the best of it */
235 /* Remove ops, but leave the name for debugging */
236 OBD_FREE_PTR(type->typ_dt_ops);
237 OBD_FREE_PTR(type->typ_md_ops);
241 if (type->typ_procroot) {
242 lprocfs_remove(&type->typ_procroot);
246 lu_device_type_fini(type->typ_lu);
248 cfs_spin_lock(&obd_types_lock);
249 cfs_list_del(&type->typ_chain);
250 cfs_spin_unlock(&obd_types_lock);
251 OBD_FREE(type->typ_name, strlen(name) + 1);
252 if (type->typ_dt_ops != NULL)
253 OBD_FREE_PTR(type->typ_dt_ops);
254 if (type->typ_md_ops != NULL)
255 OBD_FREE_PTR(type->typ_md_ops);
256 OBD_FREE(type, sizeof(*type));
258 } /* class_unregister_type */
261 * Create a new obd device.
263 * Find an empty slot in ::obd_devs[], create a new obd device in it.
265 * \param[in] type_name obd device type string.
266 * \param[in] name obd device name.
268 * \retval NULL if create fails, otherwise return the obd device
271 struct obd_device *class_newdev(const char *type_name, const char *name)
273 struct obd_device *result = NULL;
274 struct obd_device *newdev;
275 struct obd_type *type = NULL;
277 int new_obd_minor = 0;
279 if (strlen(name) >= MAX_OBD_NAME) {
280 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
281 RETURN(ERR_PTR(-EINVAL));
284 type = class_get_type(type_name);
286 CERROR("OBD: unknown type: %s\n", type_name);
287 RETURN(ERR_PTR(-ENODEV));
290 newdev = obd_device_alloc();
291 if (newdev == NULL) {
292 class_put_type(type);
293 RETURN(ERR_PTR(-ENOMEM));
295 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
297 cfs_spin_lock(&obd_dev_lock);
298 for (i = 0; i < class_devno_max(); i++) {
299 struct obd_device *obd = class_num2obd(i);
300 if (obd && obd->obd_name &&
301 (strcmp(name, obd->obd_name) == 0)) {
302 CERROR("Device %s already exists, won't add\n", name);
304 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
305 "%p obd_magic %08x != %08x\n", result,
306 result->obd_magic, OBD_DEVICE_MAGIC);
307 LASSERTF(result->obd_minor == new_obd_minor,
308 "%p obd_minor %d != %d\n", result,
309 result->obd_minor, new_obd_minor);
311 obd_devs[result->obd_minor] = NULL;
312 result->obd_name[0]='\0';
314 result = ERR_PTR(-EEXIST);
317 if (!result && !obd) {
319 result->obd_minor = i;
321 result->obd_type = type;
322 strncpy(result->obd_name, name,
323 sizeof(result->obd_name) - 1);
324 obd_devs[i] = result;
327 cfs_spin_unlock(&obd_dev_lock);
329 if (result == NULL && i >= class_devno_max()) {
330 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
332 result = ERR_PTR(-EOVERFLOW);
335 if (IS_ERR(result)) {
336 obd_device_free(newdev);
337 class_put_type(type);
339 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
340 result->obd_name, result);
345 void class_release_dev(struct obd_device *obd)
347 struct obd_type *obd_type = obd->obd_type;
349 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
353 LASSERT(obd_type != NULL);
355 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
356 obd->obd_name,obd->obd_type->typ_name);
358 cfs_spin_lock(&obd_dev_lock);
359 obd_devs[obd->obd_minor] = NULL;
360 cfs_spin_unlock(&obd_dev_lock);
361 obd_device_free(obd);
363 class_put_type(obd_type);
366 int class_name2dev(const char *name)
373 cfs_spin_lock(&obd_dev_lock);
374 for (i = 0; i < class_devno_max(); i++) {
375 struct obd_device *obd = class_num2obd(i);
376 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
377 /* Make sure we finished attaching before we give
378 out any references */
379 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
380 if (obd->obd_attached) {
381 cfs_spin_unlock(&obd_dev_lock);
387 cfs_spin_unlock(&obd_dev_lock);
392 struct obd_device *class_name2obd(const char *name)
394 int dev = class_name2dev(name);
396 if (dev < 0 || dev > class_devno_max())
398 return class_num2obd(dev);
401 int class_uuid2dev(struct obd_uuid *uuid)
405 cfs_spin_lock(&obd_dev_lock);
406 for (i = 0; i < class_devno_max(); i++) {
407 struct obd_device *obd = class_num2obd(i);
408 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
409 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410 cfs_spin_unlock(&obd_dev_lock);
414 cfs_spin_unlock(&obd_dev_lock);
419 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
421 int dev = class_uuid2dev(uuid);
424 return class_num2obd(dev);
428 * Get obd device from ::obd_devs[]
430 * \param num [in] array index
432 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
433 * otherwise return the obd device there.
435 struct obd_device *class_num2obd(int num)
437 struct obd_device *obd = NULL;
439 if (num < class_devno_max()) {
444 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
445 "%p obd_magic %08x != %08x\n",
446 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447 LASSERTF(obd->obd_minor == num,
448 "%p obd_minor %0d != %0d\n",
449 obd, obd->obd_minor, num);
455 void class_obd_list(void)
460 cfs_spin_lock(&obd_dev_lock);
461 for (i = 0; i < class_devno_max(); i++) {
462 struct obd_device *obd = class_num2obd(i);
465 if (obd->obd_stopping)
467 else if (obd->obd_set_up)
469 else if (obd->obd_attached)
473 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
474 i, status, obd->obd_type->typ_name,
475 obd->obd_name, obd->obd_uuid.uuid,
476 cfs_atomic_read(&obd->obd_refcount));
478 cfs_spin_unlock(&obd_dev_lock);
482 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
483 specified, then only the client with that uuid is returned,
484 otherwise any client connected to the tgt is returned. */
485 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
486 const char * typ_name,
487 struct obd_uuid *grp_uuid)
491 cfs_spin_lock(&obd_dev_lock);
492 for (i = 0; i < class_devno_max(); i++) {
493 struct obd_device *obd = class_num2obd(i);
496 if ((strncmp(obd->obd_type->typ_name, typ_name,
497 strlen(typ_name)) == 0)) {
498 if (obd_uuid_equals(tgt_uuid,
499 &obd->u.cli.cl_target_uuid) &&
500 ((grp_uuid)? obd_uuid_equals(grp_uuid,
501 &obd->obd_uuid) : 1)) {
502 cfs_spin_unlock(&obd_dev_lock);
507 cfs_spin_unlock(&obd_dev_lock);
512 /* Iterate the obd_device list looking devices have grp_uuid. Start
513 searching at *next, and if a device is found, the next index to look
514 at is saved in *next. If next is NULL, then the first matching device
515 will always be returned. */
516 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
522 else if (*next >= 0 && *next < class_devno_max())
527 cfs_spin_lock(&obd_dev_lock);
528 for (; i < class_devno_max(); i++) {
529 struct obd_device *obd = class_num2obd(i);
532 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
535 cfs_spin_unlock(&obd_dev_lock);
539 cfs_spin_unlock(&obd_dev_lock);
545 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
546 * adjust sptlrpc settings accordingly.
548 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
550 struct obd_device *obd;
554 LASSERT(namelen > 0);
556 cfs_spin_lock(&obd_dev_lock);
557 for (i = 0; i < class_devno_max(); i++) {
558 obd = class_num2obd(i);
560 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
563 /* only notify mdc, osc, mdt, ost */
564 type = obd->obd_type->typ_name;
565 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
566 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
567 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
568 strcmp(type, LUSTRE_OST_NAME) != 0)
571 if (strncmp(obd->obd_name, fsname, namelen))
574 class_incref(obd, __FUNCTION__, obd);
575 cfs_spin_unlock(&obd_dev_lock);
576 rc2 = obd_set_info_async(obd->obd_self_export,
577 sizeof(KEY_SPTLRPC_CONF),
578 KEY_SPTLRPC_CONF, 0, NULL, NULL);
580 class_decref(obd, __FUNCTION__, obd);
581 cfs_spin_lock(&obd_dev_lock);
583 cfs_spin_unlock(&obd_dev_lock);
586 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
588 void obd_cleanup_caches(void)
593 if (obd_device_cachep) {
594 rc = cfs_mem_cache_destroy(obd_device_cachep);
595 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
596 obd_device_cachep = NULL;
599 rc = cfs_mem_cache_destroy(obdo_cachep);
600 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
604 rc = cfs_mem_cache_destroy(import_cachep);
605 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
606 import_cachep = NULL;
609 rc = cfs_mem_cache_destroy(capa_cachep);
610 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
616 int obd_init_caches(void)
620 LASSERT(obd_device_cachep == NULL);
621 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
622 sizeof(struct obd_device),
624 if (!obd_device_cachep)
627 LASSERT(obdo_cachep == NULL);
628 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
633 LASSERT(import_cachep == NULL);
634 import_cachep = cfs_mem_cache_create("ll_import_cache",
635 sizeof(struct obd_import),
640 LASSERT(capa_cachep == NULL);
641 capa_cachep = cfs_mem_cache_create("capa_cache",
642 sizeof(struct obd_capa), 0, 0);
648 obd_cleanup_caches();
653 /* map connection to client */
654 struct obd_export *class_conn2export(struct lustre_handle *conn)
656 struct obd_export *export;
660 CDEBUG(D_CACHE, "looking for null handle\n");
664 if (conn->cookie == -1) { /* this means assign a new connection */
665 CDEBUG(D_CACHE, "want a new connection\n");
669 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
670 export = class_handle2object(conn->cookie);
674 struct obd_device *class_exp2obd(struct obd_export *exp)
681 struct obd_device *class_conn2obd(struct lustre_handle *conn)
683 struct obd_export *export;
684 export = class_conn2export(conn);
686 struct obd_device *obd = export->exp_obd;
687 class_export_put(export);
693 struct obd_import *class_exp2cliimp(struct obd_export *exp)
695 struct obd_device *obd = exp->exp_obd;
698 return obd->u.cli.cl_import;
701 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
703 struct obd_device *obd = class_conn2obd(conn);
706 return obd->u.cli.cl_import;
709 /* Export management functions */
710 static void class_export_destroy(struct obd_export *exp)
712 struct obd_device *obd = exp->exp_obd;
715 LASSERT (cfs_atomic_read(&exp->exp_refcount) == 0);
717 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
718 exp->exp_client_uuid.uuid, obd->obd_name);
720 LASSERT(obd != NULL);
722 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
723 if (exp->exp_connection)
724 ptlrpc_put_connection_superhack(exp->exp_connection);
726 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
727 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
728 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
729 LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
730 obd_destroy_export(exp);
731 class_decref(obd, "export", exp);
733 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
737 static void export_handle_addref(void *export)
739 class_export_get(export);
742 struct obd_export *class_export_get(struct obd_export *exp)
744 cfs_atomic_inc(&exp->exp_refcount);
745 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
746 cfs_atomic_read(&exp->exp_refcount));
749 EXPORT_SYMBOL(class_export_get);
751 void class_export_put(struct obd_export *exp)
753 LASSERT(exp != NULL);
754 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
755 cfs_atomic_read(&exp->exp_refcount) - 1);
756 LASSERT(cfs_atomic_read(&exp->exp_refcount) > 0);
757 LASSERT(cfs_atomic_read(&exp->exp_refcount) < 0x5a5a5a);
759 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
760 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
761 CDEBUG(D_IOCTL, "final put %p/%s\n",
762 exp, exp->exp_client_uuid.uuid);
764 /* release nid stat refererence */
765 lprocfs_exp_cleanup(exp);
767 obd_zombie_export_add(exp);
770 EXPORT_SYMBOL(class_export_put);
772 /* Creates a new export, adds it to the hash table, and returns a
773 * pointer to it. The refcount is 2: one for the hash reference, and
774 * one for the pointer returned by this function. */
775 struct obd_export *class_new_export(struct obd_device *obd,
776 struct obd_uuid *cluuid)
778 struct obd_export *export;
779 cfs_hash_t *hash = NULL;
783 OBD_ALLOC_PTR(export);
785 return ERR_PTR(-ENOMEM);
787 export->exp_conn_cnt = 0;
788 export->exp_lock_hash = NULL;
789 cfs_atomic_set(&export->exp_refcount, 2);
790 cfs_atomic_set(&export->exp_rpc_count, 0);
791 cfs_atomic_set(&export->exp_cb_count, 0);
792 cfs_atomic_set(&export->exp_locks_count, 0);
793 #if LUSTRE_TRACKS_LOCK_EXP_REFS
794 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
795 cfs_spin_lock_init(&export->exp_locks_list_guard);
797 cfs_atomic_set(&export->exp_replay_count, 0);
798 export->exp_obd = obd;
799 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
800 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
801 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
802 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
803 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
804 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
805 class_handle_hash(&export->exp_handle, export_handle_addref);
806 export->exp_last_request_time = cfs_time_current_sec();
807 cfs_spin_lock_init(&export->exp_lock);
808 cfs_spin_lock_init(&export->exp_rpc_lock);
809 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
810 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
812 export->exp_sp_peer = LUSTRE_SP_ANY;
813 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
814 export->exp_client_uuid = *cluuid;
815 obd_init_export(export);
817 cfs_spin_lock(&obd->obd_dev_lock);
818 /* shouldn't happen, but might race */
819 if (obd->obd_stopping)
820 GOTO(exit_unlock, rc = -ENODEV);
822 hash = cfs_hash_getref(obd->obd_uuid_hash);
824 GOTO(exit_unlock, rc = -ENODEV);
825 cfs_spin_unlock(&obd->obd_dev_lock);
827 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
828 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
830 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
831 obd->obd_name, cluuid->uuid, rc);
832 GOTO(exit_err, rc = -EALREADY);
836 cfs_spin_lock(&obd->obd_dev_lock);
837 if (obd->obd_stopping) {
838 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
839 GOTO(exit_unlock, rc = -ENODEV);
842 class_incref(obd, "export", export);
843 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
844 cfs_list_add_tail(&export->exp_obd_chain_timed,
845 &export->exp_obd->obd_exports_timed);
846 export->exp_obd->obd_num_exports++;
847 cfs_spin_unlock(&obd->obd_dev_lock);
848 cfs_hash_putref(hash);
852 cfs_spin_unlock(&obd->obd_dev_lock);
855 cfs_hash_putref(hash);
856 class_handle_unhash(&export->exp_handle);
857 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
858 obd_destroy_export(export);
859 OBD_FREE_PTR(export);
862 EXPORT_SYMBOL(class_new_export);
864 void class_unlink_export(struct obd_export *exp)
866 class_handle_unhash(&exp->exp_handle);
868 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
869 /* delete an uuid-export hashitem from hashtables */
870 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
871 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
872 &exp->exp_client_uuid,
873 &exp->exp_uuid_hash);
875 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
876 cfs_list_del_init(&exp->exp_obd_chain_timed);
877 exp->exp_obd->obd_num_exports--;
878 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
879 class_export_put(exp);
881 EXPORT_SYMBOL(class_unlink_export);
883 /* Import management functions */
884 void class_import_destroy(struct obd_import *imp)
888 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
889 imp->imp_obd->obd_name);
891 LASSERT(cfs_atomic_read(&imp->imp_refcount) == 0);
893 ptlrpc_put_connection_superhack(imp->imp_connection);
895 while (!cfs_list_empty(&imp->imp_conn_list)) {
896 struct obd_import_conn *imp_conn;
898 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
899 struct obd_import_conn, oic_item);
900 cfs_list_del_init(&imp_conn->oic_item);
901 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
902 OBD_FREE(imp_conn, sizeof(*imp_conn));
905 LASSERT(imp->imp_sec == NULL);
906 class_decref(imp->imp_obd, "import", imp);
907 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
911 static void import_handle_addref(void *import)
913 class_import_get(import);
916 struct obd_import *class_import_get(struct obd_import *import)
918 LASSERT(cfs_atomic_read(&import->imp_refcount) >= 0);
919 LASSERT(cfs_atomic_read(&import->imp_refcount) < 0x5a5a5a);
920 cfs_atomic_inc(&import->imp_refcount);
921 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
922 cfs_atomic_read(&import->imp_refcount),
923 import->imp_obd->obd_name);
926 EXPORT_SYMBOL(class_import_get);
928 void class_import_put(struct obd_import *imp)
932 LASSERT(cfs_atomic_read(&imp->imp_refcount) > 0);
933 LASSERT(cfs_atomic_read(&imp->imp_refcount) < 0x5a5a5a);
934 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
936 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
937 cfs_atomic_read(&imp->imp_refcount) - 1,
938 imp->imp_obd->obd_name);
940 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
941 CDEBUG(D_INFO, "final put import %p\n", imp);
942 obd_zombie_import_add(imp);
947 EXPORT_SYMBOL(class_import_put);
949 static void init_imp_at(struct imp_at *at) {
951 at_init(&at->iat_net_latency, 0, 0);
952 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
953 /* max service estimates are tracked on the server side, so
954 don't use the AT history here, just use the last reported
955 val. (But keep hist for proc histogram, worst_ever) */
956 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
961 struct obd_import *class_new_import(struct obd_device *obd)
963 struct obd_import *imp;
965 OBD_ALLOC(imp, sizeof(*imp));
969 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
970 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
971 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
972 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
973 cfs_spin_lock_init(&imp->imp_lock);
974 imp->imp_last_success_conn = 0;
975 imp->imp_state = LUSTRE_IMP_NEW;
976 imp->imp_obd = class_incref(obd, "import", imp);
977 cfs_sema_init(&imp->imp_sec_mutex, 1);
978 cfs_waitq_init(&imp->imp_recovery_waitq);
980 cfs_atomic_set(&imp->imp_refcount, 2);
981 cfs_atomic_set(&imp->imp_unregistering, 0);
982 cfs_atomic_set(&imp->imp_inflight, 0);
983 cfs_atomic_set(&imp->imp_replay_inflight, 0);
984 cfs_atomic_set(&imp->imp_inval_count, 0);
985 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
986 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
987 class_handle_hash(&imp->imp_handle, import_handle_addref);
988 init_imp_at(&imp->imp_at);
990 /* the default magic is V2, will be used in connect RPC, and
991 * then adjusted according to the flags in request/reply. */
992 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
996 EXPORT_SYMBOL(class_new_import);
998 void class_destroy_import(struct obd_import *import)
1000 LASSERT(import != NULL);
1001 LASSERT(import != LP_POISON);
1003 class_handle_unhash(&import->imp_handle);
1005 cfs_spin_lock(&import->imp_lock);
1006 import->imp_generation++;
1007 cfs_spin_unlock(&import->imp_lock);
1008 class_import_put(import);
1010 EXPORT_SYMBOL(class_destroy_import);
1012 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1014 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1016 cfs_spin_lock(&exp->exp_locks_list_guard);
1018 LASSERT(lock->l_exp_refs_nr >= 0);
1020 if (lock->l_exp_refs_target != NULL &&
1021 lock->l_exp_refs_target != exp) {
1022 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1023 exp, lock, lock->l_exp_refs_target);
1025 if ((lock->l_exp_refs_nr ++) == 0) {
1026 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1027 lock->l_exp_refs_target = exp;
1029 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1030 lock, exp, lock->l_exp_refs_nr);
1031 cfs_spin_unlock(&exp->exp_locks_list_guard);
1033 EXPORT_SYMBOL(__class_export_add_lock_ref);
1035 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1037 cfs_spin_lock(&exp->exp_locks_list_guard);
1038 LASSERT(lock->l_exp_refs_nr > 0);
1039 if (lock->l_exp_refs_target != exp) {
1040 LCONSOLE_WARN("lock %p, "
1041 "mismatching export pointers: %p, %p\n",
1042 lock, lock->l_exp_refs_target, exp);
1044 if (-- lock->l_exp_refs_nr == 0) {
1045 cfs_list_del_init(&lock->l_exp_refs_link);
1046 lock->l_exp_refs_target = NULL;
1048 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1049 lock, exp, lock->l_exp_refs_nr);
1050 cfs_spin_unlock(&exp->exp_locks_list_guard);
1052 EXPORT_SYMBOL(__class_export_del_lock_ref);
1055 /* A connection defines an export context in which preallocation can
1056 be managed. This releases the export pointer reference, and returns
1057 the export handle, so the export refcount is 1 when this function
1059 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1060 struct obd_uuid *cluuid)
1062 struct obd_export *export;
1063 LASSERT(conn != NULL);
1064 LASSERT(obd != NULL);
1065 LASSERT(cluuid != NULL);
1068 export = class_new_export(obd, cluuid);
1070 RETURN(PTR_ERR(export));
1072 conn->cookie = export->exp_handle.h_cookie;
1073 class_export_put(export);
1075 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1076 cluuid->uuid, conn->cookie);
1079 EXPORT_SYMBOL(class_connect);
1081 /* if export is involved in recovery then clean up related things */
1082 void class_export_recovery_cleanup(struct obd_export *exp)
1084 struct obd_device *obd = exp->exp_obd;
1086 cfs_spin_lock(&obd->obd_recovery_task_lock);
1087 if (exp->exp_delayed)
1088 obd->obd_delayed_clients--;
1089 if (obd->obd_recovering && exp->exp_in_recovery) {
1090 cfs_spin_lock(&exp->exp_lock);
1091 exp->exp_in_recovery = 0;
1092 cfs_spin_unlock(&exp->exp_lock);
1093 LASSERT(obd->obd_connected_clients);
1094 obd->obd_connected_clients--;
1096 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1097 /** Cleanup req replay fields */
1098 if (exp->exp_req_replay_needed) {
1099 cfs_spin_lock(&exp->exp_lock);
1100 exp->exp_req_replay_needed = 0;
1101 cfs_spin_unlock(&exp->exp_lock);
1102 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1103 cfs_atomic_dec(&obd->obd_req_replay_clients);
1105 /** Cleanup lock replay data */
1106 if (exp->exp_lock_replay_needed) {
1107 cfs_spin_lock(&exp->exp_lock);
1108 exp->exp_lock_replay_needed = 0;
1109 cfs_spin_unlock(&exp->exp_lock);
1110 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1111 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1115 /* This function removes 1-3 references from the export:
1116 * 1 - for export pointer passed
1117 * and if disconnect really need
1118 * 2 - removing from hash
1119 * 3 - in client_unlink_export
1120 * The export pointer passed to this function can destroyed */
1121 int class_disconnect(struct obd_export *export)
1123 int already_disconnected;
1126 if (export == NULL) {
1128 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1132 cfs_spin_lock(&export->exp_lock);
1133 already_disconnected = export->exp_disconnected;
1134 export->exp_disconnected = 1;
1135 cfs_spin_unlock(&export->exp_lock);
1137 /* class_cleanup(), abort_recovery(), and class_fail_export()
1138 * all end up in here, and if any of them race we shouldn't
1139 * call extra class_export_puts(). */
1140 if (already_disconnected) {
1141 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1142 GOTO(no_disconn, already_disconnected);
1145 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1146 export->exp_handle.h_cookie);
1148 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1149 cfs_hash_del(export->exp_obd->obd_nid_hash,
1150 &export->exp_connection->c_peer.nid,
1151 &export->exp_nid_hash);
1153 class_export_recovery_cleanup(export);
1154 class_unlink_export(export);
1156 class_export_put(export);
1160 /* Return non-zero for a fully connected export */
1161 int class_connected_export(struct obd_export *exp)
1165 cfs_spin_lock(&exp->exp_lock);
1166 connected = (exp->exp_conn_cnt > 0);
1167 cfs_spin_unlock(&exp->exp_lock);
1172 EXPORT_SYMBOL(class_connected_export);
1174 static void class_disconnect_export_list(cfs_list_t *list,
1175 enum obd_option flags)
1178 struct obd_export *exp;
1181 /* It's possible that an export may disconnect itself, but
1182 * nothing else will be added to this list. */
1183 while (!cfs_list_empty(list)) {
1184 exp = cfs_list_entry(list->next, struct obd_export,
1186 /* need for safe call CDEBUG after obd_disconnect */
1187 class_export_get(exp);
1189 cfs_spin_lock(&exp->exp_lock);
1190 exp->exp_flags = flags;
1191 cfs_spin_unlock(&exp->exp_lock);
1193 if (obd_uuid_equals(&exp->exp_client_uuid,
1194 &exp->exp_obd->obd_uuid)) {
1196 "exp %p export uuid == obd uuid, don't discon\n",
1198 /* Need to delete this now so we don't end up pointing
1199 * to work_list later when this export is cleaned up. */
1200 cfs_list_del_init(&exp->exp_obd_chain);
1201 class_export_put(exp);
1205 class_export_get(exp);
1206 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1207 "last request at "CFS_TIME_T"\n",
1208 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1209 exp, exp->exp_last_request_time);
1210 /* release one export reference anyway */
1211 rc = obd_disconnect(exp);
1213 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1214 obd_export_nid2str(exp), exp, rc);
1215 class_export_put(exp);
1220 void class_disconnect_exports(struct obd_device *obd)
1222 cfs_list_t work_list;
1225 /* Move all of the exports from obd_exports to a work list, en masse. */
1226 CFS_INIT_LIST_HEAD(&work_list);
1227 cfs_spin_lock(&obd->obd_dev_lock);
1228 cfs_list_splice_init(&obd->obd_exports, &work_list);
1229 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1230 cfs_spin_unlock(&obd->obd_dev_lock);
1232 if (!cfs_list_empty(&work_list)) {
1233 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1234 "disconnecting them\n", obd->obd_minor, obd);
1235 class_disconnect_export_list(&work_list,
1236 exp_flags_from_obd(obd));
1238 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1239 obd->obd_minor, obd);
1242 EXPORT_SYMBOL(class_disconnect_exports);
1244 /* Remove exports that have not completed recovery.
1246 void class_disconnect_stale_exports(struct obd_device *obd,
1247 int (*test_export)(struct obd_export *))
1249 cfs_list_t work_list;
1250 cfs_list_t *pos, *n;
1251 struct obd_export *exp;
1255 CFS_INIT_LIST_HEAD(&work_list);
1256 cfs_spin_lock(&obd->obd_dev_lock);
1257 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1258 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1259 if (test_export(exp))
1262 /* don't count self-export as client */
1263 if (obd_uuid_equals(&exp->exp_client_uuid,
1264 &exp->exp_obd->obd_uuid))
1267 cfs_list_move(&exp->exp_obd_chain, &work_list);
1269 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1270 obd->obd_name, exp->exp_client_uuid.uuid,
1271 exp->exp_connection == NULL ? "<unknown>" :
1272 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1273 print_export_data(exp, "EVICTING", 0);
1275 cfs_spin_unlock(&obd->obd_dev_lock);
1278 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1279 obd->obd_name, evicted);
1280 obd->obd_stale_clients += evicted;
1282 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1283 OBD_OPT_ABORT_RECOV);
1286 EXPORT_SYMBOL(class_disconnect_stale_exports);
1288 void class_fail_export(struct obd_export *exp)
1290 int rc, already_failed;
1292 cfs_spin_lock(&exp->exp_lock);
1293 already_failed = exp->exp_failed;
1294 exp->exp_failed = 1;
1295 cfs_spin_unlock(&exp->exp_lock);
1297 if (already_failed) {
1298 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1299 exp, exp->exp_client_uuid.uuid);
1303 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1304 exp, exp->exp_client_uuid.uuid);
1306 if (obd_dump_on_timeout)
1307 libcfs_debug_dumplog();
1309 /* Most callers into obd_disconnect are removing their own reference
1310 * (request, for example) in addition to the one from the hash table.
1311 * We don't have such a reference here, so make one. */
1312 class_export_get(exp);
1313 rc = obd_disconnect(exp);
1315 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1317 CDEBUG(D_HA, "disconnected export %p/%s\n",
1318 exp, exp->exp_client_uuid.uuid);
1320 EXPORT_SYMBOL(class_fail_export);
1322 char *obd_export_nid2str(struct obd_export *exp)
1324 if (exp->exp_connection != NULL)
1325 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1329 EXPORT_SYMBOL(obd_export_nid2str);
1331 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1333 struct obd_export *doomed_exp = NULL;
1334 int exports_evicted = 0;
1336 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1339 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1340 if (doomed_exp == NULL)
1343 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1344 "nid %s found, wanted nid %s, requested nid %s\n",
1345 obd_export_nid2str(doomed_exp),
1346 libcfs_nid2str(nid_key), nid);
1347 LASSERTF(doomed_exp != obd->obd_self_export,
1348 "self-export is hashed by NID?\n");
1350 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1351 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1353 class_fail_export(doomed_exp);
1354 class_export_put(doomed_exp);
1357 if (!exports_evicted)
1358 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1359 obd->obd_name, nid);
1360 return exports_evicted;
1362 EXPORT_SYMBOL(obd_export_evict_by_nid);
1364 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1366 struct obd_export *doomed_exp = NULL;
1367 struct obd_uuid doomed_uuid;
1368 int exports_evicted = 0;
1370 obd_str2uuid(&doomed_uuid, uuid);
1371 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1372 CERROR("%s: can't evict myself\n", obd->obd_name);
1373 return exports_evicted;
1376 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1378 if (doomed_exp == NULL) {
1379 CERROR("%s: can't disconnect %s: no exports found\n",
1380 obd->obd_name, uuid);
1382 CWARN("%s: evicting %s at adminstrative request\n",
1383 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1384 class_fail_export(doomed_exp);
1385 class_export_put(doomed_exp);
1389 return exports_evicted;
1391 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1393 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1394 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1395 EXPORT_SYMBOL(class_export_dump_hook);
1398 static void print_export_data(struct obd_export *exp, const char *status,
1401 struct ptlrpc_reply_state *rs;
1402 struct ptlrpc_reply_state *first_reply = NULL;
1405 cfs_spin_lock(&exp->exp_lock);
1406 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1412 cfs_spin_unlock(&exp->exp_lock);
1414 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1415 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1416 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1417 cfs_atomic_read(&exp->exp_rpc_count),
1418 cfs_atomic_read(&exp->exp_cb_count),
1419 cfs_atomic_read(&exp->exp_locks_count),
1420 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1421 nreplies, first_reply, nreplies > 3 ? "..." : "",
1422 exp->exp_last_committed);
1423 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1424 if (locks && class_export_dump_hook != NULL)
1425 class_export_dump_hook(exp);
1429 void dump_exports(struct obd_device *obd, int locks)
1431 struct obd_export *exp;
1433 cfs_spin_lock(&obd->obd_dev_lock);
1434 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1435 print_export_data(exp, "ACTIVE", locks);
1436 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1437 print_export_data(exp, "UNLINKED", locks);
1438 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1439 print_export_data(exp, "DELAYED", locks);
1440 cfs_spin_unlock(&obd->obd_dev_lock);
1441 cfs_spin_lock(&obd_zombie_impexp_lock);
1442 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1443 print_export_data(exp, "ZOMBIE", locks);
1444 cfs_spin_unlock(&obd_zombie_impexp_lock);
1446 EXPORT_SYMBOL(dump_exports);
1448 void obd_exports_barrier(struct obd_device *obd)
1451 LASSERT(cfs_list_empty(&obd->obd_exports));
1452 cfs_spin_lock(&obd->obd_dev_lock);
1453 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1454 cfs_spin_unlock(&obd->obd_dev_lock);
1455 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1456 cfs_time_seconds(waited));
1457 if (waited > 5 && IS_PO2(waited)) {
1458 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1459 "more than %d seconds. "
1460 "The obd refcount = %d. Is it stuck?\n",
1461 obd->obd_name, waited,
1462 cfs_atomic_read(&obd->obd_refcount));
1463 dump_exports(obd, 1);
1466 cfs_spin_lock(&obd->obd_dev_lock);
1468 cfs_spin_unlock(&obd->obd_dev_lock);
1470 EXPORT_SYMBOL(obd_exports_barrier);
1473 * kill zombie imports and exports
1475 void obd_zombie_impexp_cull(void)
1477 struct obd_import *import;
1478 struct obd_export *export;
1482 cfs_spin_lock(&obd_zombie_impexp_lock);
1485 if (!cfs_list_empty(&obd_zombie_imports)) {
1486 import = cfs_list_entry(obd_zombie_imports.next,
1489 cfs_list_del_init(&import->imp_zombie_chain);
1493 if (!cfs_list_empty(&obd_zombie_exports)) {
1494 export = cfs_list_entry(obd_zombie_exports.next,
1497 cfs_list_del_init(&export->exp_obd_chain);
1500 cfs_spin_unlock(&obd_zombie_impexp_lock);
1503 class_import_destroy(import);
1506 class_export_destroy(export);
1509 } while (import != NULL || export != NULL);
1513 static cfs_completion_t obd_zombie_start;
1514 static cfs_completion_t obd_zombie_stop;
1515 static unsigned long obd_zombie_flags;
1516 static cfs_waitq_t obd_zombie_waitq;
1517 static pid_t obd_zombie_pid;
1520 OBD_ZOMBIE_STOP = 1 << 1
1524 * check for work for kill zombie import/export thread.
1526 static int obd_zombie_impexp_check(void *arg)
1530 cfs_spin_lock(&obd_zombie_impexp_lock);
1531 rc = cfs_list_empty(&obd_zombie_imports) &&
1532 cfs_list_empty(&obd_zombie_exports) &&
1533 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1535 cfs_spin_unlock(&obd_zombie_impexp_lock);
1541 * Add export to the obd_zombe thread and notify it.
1543 static void obd_zombie_export_add(struct obd_export *exp) {
1544 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1545 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1546 cfs_list_del_init(&exp->exp_obd_chain);
1547 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1548 cfs_spin_lock(&obd_zombie_impexp_lock);
1549 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1550 cfs_spin_unlock(&obd_zombie_impexp_lock);
1552 if (obd_zombie_impexp_notify != NULL)
1553 obd_zombie_impexp_notify();
1557 * Add import to the obd_zombe thread and notify it.
1559 static void obd_zombie_import_add(struct obd_import *imp) {
1560 LASSERT(imp->imp_sec == NULL);
1561 cfs_spin_lock(&obd_zombie_impexp_lock);
1562 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1563 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1564 cfs_spin_unlock(&obd_zombie_impexp_lock);
1566 if (obd_zombie_impexp_notify != NULL)
1567 obd_zombie_impexp_notify();
1571 * notify import/export destroy thread about new zombie.
1573 static void obd_zombie_impexp_notify(void)
1575 cfs_waitq_signal(&obd_zombie_waitq);
1579 * check whether obd_zombie is idle
1581 static int obd_zombie_is_idle(void)
1585 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1586 cfs_spin_lock(&obd_zombie_impexp_lock);
1587 rc = cfs_list_empty(&obd_zombie_imports) &&
1588 cfs_list_empty(&obd_zombie_exports);
1589 cfs_spin_unlock(&obd_zombie_impexp_lock);
1594 * wait when obd_zombie import/export queues become empty
1596 void obd_zombie_barrier(void)
1598 struct l_wait_info lwi = { 0 };
1600 if (obd_zombie_pid == cfs_curproc_pid())
1601 /* don't wait for myself */
1603 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1605 EXPORT_SYMBOL(obd_zombie_barrier);
1610 * destroy zombie export/import thread.
1612 static int obd_zombie_impexp_thread(void *unused)
1616 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1617 cfs_complete(&obd_zombie_start);
1621 cfs_complete(&obd_zombie_start);
1623 obd_zombie_pid = cfs_curproc_pid();
1625 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1626 struct l_wait_info lwi = { 0 };
1628 l_wait_event(obd_zombie_waitq,
1629 !obd_zombie_impexp_check(NULL), &lwi);
1630 obd_zombie_impexp_cull();
1633 * Notify obd_zombie_barrier callers that queues
1636 cfs_waitq_signal(&obd_zombie_waitq);
1639 cfs_complete(&obd_zombie_stop);
1644 #else /* ! KERNEL */
1646 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1647 static void *obd_zombie_impexp_work_cb;
1648 static void *obd_zombie_impexp_idle_cb;
1650 int obd_zombie_impexp_kill(void *arg)
1654 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1655 obd_zombie_impexp_cull();
1658 cfs_atomic_dec(&zombie_recur);
1665 * start destroy zombie import/export thread
1667 int obd_zombie_impexp_init(void)
1671 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1672 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1673 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1674 cfs_init_completion(&obd_zombie_start);
1675 cfs_init_completion(&obd_zombie_stop);
1676 cfs_waitq_init(&obd_zombie_waitq);
1680 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1684 cfs_wait_for_completion(&obd_zombie_start);
1687 obd_zombie_impexp_work_cb =
1688 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1689 &obd_zombie_impexp_kill, NULL);
1691 obd_zombie_impexp_idle_cb =
1692 liblustre_register_idle_callback("obd_zombi_impexp_check",
1693 &obd_zombie_impexp_check, NULL);
1699 * stop destroy zombie import/export thread
1701 void obd_zombie_impexp_stop(void)
1703 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1704 obd_zombie_impexp_notify();
1706 cfs_wait_for_completion(&obd_zombie_stop);
1708 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1709 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);