1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp, const char *status);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
83 EXPORT_SYMBOL(obd_device_alloc);
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
101 struct list_head *tmp;
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 list_for_each(tmp, &obd_types) {
106 type = list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
122 const char *modname = name;
123 if (!request_module("%s", modname)) {
124 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125 type = class_search_type(name);
127 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
133 spin_lock(&type->obd_type_lock);
135 try_module_get(type->typ_dt_ops->o_owner);
136 spin_unlock(&type->obd_type_lock);
141 void class_put_type(struct obd_type *type)
144 spin_lock(&type->obd_type_lock);
146 module_put(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
150 #define CLASS_MAX_NAME 1024
152 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
153 struct lprocfs_vars *vars, const char *name,
154 struct lu_device_type *ldt)
156 struct obd_type *type;
161 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
163 if (class_search_type(name)) {
164 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
169 OBD_ALLOC(type, sizeof(*type));
173 OBD_ALLOC_PTR(type->typ_dt_ops);
174 OBD_ALLOC_PTR(type->typ_md_ops);
175 OBD_ALLOC(type->typ_name, strlen(name) + 1);
177 if (type->typ_dt_ops == NULL ||
178 type->typ_md_ops == NULL ||
179 type->typ_name == NULL)
182 *(type->typ_dt_ops) = *dt_ops;
183 /* md_ops is optional */
185 *(type->typ_md_ops) = *md_ops;
186 strcpy(type->typ_name, name);
187 spin_lock_init(&type->obd_type_lock);
190 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
192 if (IS_ERR(type->typ_procroot)) {
193 rc = PTR_ERR(type->typ_procroot);
194 type->typ_procroot = NULL;
200 rc = lu_device_type_init(ldt);
205 spin_lock(&obd_types_lock);
206 list_add(&type->typ_chain, &obd_types);
207 spin_unlock(&obd_types_lock);
212 if (type->typ_name != NULL)
213 OBD_FREE(type->typ_name, strlen(name) + 1);
214 if (type->typ_md_ops != NULL)
215 OBD_FREE_PTR(type->typ_md_ops);
216 if (type->typ_dt_ops != NULL)
217 OBD_FREE_PTR(type->typ_dt_ops);
218 OBD_FREE(type, sizeof(*type));
222 int class_unregister_type(const char *name)
224 struct obd_type *type = class_search_type(name);
228 CERROR("unknown obd type\n");
232 if (type->typ_refcnt) {
233 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
234 /* This is a bad situation, let's make the best of it */
235 /* Remove ops, but leave the name for debugging */
236 OBD_FREE_PTR(type->typ_dt_ops);
237 OBD_FREE_PTR(type->typ_md_ops);
241 if (type->typ_procroot) {
242 lprocfs_remove(&type->typ_procroot);
246 lu_device_type_fini(type->typ_lu);
248 spin_lock(&obd_types_lock);
249 list_del(&type->typ_chain);
250 spin_unlock(&obd_types_lock);
251 OBD_FREE(type->typ_name, strlen(name) + 1);
252 if (type->typ_dt_ops != NULL)
253 OBD_FREE_PTR(type->typ_dt_ops);
254 if (type->typ_md_ops != NULL)
255 OBD_FREE_PTR(type->typ_md_ops);
256 OBD_FREE(type, sizeof(*type));
258 } /* class_unregister_type */
261 * Create a new obd device.
263 * Find an empty slot in ::obd_devs[], create a new obd device in it.
265 * \param[in] type_name obd device type string.
266 * \param[in] name obd device name.
268 * \retval NULL if create fails, otherwise return the obd device
271 struct obd_device *class_newdev(const char *type_name, const char *name)
273 struct obd_device *result = NULL;
274 struct obd_device *newdev;
275 struct obd_type *type = NULL;
277 int new_obd_minor = 0;
279 if (strlen(name) >= MAX_OBD_NAME) {
280 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
281 RETURN(ERR_PTR(-EINVAL));
284 type = class_get_type(type_name);
286 CERROR("OBD: unknown type: %s\n", type_name);
287 RETURN(ERR_PTR(-ENODEV));
290 newdev = obd_device_alloc();
291 if (newdev == NULL) {
292 class_put_type(type);
293 RETURN(ERR_PTR(-ENOMEM));
295 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
297 spin_lock(&obd_dev_lock);
298 for (i = 0; i < class_devno_max(); i++) {
299 struct obd_device *obd = class_num2obd(i);
300 if (obd && obd->obd_name &&
301 (strcmp(name, obd->obd_name) == 0)) {
302 CERROR("Device %s already exists, won't add\n", name);
304 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
305 "%p obd_magic %08x != %08x\n", result,
306 result->obd_magic, OBD_DEVICE_MAGIC);
307 LASSERTF(result->obd_minor == new_obd_minor,
308 "%p obd_minor %d != %d\n", result,
309 result->obd_minor, new_obd_minor);
311 obd_devs[result->obd_minor] = NULL;
312 result->obd_name[0]='\0';
314 result = ERR_PTR(-EEXIST);
317 if (!result && !obd) {
319 result->obd_minor = i;
321 result->obd_type = type;
322 strncpy(result->obd_name, name,
323 sizeof(result->obd_name) - 1);
324 obd_devs[i] = result;
327 spin_unlock(&obd_dev_lock);
329 if (result == NULL && i >= class_devno_max()) {
330 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
332 result = ERR_PTR(-EOVERFLOW);
335 if (IS_ERR(result)) {
336 obd_device_free(newdev);
337 class_put_type(type);
339 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
340 result->obd_name, result);
345 void class_release_dev(struct obd_device *obd)
347 struct obd_type *obd_type = obd->obd_type;
349 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
353 LASSERT(obd_type != NULL);
355 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
356 obd->obd_name,obd->obd_type->typ_name);
358 spin_lock(&obd_dev_lock);
359 obd_devs[obd->obd_minor] = NULL;
360 spin_unlock(&obd_dev_lock);
361 obd_device_free(obd);
363 class_put_type(obd_type);
366 int class_name2dev(const char *name)
373 spin_lock(&obd_dev_lock);
374 for (i = 0; i < class_devno_max(); i++) {
375 struct obd_device *obd = class_num2obd(i);
376 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
377 /* Make sure we finished attaching before we give
378 out any references */
379 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
380 if (obd->obd_attached) {
381 spin_unlock(&obd_dev_lock);
387 spin_unlock(&obd_dev_lock);
392 struct obd_device *class_name2obd(const char *name)
394 int dev = class_name2dev(name);
396 if (dev < 0 || dev > class_devno_max())
398 return class_num2obd(dev);
401 int class_uuid2dev(struct obd_uuid *uuid)
405 spin_lock(&obd_dev_lock);
406 for (i = 0; i < class_devno_max(); i++) {
407 struct obd_device *obd = class_num2obd(i);
408 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
409 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410 spin_unlock(&obd_dev_lock);
414 spin_unlock(&obd_dev_lock);
419 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
421 int dev = class_uuid2dev(uuid);
424 return class_num2obd(dev);
428 * Get obd device from ::obd_devs[]
430 * \param num [in] array index
432 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
433 * otherwise return the obd device there.
435 struct obd_device *class_num2obd(int num)
437 struct obd_device *obd = NULL;
439 if (num < class_devno_max()) {
444 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
445 "%p obd_magic %08x != %08x\n",
446 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447 LASSERTF(obd->obd_minor == num,
448 "%p obd_minor %0d != %0d\n",
449 obd, obd->obd_minor, num);
455 void class_obd_list(void)
460 spin_lock(&obd_dev_lock);
461 for (i = 0; i < class_devno_max(); i++) {
462 struct obd_device *obd = class_num2obd(i);
465 if (obd->obd_stopping)
467 else if (obd->obd_set_up)
469 else if (obd->obd_attached)
473 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
474 i, status, obd->obd_type->typ_name,
475 obd->obd_name, obd->obd_uuid.uuid,
476 atomic_read(&obd->obd_refcount));
478 spin_unlock(&obd_dev_lock);
482 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
483 specified, then only the client with that uuid is returned,
484 otherwise any client connected to the tgt is returned. */
485 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
486 const char * typ_name,
487 struct obd_uuid *grp_uuid)
491 spin_lock(&obd_dev_lock);
492 for (i = 0; i < class_devno_max(); i++) {
493 struct obd_device *obd = class_num2obd(i);
496 if ((strncmp(obd->obd_type->typ_name, typ_name,
497 strlen(typ_name)) == 0)) {
498 if (obd_uuid_equals(tgt_uuid,
499 &obd->u.cli.cl_target_uuid) &&
500 ((grp_uuid)? obd_uuid_equals(grp_uuid,
501 &obd->obd_uuid) : 1)) {
502 spin_unlock(&obd_dev_lock);
507 spin_unlock(&obd_dev_lock);
512 /* Iterate the obd_device list looking devices have grp_uuid. Start
513 searching at *next, and if a device is found, the next index to look
514 at is saved in *next. If next is NULL, then the first matching device
515 will always be returned. */
516 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
522 else if (*next >= 0 && *next < class_devno_max())
527 spin_lock(&obd_dev_lock);
528 for (; i < class_devno_max(); i++) {
529 struct obd_device *obd = class_num2obd(i);
532 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
535 spin_unlock(&obd_dev_lock);
539 spin_unlock(&obd_dev_lock);
545 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
546 * adjust sptlrpc settings accordingly.
548 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
550 struct obd_device *obd;
554 LASSERT(namelen > 0);
556 spin_lock(&obd_dev_lock);
557 for (i = 0; i < class_devno_max(); i++) {
558 obd = class_num2obd(i);
560 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
563 /* only notify mdc, osc, mdt, ost */
564 type = obd->obd_type->typ_name;
565 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
566 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
567 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
568 strcmp(type, LUSTRE_OST_NAME) != 0)
571 if (strncmp(obd->obd_name, fsname, namelen))
574 class_incref(obd, __FUNCTION__, obd);
575 spin_unlock(&obd_dev_lock);
576 rc2 = obd_set_info_async(obd->obd_self_export,
577 sizeof(KEY_SPTLRPC_CONF),
578 KEY_SPTLRPC_CONF, 0, NULL, NULL);
580 class_decref(obd, __FUNCTION__, obd);
581 spin_lock(&obd_dev_lock);
583 spin_unlock(&obd_dev_lock);
586 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
588 void obd_cleanup_caches(void)
593 if (obd_device_cachep) {
594 rc = cfs_mem_cache_destroy(obd_device_cachep);
595 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
596 obd_device_cachep = NULL;
599 rc = cfs_mem_cache_destroy(obdo_cachep);
600 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
604 rc = cfs_mem_cache_destroy(import_cachep);
605 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
606 import_cachep = NULL;
609 rc = cfs_mem_cache_destroy(capa_cachep);
610 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
616 int obd_init_caches(void)
620 LASSERT(obd_device_cachep == NULL);
621 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
622 sizeof(struct obd_device),
624 if (!obd_device_cachep)
627 LASSERT(obdo_cachep == NULL);
628 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
633 LASSERT(import_cachep == NULL);
634 import_cachep = cfs_mem_cache_create("ll_import_cache",
635 sizeof(struct obd_import),
640 LASSERT(capa_cachep == NULL);
641 capa_cachep = cfs_mem_cache_create("capa_cache",
642 sizeof(struct obd_capa), 0, 0);
648 obd_cleanup_caches();
653 /* map connection to client */
654 struct obd_export *class_conn2export(struct lustre_handle *conn)
656 struct obd_export *export;
660 CDEBUG(D_CACHE, "looking for null handle\n");
664 if (conn->cookie == -1) { /* this means assign a new connection */
665 CDEBUG(D_CACHE, "want a new connection\n");
669 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
670 export = class_handle2object(conn->cookie);
674 struct obd_device *class_exp2obd(struct obd_export *exp)
681 struct obd_device *class_conn2obd(struct lustre_handle *conn)
683 struct obd_export *export;
684 export = class_conn2export(conn);
686 struct obd_device *obd = export->exp_obd;
687 class_export_put(export);
693 struct obd_import *class_exp2cliimp(struct obd_export *exp)
695 struct obd_device *obd = exp->exp_obd;
698 return obd->u.cli.cl_import;
701 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
703 struct obd_device *obd = class_conn2obd(conn);
706 return obd->u.cli.cl_import;
709 /* Export management functions */
710 static void class_export_destroy(struct obd_export *exp)
712 struct obd_device *obd = exp->exp_obd;
715 LASSERT (atomic_read(&exp->exp_refcount) == 0);
717 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
718 exp->exp_client_uuid.uuid, obd->obd_name);
720 LASSERT(obd != NULL);
722 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
723 if (exp->exp_connection)
724 ptlrpc_put_connection_superhack(exp->exp_connection);
726 LASSERT(list_empty(&exp->exp_outstanding_replies));
727 LASSERT(list_empty(&exp->exp_uncommitted_replies));
728 LASSERT(list_empty(&exp->exp_req_replay_queue));
729 LASSERT(list_empty(&exp->exp_queued_rpc));
730 obd_destroy_export(exp);
731 class_decref(obd, "export", exp);
733 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
737 static void export_handle_addref(void *export)
739 class_export_get(export);
742 struct obd_export *class_export_get(struct obd_export *exp)
744 atomic_inc(&exp->exp_refcount);
745 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
746 atomic_read(&exp->exp_refcount));
749 EXPORT_SYMBOL(class_export_get);
751 void class_export_put(struct obd_export *exp)
753 LASSERT(exp != NULL);
754 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
755 atomic_read(&exp->exp_refcount) - 1);
756 LASSERT(atomic_read(&exp->exp_refcount) > 0);
757 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
759 if (atomic_dec_and_test(&exp->exp_refcount)) {
760 LASSERT(!list_empty(&exp->exp_obd_chain));
761 CDEBUG(D_IOCTL, "final put %p/%s\n",
762 exp, exp->exp_client_uuid.uuid);
763 obd_zombie_export_add(exp);
766 EXPORT_SYMBOL(class_export_put);
768 /* Creates a new export, adds it to the hash table, and returns a
769 * pointer to it. The refcount is 2: one for the hash reference, and
770 * one for the pointer returned by this function. */
771 struct obd_export *class_new_export(struct obd_device *obd,
772 struct obd_uuid *cluuid)
774 struct obd_export *export;
778 OBD_ALLOC_PTR(export);
780 return ERR_PTR(-ENOMEM);
782 export->exp_conn_cnt = 0;
783 export->exp_lock_hash = NULL;
784 atomic_set(&export->exp_refcount, 2);
785 atomic_set(&export->exp_rpc_count, 0);
786 atomic_set(&export->exp_cb_count, 0);
787 atomic_set(&export->exp_locks_count, 0);
788 atomic_set(&export->exp_replay_count, 0);
789 export->exp_obd = obd;
790 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
791 spin_lock_init(&export->exp_uncommitted_replies_lock);
792 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
793 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
794 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
795 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
796 class_handle_hash(&export->exp_handle, export_handle_addref);
797 export->exp_last_request_time = cfs_time_current_sec();
798 spin_lock_init(&export->exp_lock);
799 INIT_HLIST_NODE(&export->exp_uuid_hash);
800 INIT_HLIST_NODE(&export->exp_nid_hash);
802 export->exp_sp_peer = LUSTRE_SP_ANY;
803 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
804 export->exp_client_uuid = *cluuid;
805 obd_init_export(export);
807 spin_lock(&obd->obd_dev_lock);
808 /* shouldn't happen, but might race */
809 if (obd->obd_stopping)
810 GOTO(exit_err, rc = -ENODEV);
812 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
813 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
814 &export->exp_uuid_hash);
816 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
817 obd->obd_name, cluuid->uuid, rc);
818 GOTO(exit_err, rc = -EALREADY);
822 class_incref(obd, "export", export);
823 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
824 list_add_tail(&export->exp_obd_chain_timed,
825 &export->exp_obd->obd_exports_timed);
826 export->exp_obd->obd_num_exports++;
827 spin_unlock(&obd->obd_dev_lock);
831 spin_unlock(&obd->obd_dev_lock);
832 class_handle_unhash(&export->exp_handle);
833 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
834 obd_destroy_export(export);
835 OBD_FREE_PTR(export);
838 EXPORT_SYMBOL(class_new_export);
840 void class_unlink_export(struct obd_export *exp)
842 class_handle_unhash(&exp->exp_handle);
844 spin_lock(&exp->exp_obd->obd_dev_lock);
845 /* delete an uuid-export hashitem from hashtables */
846 if (!hlist_unhashed(&exp->exp_uuid_hash))
847 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
848 &exp->exp_client_uuid,
849 &exp->exp_uuid_hash);
851 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
852 list_del_init(&exp->exp_obd_chain_timed);
853 exp->exp_obd->obd_num_exports--;
854 spin_unlock(&exp->exp_obd->obd_dev_lock);
855 class_export_put(exp);
857 EXPORT_SYMBOL(class_unlink_export);
859 /* Import management functions */
860 void class_import_destroy(struct obd_import *imp)
864 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
865 imp->imp_obd->obd_name);
867 LASSERT(atomic_read(&imp->imp_refcount) == 0);
869 ptlrpc_put_connection_superhack(imp->imp_connection);
871 while (!list_empty(&imp->imp_conn_list)) {
872 struct obd_import_conn *imp_conn;
874 imp_conn = list_entry(imp->imp_conn_list.next,
875 struct obd_import_conn, oic_item);
876 list_del_init(&imp_conn->oic_item);
877 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
878 OBD_FREE(imp_conn, sizeof(*imp_conn));
881 LASSERT(imp->imp_sec == NULL);
882 class_decref(imp->imp_obd, "import", imp);
883 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
887 static void import_handle_addref(void *import)
889 class_import_get(import);
892 struct obd_import *class_import_get(struct obd_import *import)
894 LASSERT(atomic_read(&import->imp_refcount) >= 0);
895 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
896 atomic_inc(&import->imp_refcount);
897 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
898 atomic_read(&import->imp_refcount),
899 import->imp_obd->obd_name);
902 EXPORT_SYMBOL(class_import_get);
904 void class_import_put(struct obd_import *imp)
908 LASSERT(atomic_read(&imp->imp_refcount) > 0);
909 LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
910 LASSERT(list_empty(&imp->imp_zombie_chain));
912 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
913 atomic_read(&imp->imp_refcount) - 1,
914 imp->imp_obd->obd_name);
916 if (atomic_dec_and_test(&imp->imp_refcount)) {
917 CDEBUG(D_INFO, "final put import %p\n", imp);
918 obd_zombie_import_add(imp);
923 EXPORT_SYMBOL(class_import_put);
925 static void init_imp_at(struct imp_at *at) {
927 at_init(&at->iat_net_latency, 0, 0);
928 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
929 /* max service estimates are tracked on the server side, so
930 don't use the AT history here, just use the last reported
931 val. (But keep hist for proc histogram, worst_ever) */
932 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
937 struct obd_import *class_new_import(struct obd_device *obd)
939 struct obd_import *imp;
941 OBD_ALLOC(imp, sizeof(*imp));
945 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
946 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
947 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
948 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
949 spin_lock_init(&imp->imp_lock);
950 imp->imp_last_success_conn = 0;
951 imp->imp_state = LUSTRE_IMP_NEW;
952 imp->imp_obd = class_incref(obd, "import", imp);
953 sema_init(&imp->imp_sec_mutex, 1);
954 cfs_waitq_init(&imp->imp_recovery_waitq);
956 atomic_set(&imp->imp_refcount, 2);
957 atomic_set(&imp->imp_unregistering, 0);
958 atomic_set(&imp->imp_inflight, 0);
959 atomic_set(&imp->imp_replay_inflight, 0);
960 atomic_set(&imp->imp_inval_count, 0);
961 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
962 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
963 class_handle_hash(&imp->imp_handle, import_handle_addref);
964 init_imp_at(&imp->imp_at);
966 /* the default magic is V2, will be used in connect RPC, and
967 * then adjusted according to the flags in request/reply. */
968 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
972 EXPORT_SYMBOL(class_new_import);
974 void class_destroy_import(struct obd_import *import)
976 LASSERT(import != NULL);
977 LASSERT(import != LP_POISON);
979 class_handle_unhash(&import->imp_handle);
981 spin_lock(&import->imp_lock);
982 import->imp_generation++;
983 spin_unlock(&import->imp_lock);
984 class_import_put(import);
986 EXPORT_SYMBOL(class_destroy_import);
988 /* A connection defines an export context in which preallocation can
989 be managed. This releases the export pointer reference, and returns
990 the export handle, so the export refcount is 1 when this function
992 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
993 struct obd_uuid *cluuid)
995 struct obd_export *export;
996 LASSERT(conn != NULL);
997 LASSERT(obd != NULL);
998 LASSERT(cluuid != NULL);
1001 export = class_new_export(obd, cluuid);
1003 RETURN(PTR_ERR(export));
1005 conn->cookie = export->exp_handle.h_cookie;
1006 class_export_put(export);
1008 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1009 cluuid->uuid, conn->cookie);
1012 EXPORT_SYMBOL(class_connect);
1014 /* if export is involved in recovery then clean up related things */
1015 void class_export_recovery_cleanup(struct obd_export *exp)
1017 struct obd_device *obd = exp->exp_obd;
1019 spin_lock_bh(&obd->obd_processing_task_lock);
1020 if (exp->exp_delayed)
1021 obd->obd_delayed_clients--;
1022 if (obd->obd_recovering && exp->exp_in_recovery) {
1023 spin_lock(&exp->exp_lock);
1024 exp->exp_in_recovery = 0;
1025 spin_unlock(&exp->exp_lock);
1026 LASSERT(obd->obd_connected_clients);
1027 obd->obd_connected_clients--;
1029 /** Cleanup req replay fields */
1030 if (exp->exp_req_replay_needed) {
1031 spin_lock(&exp->exp_lock);
1032 exp->exp_req_replay_needed = 0;
1033 spin_unlock(&exp->exp_lock);
1034 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1035 atomic_dec(&obd->obd_req_replay_clients);
1037 /** Cleanup lock replay data */
1038 if (exp->exp_lock_replay_needed) {
1039 spin_lock(&exp->exp_lock);
1040 exp->exp_lock_replay_needed = 0;
1041 spin_unlock(&exp->exp_lock);
1042 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1043 atomic_dec(&obd->obd_lock_replay_clients);
1045 spin_unlock_bh(&obd->obd_processing_task_lock);
1048 /* This function removes 1-3 references from the export:
1049 * 1 - for export pointer passed
1050 * and if disconnect really need
1051 * 2 - removing from hash
1052 * 3 - in client_unlink_export
1053 * The export pointer passed to this function can destroyed */
1054 int class_disconnect(struct obd_export *export)
1056 int already_disconnected;
1059 if (export == NULL) {
1061 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1065 spin_lock(&export->exp_lock);
1066 already_disconnected = export->exp_disconnected;
1067 export->exp_disconnected = 1;
1068 spin_unlock(&export->exp_lock);
1070 /* class_cleanup(), abort_recovery(), and class_fail_export()
1071 * all end up in here, and if any of them race we shouldn't
1072 * call extra class_export_puts(). */
1073 if (already_disconnected) {
1074 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1075 GOTO(no_disconn, already_disconnected);
1078 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1079 export->exp_handle.h_cookie);
1081 if (!hlist_unhashed(&export->exp_nid_hash))
1082 lustre_hash_del(export->exp_obd->obd_nid_hash,
1083 &export->exp_connection->c_peer.nid,
1084 &export->exp_nid_hash);
1086 class_export_recovery_cleanup(export);
1087 class_unlink_export(export);
1089 class_export_put(export);
1093 static void class_disconnect_export_list(struct list_head *list,
1094 enum obd_option flags)
1097 struct obd_export *exp;
1100 /* It's possible that an export may disconnect itself, but
1101 * nothing else will be added to this list. */
1102 while (!list_empty(list)) {
1103 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1104 /* need for safe call CDEBUG after obd_disconnect */
1105 class_export_get(exp);
1107 spin_lock(&exp->exp_lock);
1108 exp->exp_flags = flags;
1109 spin_unlock(&exp->exp_lock);
1111 if (obd_uuid_equals(&exp->exp_client_uuid,
1112 &exp->exp_obd->obd_uuid)) {
1114 "exp %p export uuid == obd uuid, don't discon\n",
1116 /* Need to delete this now so we don't end up pointing
1117 * to work_list later when this export is cleaned up. */
1118 list_del_init(&exp->exp_obd_chain);
1119 class_export_put(exp);
1123 class_export_get(exp);
1124 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1125 "last request at "CFS_TIME_T"\n",
1126 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1127 exp, exp->exp_last_request_time);
1128 /* release one export reference anyway */
1129 rc = obd_disconnect(exp);
1131 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1132 obd_export_nid2str(exp), exp, rc);
1133 class_export_put(exp);
1138 void class_disconnect_exports(struct obd_device *obd)
1140 struct list_head work_list;
1143 /* Move all of the exports from obd_exports to a work list, en masse. */
1144 CFS_INIT_LIST_HEAD(&work_list);
1145 spin_lock(&obd->obd_dev_lock);
1146 list_splice_init(&obd->obd_exports, &work_list);
1147 list_splice_init(&obd->obd_delayed_exports, &work_list);
1148 spin_unlock(&obd->obd_dev_lock);
1150 if (!list_empty(&work_list)) {
1151 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1152 "disconnecting them\n", obd->obd_minor, obd);
1153 class_disconnect_export_list(&work_list,
1154 exp_flags_from_obd(obd));
1156 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1157 obd->obd_minor, obd);
1160 EXPORT_SYMBOL(class_disconnect_exports);
1162 /* Remove exports that have not completed recovery.
1164 void class_disconnect_stale_exports(struct obd_device *obd,
1165 int (*test_export)(struct obd_export *))
1167 struct list_head work_list;
1168 struct list_head *pos, *n;
1169 struct obd_export *exp;
1173 CFS_INIT_LIST_HEAD(&work_list);
1174 spin_lock(&obd->obd_dev_lock);
1175 list_for_each_safe(pos, n, &obd->obd_exports) {
1176 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1177 if (test_export(exp))
1180 /* don't count self-export as client */
1181 if (obd_uuid_equals(&exp->exp_client_uuid,
1182 &exp->exp_obd->obd_uuid))
1185 list_move(&exp->exp_obd_chain, &work_list);
1187 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1188 obd->obd_name, exp->exp_client_uuid.uuid,
1189 exp->exp_connection == NULL ? "<unknown>" :
1190 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1191 print_export_data(exp, "EVICTING");
1193 spin_unlock(&obd->obd_dev_lock);
1196 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1197 obd->obd_name, evicted);
1198 obd->obd_stale_clients += evicted;
1200 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1201 OBD_OPT_ABORT_RECOV);
1204 EXPORT_SYMBOL(class_disconnect_stale_exports);
1206 void class_fail_export(struct obd_export *exp)
1208 int rc, already_failed;
1210 spin_lock(&exp->exp_lock);
1211 already_failed = exp->exp_failed;
1212 exp->exp_failed = 1;
1213 spin_unlock(&exp->exp_lock);
1215 if (already_failed) {
1216 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1217 exp, exp->exp_client_uuid.uuid);
1221 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1222 exp, exp->exp_client_uuid.uuid);
1224 if (obd_dump_on_timeout)
1225 libcfs_debug_dumplog();
1227 /* Most callers into obd_disconnect are removing their own reference
1228 * (request, for example) in addition to the one from the hash table.
1229 * We don't have such a reference here, so make one. */
1230 class_export_get(exp);
1231 rc = obd_disconnect(exp);
1233 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1235 CDEBUG(D_HA, "disconnected export %p/%s\n",
1236 exp, exp->exp_client_uuid.uuid);
1238 EXPORT_SYMBOL(class_fail_export);
1240 char *obd_export_nid2str(struct obd_export *exp)
1242 if (exp->exp_connection != NULL)
1243 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1247 EXPORT_SYMBOL(obd_export_nid2str);
1249 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1251 struct obd_export *doomed_exp = NULL;
1252 int exports_evicted = 0;
1254 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1257 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1258 if (doomed_exp == NULL)
1261 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1262 "nid %s found, wanted nid %s, requested nid %s\n",
1263 obd_export_nid2str(doomed_exp),
1264 libcfs_nid2str(nid_key), nid);
1265 LASSERTF(doomed_exp != obd->obd_self_export,
1266 "self-export is hashed by NID?\n");
1268 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1269 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1271 class_fail_export(doomed_exp);
1272 class_export_put(doomed_exp);
1275 if (!exports_evicted)
1276 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1277 obd->obd_name, nid);
1278 return exports_evicted;
1280 EXPORT_SYMBOL(obd_export_evict_by_nid);
1282 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1284 struct obd_export *doomed_exp = NULL;
1285 struct obd_uuid doomed_uuid;
1286 int exports_evicted = 0;
1288 obd_str2uuid(&doomed_uuid, uuid);
1289 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1290 CERROR("%s: can't evict myself\n", obd->obd_name);
1291 return exports_evicted;
1294 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1296 if (doomed_exp == NULL) {
1297 CERROR("%s: can't disconnect %s: no exports found\n",
1298 obd->obd_name, uuid);
1300 CWARN("%s: evicting %s at adminstrative request\n",
1301 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1302 class_fail_export(doomed_exp);
1303 class_export_put(doomed_exp);
1307 return exports_evicted;
1309 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1311 static void print_export_data(struct obd_export *exp, const char *status)
1313 struct ptlrpc_reply_state *rs;
1314 struct ptlrpc_reply_state *first_reply = NULL;
1317 spin_lock(&exp->exp_lock);
1318 list_for_each_entry (rs, &exp->exp_outstanding_replies, rs_exp_list) {
1323 spin_unlock(&exp->exp_lock);
1325 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1326 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1327 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1328 atomic_read(&exp->exp_rpc_count),
1329 atomic_read(&exp->exp_cb_count),
1330 atomic_read(&exp->exp_locks_count),
1331 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1332 nreplies, first_reply, nreplies > 3 ? "..." : "",
1333 exp->exp_last_committed);
1336 void dump_exports(struct obd_device *obd)
1338 struct obd_export *exp;
1340 spin_lock(&obd->obd_dev_lock);
1341 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1342 print_export_data(exp, "ACTIVE");
1343 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1344 print_export_data(exp, "UNLINKED");
1345 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1346 print_export_data(exp, "DELAYED");
1347 spin_unlock(&obd->obd_dev_lock);
1348 spin_lock(&obd_zombie_impexp_lock);
1349 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1350 print_export_data(exp, "ZOMBIE");
1351 spin_unlock(&obd_zombie_impexp_lock);
1353 EXPORT_SYMBOL(dump_exports);
1355 void obd_exports_barrier(struct obd_device *obd)
1358 LASSERT(list_empty(&obd->obd_exports));
1359 spin_lock(&obd->obd_dev_lock);
1360 while (!list_empty(&obd->obd_unlinked_exports)) {
1361 spin_unlock(&obd->obd_dev_lock);
1362 cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
1363 if (waited > 5 && IS_PO2(waited)) {
1364 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1365 "more than %d seconds. "
1366 "The obd refcount = %d. Is it stuck?\n",
1367 obd->obd_name, waited,
1368 atomic_read(&obd->obd_refcount));
1372 spin_lock(&obd->obd_dev_lock);
1374 spin_unlock(&obd->obd_dev_lock);
1376 EXPORT_SYMBOL(obd_exports_barrier);
1379 * kill zombie imports and exports
1381 void obd_zombie_impexp_cull(void)
1383 struct obd_import *import;
1384 struct obd_export *export;
1388 spin_lock(&obd_zombie_impexp_lock);
1391 if (!list_empty(&obd_zombie_imports)) {
1392 import = list_entry(obd_zombie_imports.next,
1395 list_del_init(&import->imp_zombie_chain);
1399 if (!list_empty(&obd_zombie_exports)) {
1400 export = list_entry(obd_zombie_exports.next,
1403 list_del_init(&export->exp_obd_chain);
1406 spin_unlock(&obd_zombie_impexp_lock);
1409 class_import_destroy(import);
1412 class_export_destroy(export);
1414 } while (import != NULL || export != NULL);
1418 static struct completion obd_zombie_start;
1419 static struct completion obd_zombie_stop;
1420 static unsigned long obd_zombie_flags;
1421 static cfs_waitq_t obd_zombie_waitq;
1422 static pid_t obd_zombie_pid;
1425 OBD_ZOMBIE_STOP = 1 << 1
1429 * check for work for kill zombie import/export thread.
1431 static int obd_zombie_impexp_check(void *arg)
1435 spin_lock(&obd_zombie_impexp_lock);
1436 rc = list_empty(&obd_zombie_imports) &&
1437 list_empty(&obd_zombie_exports) &&
1438 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1440 spin_unlock(&obd_zombie_impexp_lock);
1446 * Add export to the obd_zombe thread and notify it.
1448 static void obd_zombie_export_add(struct obd_export *exp) {
1449 spin_lock(&exp->exp_obd->obd_dev_lock);
1450 LASSERT(!list_empty(&exp->exp_obd_chain));
1451 list_del_init(&exp->exp_obd_chain);
1452 spin_unlock(&exp->exp_obd->obd_dev_lock);
1453 spin_lock(&obd_zombie_impexp_lock);
1454 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1455 spin_unlock(&obd_zombie_impexp_lock);
1457 if (obd_zombie_impexp_notify != NULL)
1458 obd_zombie_impexp_notify();
1462 * Add import to the obd_zombe thread and notify it.
1464 static void obd_zombie_import_add(struct obd_import *imp) {
1465 LASSERT(imp->imp_sec == NULL);
1466 spin_lock(&obd_zombie_impexp_lock);
1467 LASSERT(list_empty(&imp->imp_zombie_chain));
1468 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1469 spin_unlock(&obd_zombie_impexp_lock);
1471 if (obd_zombie_impexp_notify != NULL)
1472 obd_zombie_impexp_notify();
1476 * notify import/export destroy thread about new zombie.
1478 static void obd_zombie_impexp_notify(void)
1480 cfs_waitq_signal(&obd_zombie_waitq);
1484 * check whether obd_zombie is idle
1486 static int obd_zombie_is_idle(void)
1490 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1491 spin_lock(&obd_zombie_impexp_lock);
1492 rc = list_empty(&obd_zombie_imports) &&
1493 list_empty(&obd_zombie_exports);
1494 spin_unlock(&obd_zombie_impexp_lock);
1499 * wait when obd_zombie import/export queues become empty
1501 void obd_zombie_barrier(void)
1503 struct l_wait_info lwi = { 0 };
1505 if (obd_zombie_pid == cfs_curproc_pid())
1506 /* don't wait for myself */
1508 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1510 EXPORT_SYMBOL(obd_zombie_barrier);
1515 * destroy zombie export/import thread.
1517 static int obd_zombie_impexp_thread(void *unused)
1521 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1522 complete(&obd_zombie_start);
1526 complete(&obd_zombie_start);
1528 obd_zombie_pid = cfs_curproc_pid();
1530 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1531 struct l_wait_info lwi = { 0 };
1533 l_wait_event(obd_zombie_waitq,
1534 !obd_zombie_impexp_check(NULL), &lwi);
1535 obd_zombie_impexp_cull();
1538 * Notify obd_zombie_barrier callers that queues
1541 cfs_waitq_signal(&obd_zombie_waitq);
1544 complete(&obd_zombie_stop);
1549 #else /* ! KERNEL */
1551 static atomic_t zombie_recur = ATOMIC_INIT(0);
1552 static void *obd_zombie_impexp_work_cb;
1553 static void *obd_zombie_impexp_idle_cb;
1555 int obd_zombie_impexp_kill(void *arg)
1559 if (atomic_inc_return(&zombie_recur) == 1) {
1560 obd_zombie_impexp_cull();
1563 atomic_dec(&zombie_recur);
1570 * start destroy zombie import/export thread
1572 int obd_zombie_impexp_init(void)
1576 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1577 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1578 spin_lock_init(&obd_zombie_impexp_lock);
1579 init_completion(&obd_zombie_start);
1580 init_completion(&obd_zombie_stop);
1581 cfs_waitq_init(&obd_zombie_waitq);
1585 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1589 wait_for_completion(&obd_zombie_start);
1592 obd_zombie_impexp_work_cb =
1593 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1594 &obd_zombie_impexp_kill, NULL);
1596 obd_zombie_impexp_idle_cb =
1597 liblustre_register_idle_callback("obd_zombi_impexp_check",
1598 &obd_zombie_impexp_check, NULL);
1604 * stop destroy zombie import/export thread
1606 void obd_zombie_impexp_stop(void)
1608 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1609 obd_zombie_impexp_notify();
1611 wait_for_completion(&obd_zombie_stop);
1613 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1614 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);