1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
/* List of registered obd types; only declared (extern) here. */
51 extern struct list_head obd_types;
/* Held in this file while walking or modifying obd_types. */
52 spinlock_t obd_types_lock;
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
/* NOTE(review): judging by the names, these queue imports/exports for
 * deferred destruction together with the lock guarding them; their use is
 * not visible in this part of the file -- confirm. */
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of file-local (static) zombie helpers. */
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
/* Function pointer called by class_export_destroy() and
 * class_import_destroy() on the connections they hold. */
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep (CFS_ALLOC_IO) and stamp it
 * with OBD_DEVICE_MAGIC.
 * NOTE(review): the NULL check and the return statement are presumably on
 * lines not visible in this view -- confirm.
 * NOTE(review): the function is static yet passed to EXPORT_SYMBOL below;
 * confirm that is intended.
 */
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
82 EXPORT_SYMBOL(obd_device_alloc);
/*
 * Give an obd_device back to obd_device_cachep.
 * The magic is asserted first, a still-attached namespace is reported with
 * CERROR, and the obd_reference lu_ref is finalised before the free.
 */
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A namespace that is still set at this point is an error. */
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by name.
 * Walks obd_types under obd_types_lock and compares each typ_name with
 * strcmp(); the lock is dropped both on a match and after the walk.
 * NOTE(review): the return statements are not visible in this view; callers
 * in this file test the result for NULL.
 */
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
/*
 * Find an obd type by name and take a module reference on it.
 * The visible code also calls request_module() with the type name and
 * searches again when that succeeds (presumably only when the first lookup
 * found nothing -- the guarding condition is not visible in this view).
 * NOTE(review): the result of try_module_get() is ignored on the visible
 * line -- confirm a failed module get is acceptable here.
 */
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
121 const char *modname = name;
/* request_module() returning 0 is treated as "module loaded". */
122 if (!request_module(modname)) {
123 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124 type = class_search_type(name);
126 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* Pin the module that owns the type's device operations. */
132 spin_lock(&type->obd_type_lock);
134 try_module_get(type->typ_dt_ops->o_owner);
135 spin_unlock(&type->obd_type_lock);
/*
 * Drop the module reference on type->typ_dt_ops->o_owner while holding
 * type->obd_type_lock; counterpart of the try_module_get() in
 * class_get_type().
 */
140 void class_put_type(struct obd_type *type)
143 spin_lock(&type->obd_type_lock);
145 module_put(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
/* Upper bound for an obd type name, enforced by the LASSERT in
 * class_register_type(). */
149 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 * Allocates an obd_type, copies *dt_ops (and *md_ops, which is optional)
 * into freshly allocated ops structures, duplicates the name, calls
 * lprocfs_register() for it under proc_lustre_root, calls
 * lu_device_type_init() on \a ldt and links the type onto obd_types.
 * The tail of the function frees whatever was allocated.
 * NOTE(review): the label for that tail, the conditions that lead to it and
 * the return statements are not visible in this view.
 */
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152 struct lprocfs_vars *vars, const char *name,
153 struct lu_device_type *ldt)
155 struct obd_type *type;
/* Type names must be shorter than CLASS_MAX_NAME. */
160 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* A type of this name is already on obd_types. */
162 if (class_search_type(name)) {
163 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
168 OBD_ALLOC(type, sizeof(*type));
/* Allocate all three members first, then check them together. */
172 OBD_ALLOC_PTR(type->typ_dt_ops);
173 OBD_ALLOC_PTR(type->typ_md_ops);
174 OBD_ALLOC(type->typ_name, strlen(name) + 1);
176 if (type->typ_dt_ops == NULL ||
177 type->typ_md_ops == NULL ||
178 type->typ_name == NULL)
/* The type keeps private copies of the caller's operation tables. */
181 *(type->typ_dt_ops) = *dt_ops;
182 /* md_ops is optional */
184 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized strlen(name) + 1 above, so strcpy() fits. */
185 strcpy(type->typ_name, name);
186 spin_lock_init(&type->obd_type_lock);
189 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
191 if (IS_ERR(type->typ_procroot)) {
192 rc = PTR_ERR(type->typ_procroot);
193 type->typ_procroot = NULL;
199 rc = lu_device_type_init(ldt);
/* Publish the new type. */
204 spin_lock(&obd_types_lock);
205 list_add(&type->typ_chain, &obd_types);
206 spin_unlock(&obd_types_lock);
/* Cleanup: sizes passed to OBD_FREE mirror the allocations above. */
211 if (type->typ_name != NULL)
212 OBD_FREE(type->typ_name, strlen(name) + 1);
213 if (type->typ_md_ops != NULL)
214 OBD_FREE_PTR(type->typ_md_ops);
215 if (type->typ_dt_ops != NULL)
216 OBD_FREE_PTR(type->typ_dt_ops);
217 OBD_FREE(type, sizeof(*type));
/*
 * Unregister the obd type called \a name and free it.
 * On the normal path the procfs entry is removed, typ_lu is finalised, the
 * type is unlinked from obd_types under obd_types_lock, and the name, both
 * ops tables and the type itself are freed.
 * NOTE(review): the return statements are not visible in this view.
 */
221 int class_unregister_type(const char *name)
223 struct obd_type *type = class_search_type(name);
227 CERROR("unknown obd type\n");
231 if (type->typ_refcnt) {
232 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233 /* This is a bad situation, let's make the best of it */
234 /* Remove ops, but leave the name for debugging */
/* NOTE(review): the ops tables are freed while the type is still
 * referenced; if the hidden lines return from here, typ_dt_ops and
 * typ_md_ops are left pointing at freed memory and class_put_type()
 * would dereference typ_dt_ops -- confirm. */
235 OBD_FREE_PTR(type->typ_dt_ops);
236 OBD_FREE_PTR(type->typ_md_ops);
240 if (type->typ_procroot) {
241 lprocfs_remove(&type->typ_procroot);
245 lu_device_type_fini(type->typ_lu);
247 spin_lock(&obd_types_lock);
248 list_del(&type->typ_chain);
249 spin_unlock(&obd_types_lock);
/* The size is derived from the caller's name; it equals the stored name's
 * size as long as the type was found by an exact strcmp() match. */
250 OBD_FREE(type->typ_name, strlen(name) + 1);
251 if (type->typ_dt_ops != NULL)
252 OBD_FREE_PTR(type->typ_dt_ops);
253 if (type->typ_md_ops != NULL)
254 OBD_FREE_PTR(type->typ_md_ops);
255 OBD_FREE(type, sizeof(*type));
257 } /* class_unregister_type */
260 * Create a new obd device.
262 * Find an empty slot in ::obd_devs[], create a new obd device in it.
264 * \param type_name [in] obd device type string.
265 * \param name [in] obd device name.
267 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
/*
 * Allocate an obd device of type \a type_name, give it \a name and place it
 * in the first free slot of obd_devs[].
 * Failures visible here are reported as ERR_PTR values: -EINVAL (name too
 * long), -ENODEV (unknown type), -ENOMEM (allocation failed), -EEXIST
 * (name already in use) and -EOVERFLOW (no free slot).
 * NOTE(review): the final return statement is not visible in this view.
 */
270 struct obd_device *class_newdev(const char *type_name, const char *name)
272 struct obd_device *result = NULL;
273 struct obd_device *newdev;
274 struct obd_type *type = NULL;
276 int new_obd_minor = 0;
/* The name must fit, terminator included, in MAX_OBD_NAME bytes. */
278 if (strlen(name) >= MAX_OBD_NAME) {
279 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280 RETURN(ERR_PTR(-EINVAL));
283 type = class_get_type(type_name);
285 CERROR("OBD: unknown type: %s\n", type_name);
286 RETURN(ERR_PTR(-ENODEV));
289 newdev = obd_device_alloc();
/* Allocation failed: give back the type reference taken above. */
290 if (newdev == NULL) {
291 class_put_type(type);
292 RETURN(ERR_PTR(-ENOMEM));
294 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* Scan every slot under obd_dev_lock: claim the first empty one and keep
 * going to detect a name clash in a later slot. */
296 spin_lock(&obd_dev_lock);
297 for (i = 0; i < class_devno_max(); i++) {
298 struct obd_device *obd = class_num2obd(i);
299 if (obd && obd->obd_name &&
300 (strcmp(name, obd->obd_name) == 0)) {
301 CERROR("Device %s already exists, won't add\n", name);
/* Undo a slot claimed earlier in this scan (the condition guarding this
 * undo is on a line not visible in this view). */
303 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304 "%p obd_magic %08x != %08x\n", result,
305 result->obd_magic, OBD_DEVICE_MAGIC);
306 LASSERTF(result->obd_minor == new_obd_minor,
307 "%p obd_minor %d != %d\n", result,
308 result->obd_minor, new_obd_minor);
310 obd_devs[result->obd_minor] = NULL;
311 result->obd_name[0]='\0';
313 result = ERR_PTR(-EEXIST);
/* Nothing claimed yet and this slot is empty: fill it in. result is NULL
 * when this test passes, so it is presumably assigned on a hidden line. */
316 if (!result && !obd) {
318 result->obd_minor = i;
320 result->obd_type = type;
/* At most sizeof(obd_name) - 1 bytes are copied; strncpy() adds no
 * terminator of its own when the source is that long.
 * NOTE(review): presumably obd_name is MAX_OBD_NAME bytes and zero-filled
 * by the allocator -- confirm. */
321 strncpy(result->obd_name, name,
322 sizeof(result->obd_name) - 1);
323 obd_devs[i] = result;
326 spin_unlock(&obd_dev_lock);
/* The scan ran to the end without claiming a slot. */
328 if (result == NULL && i >= class_devno_max()) {
329 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
331 result = ERR_PTR(-EOVERFLOW);
/* On any error release both the new device and the type reference. */
334 if (IS_ERR(result)) {
335 obd_device_free(newdev);
336 class_put_type(type);
338 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339 result->obd_name, result);
/*
 * Remove \a obd from obd_devs[], free it and drop its type reference.
 * Asserts that the magic is intact, that the device really occupies the
 * slot named by obd_minor and that it has a type.
 */
344 void class_release_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type is put. */
346 struct obd_type *obd_type = obd->obd_type;
348 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352 LASSERT(obd_type != NULL);
354 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355 obd->obd_name,obd->obd_type->typ_name);
/* Clear the slot under obd_dev_lock, then free outside the lock. */
357 spin_lock(&obd_dev_lock);
358 obd_devs[obd->obd_minor] = NULL;
359 spin_unlock(&obd_dev_lock);
360 obd_device_free(obd);
362 class_put_type(obd_type);
/*
 * Search obd_devs[] under obd_dev_lock for a device named \a name.
 * A match is only acted on when obd_attached is set.
 * NOTE(review): the return statements are not visible in this view;
 * class_name2obd() treats a negative result as "not found".
 */
365 int class_name2dev(const char *name)
372 spin_lock(&obd_dev_lock);
373 for (i = 0; i < class_devno_max(); i++) {
374 struct obd_device *obd = class_num2obd(i);
375 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376 /* Make sure we finished attaching before we give
377 out any references */
378 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379 if (obd->obd_attached) {
380 spin_unlock(&obd_dev_lock);
386 spin_unlock(&obd_dev_lock);
391 struct obd_device *class_name2obd(const char *name)
393 int dev = class_name2dev(name);
395 if (dev < 0 || dev > class_devno_max())
397 return class_num2obd(dev);
/*
 * Search obd_devs[] under obd_dev_lock for a device whose obd_uuid equals
 * \a uuid (compared with obd_uuid_equals()).
 * NOTE(review): the return statements are not visible in this view.
 */
400 int class_uuid2dev(struct obd_uuid *uuid)
404 spin_lock(&obd_dev_lock);
405 for (i = 0; i < class_devno_max(); i++) {
406 struct obd_device *obd = class_num2obd(i);
407 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409 spin_unlock(&obd_dev_lock);
413 spin_unlock(&obd_dev_lock);
/*
 * Look up an obd device by uuid: resolve the index with class_uuid2dev()
 * and hand it to class_num2obd().
 * NOTE(review): any check of dev between the two visible lines is not
 * visible in this view.
 */
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
420 int dev = class_uuid2dev(uuid);
423 return class_num2obd(dev);
427 * Get obd device from ::obd_devs[]
429 * \param num [in] array index
431 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432 * otherwise return the obd device there.
/*
 * Fetch the device stored at index \a num, asserting that its magic is
 * intact and that its obd_minor equals \a num.
 * NOTE(review): the visible bound check only covers the upper limit; a
 * negative \a num is not rejected on the lines visible here -- confirm.
 */
434 struct obd_device *class_num2obd(int num)
436 struct obd_device *obd = NULL;
438 if (num < class_devno_max()) {
443 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444 "%p obd_magic %08x != %08x\n",
445 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446 LASSERTF(obd->obd_minor == num,
447 "%p obd_minor %0d != %0d\n",
448 obd, obd->obd_minor, num);
/*
 * Print one console line per device in obd_devs[]: index, status, type
 * name, device name, uuid and reference count. The whole walk runs under
 * obd_dev_lock.
 */
454 void class_obd_list(void)
459 spin_lock(&obd_dev_lock);
460 for (i = 0; i < class_devno_max(); i++) {
461 struct obd_device *obd = class_num2obd(i);
/* status is picked from the flags in this order of precedence: stopping,
 * set up, attached (the strings themselves are on hidden lines). */
464 if (obd->obd_stopping)
466 else if (obd->obd_set_up)
468 else if (obd->obd_attached)
472 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473 i, status, obd->obd_type->typ_name,
474 obd->obd_name, obd->obd_uuid.uuid,
475 atomic_read(&obd->obd_refcount));
477 spin_unlock(&obd_dev_lock);
481 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
482 specified, then only the client with that uuid is returned,
483 otherwise any client connected to the tgt is returned. */
/*
 * Scan obd_devs[] under obd_dev_lock for a client device of type
 * \a typ_name whose cl_target_uuid equals \a tgt_uuid; when \a grp_uuid is
 * non-NULL the device's own uuid must equal it as well.
 * NOTE(review): the return statements are not visible in this view.
 */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485 const char * typ_name,
486 struct obd_uuid *grp_uuid)
490 spin_lock(&obd_dev_lock);
491 for (i = 0; i < class_devno_max(); i++) {
492 struct obd_device *obd = class_num2obd(i);
/* Prefix match: only the first strlen(typ_name) bytes of the device's type
 * name are compared. */
495 if ((strncmp(obd->obd_type->typ_name, typ_name,
496 strlen(typ_name)) == 0)) {
497 if (obd_uuid_equals(tgt_uuid,
498 &obd->u.cli.cl_target_uuid) &&
499 ((grp_uuid)? obd_uuid_equals(grp_uuid,
500 &obd->obd_uuid) : 1)) {
501 spin_unlock(&obd_dev_lock);
506 spin_unlock(&obd_dev_lock);
511 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
512 searching at *next, and if a device is found, the next index to look
513 at is saved in *next. If next is NULL, then the first matching device
514 will always be returned. */
/*
 * Scan obd_devs[] under obd_dev_lock, starting at index i, for a device
 * whose obd_uuid equals \a grp_uuid.
 * NOTE(review): how i is initialised from *next, how *next is updated and
 * what is returned are on lines not visible in this view.
 */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* *next is only honoured when it is a valid index. */
521 else if (*next >= 0 && *next < class_devno_max())
526 spin_lock(&obd_dev_lock);
527 for (; i < class_devno_max(); i++) {
528 struct obd_device *obd = class_num2obd(i);
531 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
534 spin_unlock(&obd_dev_lock);
538 spin_unlock(&obd_dev_lock);
544 * Notify that the sptlrpc log for @fsname has changed; let every relevant OBD
545 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to the self export of
 * every device that is set up, not stopping, of type MDC/OSC/MDT/OST and
 * whose name matches \a fsname over its first \a namelen bytes.
 * NOTE(review): how the per-device results (rc2) are combined and what is
 * returned are on lines not visible in this view.
 */
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
549 struct obd_device *obd;
553 LASSERT(namelen > 0);
555 spin_lock(&obd_dev_lock);
556 for (i = 0; i < class_devno_max(); i++) {
557 obd = class_num2obd(i);
559 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
562 /* only notify mdc, osc, mdt, ost */
563 type = obd->obd_type->typ_name;
564 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Compare the first namelen bytes of the device name with fsname. */
570 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the device so obd_dev_lock can be dropped around the call, then
 * retake the lock before continuing the scan. */
573 class_incref(obd, __FUNCTION__, obd);
574 spin_unlock(&obd_dev_lock);
575 rc2 = obd_set_info_async(obd->obd_self_export,
576 sizeof(KEY_SPTLRPC_CONF),
577 KEY_SPTLRPC_CONF, 0, NULL, NULL);
579 class_decref(obd, __FUNCTION__, obd);
580 spin_lock(&obd_dev_lock);
582 spin_unlock(&obd_dev_lock);
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches used by this file (obd_device, obdo, import and
 * capa), asserting that every cfs_mem_cache_destroy() returns 0.
 * NOTE(review): the assertion messages contain typos ("destropy",
 * "destory"); they are runtime strings and are left untouched here.
 * NOTE(review): the guards around the obdo/import/capa destroys are on
 * lines not visible in this view.
 */
587 void obd_cleanup_caches(void)
592 if (obd_device_cachep) {
593 rc = cfs_mem_cache_destroy(obd_device_cachep);
594 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
/* Reset so that obd_init_caches() can assert the pointer is NULL. */
595 obd_device_cachep = NULL;
598 rc = cfs_mem_cache_destroy(obdo_cachep);
599 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
603 rc = cfs_mem_cache_destroy(import_cachep);
604 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605 import_cachep = NULL;
608 rc = cfs_mem_cache_destroy(capa_cachep);
609 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the slab caches for obd_device, obdo, obd_import and obd_capa
 * objects. Each cache pointer is asserted to be NULL before it is created.
 * The trailing obd_cleanup_caches() call tears everything down again.
 * NOTE(review): the label in front of that call, the jumps to it and the
 * return values are on lines not visible in this view.
 */
615 int obd_init_caches(void)
619 LASSERT(obd_device_cachep == NULL);
620 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621 sizeof(struct obd_device),
623 if (!obd_device_cachep)
626 LASSERT(obdo_cachep == NULL);
627 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
632 LASSERT(import_cachep == NULL);
633 import_cachep = cfs_mem_cache_create("ll_import_cache",
634 sizeof(struct obd_import),
639 LASSERT(capa_cachep == NULL);
640 capa_cachep = cfs_mem_cache_create("capa_cache",
641 sizeof(struct obd_capa), 0, 0);
647 obd_cleanup_caches();
652 /* map connection to client */
/*
 * Translate a connection handle into its export by looking conn->cookie up
 * with class_handle2object().
 * Two cases are logged before the lookup: a "null handle" and a cookie of
 * -1, which the code treats as a request for a new connection.
 * NOTE(review): the conditions for the first case and all return statements
 * are on lines not visible in this view.
 */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
655 struct obd_export *export;
659 CDEBUG(D_CACHE, "looking for null handle\n");
663 if (conn->cookie == -1) { /* this means assign a new connection */
664 CDEBUG(D_CACHE, "want a new connection\n");
668 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669 export = class_handle2object(conn->cookie);
/* NOTE(review): judging by its name and signature this maps an export to
 * its obd device; the body is not visible in this view. */
673 struct obd_device *class_exp2obd(struct obd_export *exp)
/*
 * Resolve a connection handle to the obd device of its export.
 * The export obtained from class_conn2export() is released again with
 * class_export_put() once exp_obd has been read.
 * NOTE(review): the NULL check on export and the return statements are on
 * lines not visible in this view.
 */
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
682 struct obd_export *export;
683 export = class_conn2export(conn);
685 struct obd_device *obd = export->exp_obd;
686 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * NOTE(review): any NULL check on obd sits on lines not visible here.
 */
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
694 struct obd_device *obd = exp->exp_obd;
697 return obd->u.cli.cl_import;
/*
 * Return the client import (u.cli.cl_import) of the device that
 * class_conn2obd() resolves \a conn to.
 * NOTE(review): any NULL check on obd sits on lines not visible here.
 */
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
702 struct obd_device *obd = class_conn2obd(conn);
705 return obd->u.cli.cl_import;
708 /* Export management functions */
/*
 * Final teardown of an export whose reference count has reached zero:
 * releases its connection (if any), asserts that all reply/request lists
 * are empty, calls obd_destroy_export(), drops the "export" reference on
 * the device and frees the export with OBD_FREE_RCU().
 */
709 static void class_export_destroy(struct obd_export *exp)
711 struct obd_device *obd = exp->exp_obd;
714 LASSERT (atomic_read(&exp->exp_refcount) == 0);
/* NOTE(review): obd->obd_name is dereferenced here, before the
 * LASSERT(obd != NULL) below -- the assertion cannot catch a NULL obd. */
716 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717 exp->exp_client_uuid.uuid, obd->obd_name);
719 LASSERT(obd != NULL);
721 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722 if (exp->exp_connection)
723 ptlrpc_put_connection_superhack(exp->exp_connection);
725 LASSERT(list_empty(&exp->exp_outstanding_replies));
726 LASSERT(list_empty(&exp->exp_uncommitted_replies));
727 LASSERT(list_empty(&exp->exp_req_replay_queue));
728 LASSERT(list_empty(&exp->exp_queued_rpc));
729 obd_destroy_export(exp);
/* Pairs with class_incref(obd, "export", export) in class_new_export(). */
730 class_decref(obd, "export", exp);
732 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-layer callback (registered through class_handle_hash() in
 * class_new_export()) that takes an export reference. */
736 static void export_handle_addref(void *export)
738 class_export_get(export);
/*
 * Take a reference on \a exp: increments exp_refcount and logs the new
 * value.
 * NOTE(review): the return statement is not visible in this view.
 */
741 struct obd_export *class_export_get(struct obd_export *exp)
743 atomic_inc(&exp->exp_refcount);
744 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
745 atomic_read(&exp->exp_refcount));
748 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp. When the count reaches zero the export is
 * handed to obd_zombie_export_add() rather than being destroyed inline.
 */
750 void class_export_put(struct obd_export *exp)
752 LASSERT(exp != NULL);
753 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
754 atomic_read(&exp->exp_refcount) - 1);
/* Sanity bounds on the counter.
 * NOTE(review): 0x5a5a5a looks like a guard against a poisoned (freed)
 * structure -- confirm. */
755 LASSERT(atomic_read(&exp->exp_refcount) > 0);
756 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
758 if (atomic_dec_and_test(&exp->exp_refcount)) {
/* The export must still be linked on a device list at its final put. */
759 LASSERT(!list_empty(&exp->exp_obd_chain));
760 CDEBUG(D_IOCTL, "final put %p/%s\n",
761 exp, exp->exp_client_uuid.uuid);
762 obd_zombie_export_add(exp);
765 EXPORT_SYMBOL(class_export_put);
767 /* Creates a new export, adds it to the hash table, and returns a
768 * pointer to it. The refcount is 2: one for the hash reference, and
769 * one for the pointer returned by this function. */
/*
 * Allocate and initialise an export of \a obd for client \a cluuid, hash
 * its handle, add it to the device's uuid hash (unless the client uuid is
 * the device's own uuid) and link it onto the device's export lists.
 * Visible failure results: ERR_PTR(-ENOMEM) when the allocation fails and
 * ERR_PTR(-EALREADY) when the uuid hash rejects a duplicate.
 * NOTE(review): the success return is on a line not visible in this view.
 */
770 struct obd_export *class_new_export(struct obd_device *obd,
771 struct obd_uuid *cluuid)
773 struct obd_export *export;
776 OBD_ALLOC_PTR(export);
778 return ERR_PTR(-ENOMEM);
780 export->exp_conn_cnt = 0;
781 export->exp_lock_hash = NULL;
/* Starts at 2; see the comment in front of this function. */
782 atomic_set(&export->exp_refcount, 2);
783 atomic_set(&export->exp_rpc_count, 0);
784 atomic_set(&export->exp_cb_count, 0);
785 export->exp_obd = obd;
786 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
787 spin_lock_init(&export->exp_uncommitted_replies_lock);
788 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
789 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
790 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
791 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
/* Register the handle; export_handle_addref is its add-reference hook. */
792 class_handle_hash(&export->exp_handle, export_handle_addref);
793 export->exp_last_request_time = cfs_time_current_sec();
794 spin_lock_init(&export->exp_lock);
795 INIT_HLIST_NODE(&export->exp_uuid_hash);
796 INIT_HLIST_NODE(&export->exp_nid_hash);
798 export->exp_sp_peer = LUSTRE_SP_ANY;
799 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
800 export->exp_client_uuid = *cluuid;
801 obd_init_export(export);
803 spin_lock(&obd->obd_dev_lock);
/* An export whose client uuid equals the device's own uuid is not put in
 * the uuid hash. */
804 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
805 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
806 &export->exp_uuid_hash);
808 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
809 obd->obd_name, cluuid->uuid, rc);
/* Duplicate: drop the lock, unhash the handle and free the export.
 * NOTE(review): obd_init_export() ran above, but no matching
 * obd_destroy_export() is visible on this path -- confirm. */
810 spin_unlock(&obd->obd_dev_lock);
811 class_handle_unhash(&export->exp_handle);
812 OBD_FREE_PTR(export);
813 return ERR_PTR(-EALREADY);
817 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
/* The export pins its device; released in class_export_destroy(). */
818 class_incref(obd, "export", export);
819 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
820 list_add_tail(&export->exp_obd_chain_timed,
821 &export->exp_obd->obd_exports_timed);
822 export->exp_obd->obd_num_exports++;
823 spin_unlock(&obd->obd_dev_lock);
827 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its device: unhash its handle, remove it from the
 * uuid hash if present, move it to obd_unlinked_exports, take it off the
 * timed list, adjust the device's client counters and finally drop one
 * export reference.
 */
829 void class_unlink_export(struct obd_export *exp)
831 class_handle_unhash(&exp->exp_handle);
833 spin_lock(&exp->exp_obd->obd_dev_lock);
834 /* delete an uuid-export hashitem from hashtables */
835 if (!hlist_unhashed(&exp->exp_uuid_hash))
836 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
837 &exp->exp_client_uuid,
838 &exp->exp_uuid_hash);
840 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
841 list_del_init(&exp->exp_obd_chain_timed);
842 exp->exp_obd->obd_num_exports--;
843 spin_unlock(&exp->exp_obd->obd_dev_lock);
845 /* Keep these counter valid always */
/* Exactly one counter is decremented, chosen in this order: delayed,
 * in recovery, or (while the device is recovering) max recoverable. */
846 spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
847 if (exp->exp_delayed)
848 exp->exp_obd->obd_delayed_clients--;
849 else if (exp->exp_in_recovery)
850 exp->exp_obd->obd_recoverable_clients--;
851 else if (exp->exp_obd->obd_recovering)
852 exp->exp_obd->obd_max_recoverable_clients--;
853 spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
854 class_export_put(exp);
856 EXPORT_SYMBOL(class_unlink_export);
858 /* Import management functions */
/*
 * Final teardown of an import whose reference count is zero: releases the
 * current connection and every entry on imp_conn_list, asserts that no
 * security context is left, drops the "import" reference on the device and
 * frees the import with OBD_FREE_RCU().
 */
859 void class_import_destroy(struct obd_import *imp)
863 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
864 imp->imp_obd->obd_name);
866 LASSERT(atomic_read(&imp->imp_refcount) == 0);
868 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Pop connection entries one by one until the list is empty. */
870 while (!list_empty(&imp->imp_conn_list)) {
871 struct obd_import_conn *imp_conn;
873 imp_conn = list_entry(imp->imp_conn_list.next,
874 struct obd_import_conn, oic_item);
875 list_del_init(&imp_conn->oic_item);
876 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
877 OBD_FREE(imp_conn, sizeof(*imp_conn));
880 LASSERT(imp->imp_sec == NULL);
/* Pairs with class_incref(obd, "import", imp) in class_new_import(). */
881 class_decref(imp->imp_obd, "import", imp);
882 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle-layer callback (registered through class_handle_hash() in
 * class_new_import()) that takes an import reference. */
886 static void import_handle_addref(void *import)
888 class_import_get(import);
/*
 * Take a reference on \a import after sanity-checking the current count,
 * then log the new value.
 * NOTE(review): the return statement is not visible in this view.
 */
891 struct obd_import *class_import_get(struct obd_import *import)
893 LASSERT(atomic_read(&import->imp_refcount) >= 0);
894 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
895 atomic_inc(&import->imp_refcount);
896 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
897 atomic_read(&import->imp_refcount),
898 import->imp_obd->obd_name);
901 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp. When the count reaches zero the import is
 * handed to obd_zombie_import_add() rather than being destroyed inline.
 */
903 void class_import_put(struct obd_import *imp)
907 LASSERT(atomic_read(&imp->imp_refcount) > 0);
908 LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
/* An import that still holds references must not be on the zombie list. */
909 LASSERT(list_empty(&imp->imp_zombie_chain));
911 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
912 atomic_read(&imp->imp_refcount) - 1,
913 imp->imp_obd->obd_name);
915 if (atomic_dec_and_test(&imp->imp_refcount)) {
916 CDEBUG(D_INFO, "final put import %p\n", imp);
917 obd_zombie_import_add(imp);
922 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimator and one service estimator per portal (IMP_AT_MAX_PORTALS of
 * them), the latter seeded with INITIAL_CONNECT_TIMEOUT.
 */
924 static void init_imp_at(struct imp_at *at) {
926 at_init(&at->iat_net_latency, 0, 0);
927 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
928 /* max service estimates are tracked on the server side, so
929 don't use the AT history here, just use the last reported
930 val. (But keep hist for proc histogram, worst_ever) */
931 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise an import for \a obd: lists, locks, counters,
 * handle hash entry and adaptive-timeout state. The import pins \a obd via
 * class_incref() and starts in state LUSTRE_IMP_NEW with a reference count
 * of 2.
 * NOTE(review): the allocation-failure check and the return statement are
 * on lines not visible in this view.
 */
936 struct obd_import *class_new_import(struct obd_device *obd)
938 struct obd_import *imp;
940 OBD_ALLOC(imp, sizeof(*imp));
944 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
945 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
946 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
947 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
948 spin_lock_init(&imp->imp_lock);
949 imp->imp_last_success_conn = 0;
950 imp->imp_state = LUSTRE_IMP_NEW;
/* Device reference; released in class_import_destroy(). */
951 imp->imp_obd = class_incref(obd, "import", imp);
952 sema_init(&imp->imp_sec_mutex, 1);
953 cfs_waitq_init(&imp->imp_recovery_waitq);
955 atomic_set(&imp->imp_refcount, 2);
956 atomic_set(&imp->imp_unregistering, 0);
957 atomic_set(&imp->imp_inflight, 0);
958 atomic_set(&imp->imp_replay_inflight, 0);
959 atomic_set(&imp->imp_inval_count, 0);
960 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
961 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
/* Register the handle; import_handle_addref is its add-reference hook. */
962 class_handle_hash(&imp->imp_handle, import_handle_addref);
963 init_imp_at(&imp->imp_at);
965 /* the default magic is V2, will be used in connect RPC, and
966 * then adjusted according to the flags in request/reply. */
967 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
971 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down an import: unhash its handle, bump imp_generation
 * under imp_lock and drop one reference.
 */
973 void class_destroy_import(struct obd_import *import)
975 LASSERT(import != NULL);
976 LASSERT(import != LP_POISON);
978 class_handle_unhash(&import->imp_handle);
980 spin_lock(&import->imp_lock);
981 import->imp_generation++;
982 spin_unlock(&import->imp_lock);
983 class_import_put(import);
985 EXPORT_SYMBOL(class_destroy_import);
987 /* A connection defines an export context in which preallocation can
988 be managed. This releases the export pointer reference, and returns
989 the export handle, so the export refcount is 1 when this function
/*
 * Create an export of \a obd for \a cluuid and return its handle cookie in
 * \a conn. The pointer reference obtained from class_new_export() is
 * dropped again before returning.
 * A failed class_new_export() is propagated as PTR_ERR(export).
 * NOTE(review): the success return value is on a line not visible here.
 */
991 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
992 struct obd_uuid *cluuid)
994 struct obd_export *export;
995 LASSERT(conn != NULL);
996 LASSERT(obd != NULL);
997 LASSERT(cluuid != NULL);
1000 export = class_new_export(obd, cluuid);
1002 RETURN(PTR_ERR(export));
/* Copy the cookie out before giving up our reference on the export. */
1004 conn->cookie = export->exp_handle.h_cookie;
1005 class_export_put(export);
1007 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1008 cluuid->uuid, conn->cookie);
1011 EXPORT_SYMBOL(class_connect);
1013 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting of \a exp. Runs under
 * obd_processing_task_lock and only acts when the device is recovering and
 * the export is marked exp_in_recovery. The export's own flags are cleared
 * under exp_lock.
 */
1014 void class_export_recovery_cleanup(struct obd_export *exp)
1016 struct obd_device *obd = exp->exp_obd;
1018 spin_lock_bh(&obd->obd_processing_task_lock);
1019 if (obd->obd_recovering && exp->exp_in_recovery) {
1020 spin_lock(&exp->exp_lock);
1021 exp->exp_in_recovery = 0;
1022 spin_unlock(&exp->exp_lock);
1023 obd->obd_connected_clients--;
1024 /* each connected client is counted as recoverable */
1025 obd->obd_recoverable_clients--;
/* Drop the export from the request-replay count if it was part of it. */
1026 if (exp->exp_req_replay_needed) {
1027 spin_lock(&exp->exp_lock);
1028 exp->exp_req_replay_needed = 0;
1029 spin_unlock(&exp->exp_lock);
1030 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1031 atomic_dec(&obd->obd_req_replay_clients);
/* Same for the lock-replay count. */
1033 if (exp->exp_lock_replay_needed) {
1034 spin_lock(&exp->exp_lock);
1035 exp->exp_lock_replay_needed = 0;
1036 spin_unlock(&exp->exp_lock);
1037 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1038 atomic_dec(&obd->obd_lock_replay_clients);
1041 spin_unlock_bh(&obd->obd_processing_task_lock);
1044 /* This function removes 1-3 references from the export:
1045 * 1 - for export pointer passed
1046 * and, if a disconnect is really needed:
1047 * 2 - removing from hash
1048 * 3 - in class_unlink_export
1049 * The export pointer passed to this function may be destroyed */
/*
 * Disconnect \a export. The first caller to set exp_disconnected removes
 * the export from the nid hash, runs the recovery cleanup and unlinks it;
 * a caller that finds the flag already set jumps straight to no_disconn.
 * The class_export_put() on the last line drops the caller's reference.
 * NOTE(review): the no_disconn label and the return statements are on
 * lines not visible in this view; the put is presumably reached on both
 * paths -- confirm.
 */
1050 int class_disconnect(struct obd_export *export)
1052 int already_disconnected;
1055 if (export == NULL) {
1057 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
/* Test-and-set exp_disconnected under exp_lock so that only one caller
 * performs the actual disconnect. */
1061 spin_lock(&export->exp_lock);
1062 already_disconnected = export->exp_disconnected;
1063 export->exp_disconnected = 1;
1064 spin_unlock(&export->exp_lock);
1066 /* class_cleanup(), abort_recovery(), and class_fail_export()
1067 * all end up in here, and if any of them race we shouldn't
1068 * call extra class_export_puts(). */
1069 if (already_disconnected) {
1070 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1071 GOTO(no_disconn, already_disconnected);
1074 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1075 export->exp_handle.h_cookie);
/* Only exports that were added to the nid hash need removing from it. */
1077 if (!hlist_unhashed(&export->exp_nid_hash))
1078 lustre_hash_del(export->exp_obd->obd_nid_hash,
1079 &export->exp_connection->c_peer.nid,
1080 &export->exp_nid_hash);
1082 class_export_recovery_cleanup(export);
1083 class_unlink_export(export);
1085 class_export_put(export);
/*
 * Disconnect every export on \a list, first storing \a flags in each
 * export's exp_flags. An export whose client uuid equals its device's uuid
 * is only taken off the list, not disconnected.
 */
1089 static void class_disconnect_export_list(struct list_head *list,
1090 enum obd_option flags)
1093 struct obd_export *exp;
1096 /* It's possible that an export may disconnect itself, but
1097 * nothing else will be added to this list. */
1098 while (!list_empty(list)) {
1099 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1100 /* need for safe call CDEBUG after obd_disconnect */
1101 class_export_get(exp);
1103 spin_lock(&exp->exp_lock);
1104 exp->exp_flags = flags;
1105 spin_unlock(&exp->exp_lock);
1107 if (obd_uuid_equals(&exp->exp_client_uuid,
1108 &exp->exp_obd->obd_uuid)) {
1110 "exp %p export uuid == obd uuid, don't discon\n",
1112 /* Need to delete this now so we don't end up pointing
1113 * to work_list later when this export is cleaned up. */
1114 list_del_init(&exp->exp_obd_chain);
1115 class_export_put(exp);
/* Second reference: the one obd_disconnect() releases (see the comment
 * above the call). */
1119 class_export_get(exp);
1120 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1121 "last request at "CFS_TIME_T"\n",
1122 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1123 exp, exp->exp_last_request_time);
1124 /* release one export reference anyway */
1125 rc = obd_disconnect(exp);
1127 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1128 obd_export_nid2str(exp), exp, rc);
/* Release the reference taken at the top of the loop body. */
1129 class_export_put(exp);
/*
 * Disconnect all exports of \a obd. Both obd_exports and
 * obd_delayed_exports are spliced onto a private work list under
 * obd_dev_lock, and the list is then processed by
 * class_disconnect_export_list() with the flags from exp_flags_from_obd().
 */
1134 void class_disconnect_exports(struct obd_device *obd)
1136 struct list_head work_list;
1139 /* Move all of the exports from obd_exports to a work list, en masse. */
1140 CFS_INIT_LIST_HEAD(&work_list);
1141 spin_lock(&obd->obd_dev_lock);
1142 list_splice_init(&obd->obd_exports, &work_list);
1143 list_splice_init(&obd->obd_delayed_exports, &work_list);
1144 spin_unlock(&obd->obd_dev_lock);
1146 if (!list_empty(&work_list)) {
1147 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1148 "disconnecting them\n", obd->obd_minor, obd);
1149 class_disconnect_export_list(&work_list,
1150 exp_flags_from_obd(obd));
1152 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1153 obd->obd_minor, obd);
1156 EXPORT_SYMBOL(class_disconnect_exports);
1158 /* Remove exports that have not completed recovery.
/*
 * Collect exports of \a obd selected through \a test_export onto a work
 * list under obd_dev_lock, count them in obd_stale_clients (self exports
 * excepted) and disconnect them with \a flags.
 * NOTE(review): what follows a true test_export() result is on a line not
 * visible in this view; presumably such exports are kept -- confirm.
 */
1160 void class_disconnect_stale_exports(struct obd_device *obd,
1161 int (*test_export)(struct obd_export *),
1162 enum obd_option flags)
1164 struct list_head work_list;
1165 struct list_head *pos, *n;
1166 struct obd_export *exp;
1169 CFS_INIT_LIST_HEAD(&work_list);
1170 spin_lock(&obd->obd_dev_lock);
1171 obd->obd_stale_clients = 0;
/* The _safe iterator is needed because entries are moved off the list
 * while it is being walked. */
1172 list_for_each_safe(pos, n, &obd->obd_exports) {
1173 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1174 if (test_export(exp))
1177 list_move(&exp->exp_obd_chain, &work_list);
1178 /* don't count self-export as client */
1179 if (obd_uuid_equals(&exp->exp_client_uuid,
1180 &exp->exp_obd->obd_uuid))
1183 obd->obd_stale_clients++;
1184 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1185 obd->obd_name, exp->exp_client_uuid.uuid,
1186 exp->exp_connection == NULL ? "<unknown>" :
1187 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1189 spin_unlock(&obd->obd_dev_lock);
1191 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n", obd->obd_name,
1192 obd->obd_stale_clients);
1194 class_disconnect_export_list(&work_list, flags);
1197 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark \a exp as failed and disconnect it. exp_failed is test-and-set
 * under exp_lock; when it was already set the function only logs that the
 * export is being skipped. Otherwise an extra reference is taken for
 * obd_disconnect() and the outcome is logged.
 */
1199 void class_fail_export(struct obd_export *exp)
1201 int rc, already_failed;
1203 spin_lock(&exp->exp_lock);
1204 already_failed = exp->exp_failed;
1205 exp->exp_failed = 1;
1206 spin_unlock(&exp->exp_lock);
1208 if (already_failed) {
1209 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1210 exp, exp->exp_client_uuid.uuid);
1214 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1215 exp, exp->exp_client_uuid.uuid);
/* Optionally dump the debug log before disconnecting. */
1217 if (obd_dump_on_timeout)
1218 libcfs_debug_dumplog();
1220 /* Most callers into obd_disconnect are removing their own reference
1221 * (request, for example) in addition to the one from the hash table.
1222 * We don't have such a reference here, so make one. */
1223 class_export_get(exp);
1224 rc = obd_disconnect(exp);
1226 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1228 CDEBUG(D_HA, "disconnected export %p/%s\n",
1229 exp, exp->exp_client_uuid.uuid);
1231 EXPORT_SYMBOL(class_fail_export);
/*
 * Return the peer NID of an export as a string, via libcfs_nid2str().
 * NOTE(review): the value returned when the export has no connection is on
 * a line not visible in this view.
 */
1233 char *obd_export_nid2str(struct obd_export *exp)
1235 if (exp->exp_connection != NULL)
1236 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1240 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict exports of \a obd whose peer NID equals \a nid: each export found
 * in obd_nid_hash is failed with class_fail_export() and the reference
 * obtained from the lookup is dropped.
 * Returns exports_evicted.
 * NOTE(review): the loop construct and the increment of exports_evicted
 * are on lines not visible in this view.
 */
1242 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1244 struct obd_export *doomed_exp = NULL;
1245 int exports_evicted = 0;
/* NOTE(review): the cast drops const from nid for libcfs_str2nid(). */
1247 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1250 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1251 if (doomed_exp == NULL)
/* The hash hit must really be for the requested NID and must never be the
 * device's self export. */
1254 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1255 "nid %s found, wanted nid %s, requested nid %s\n",
1256 obd_export_nid2str(doomed_exp),
1257 libcfs_nid2str(nid_key), nid);
1258 LASSERTF(doomed_exp != obd->obd_self_export,
1259 "self-export is hashed by NID?\n");
1261 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1262 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1264 class_fail_export(doomed_exp);
1265 class_export_put(doomed_exp);
1268 if (!exports_evicted)
1269 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1270 obd->obd_name, nid);
1271 return exports_evicted;
1273 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * obd_export_evict_by_uuid() - evict the export of a device by client UUID.
 *
 * @obd:  device whose obd_uuid_hash is searched.
 * @uuid: client UUID in string form; converted with obd_str2uuid().
 *
 * Return: exports_evicted, which starts at 0.  The early return taken
 * when @uuid equals the UUID of @obd itself returns it unchanged.
 *
 * An export found in obd->obd_uuid_hash is failed with
 * class_fail_export() and released with class_export_put().
 *
 * NOTE(review): this listing omits some short source lines; the else
 * branch separating the not-found CERROR from the eviction path and the
 * update of exports_evicted are not visible here -- confirm against the
 * full file.
 */
1275 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1277 struct obd_export *doomed_exp = NULL;
1278 struct obd_uuid doomed_uuid;
1279 int exports_evicted = 0;
1281 obd_str2uuid(&doomed_uuid, uuid);
/* Refuse to evict the device itself. */
1282 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1283 CERROR("%s: can't evict myself\n", obd->obd_name);
1284 return exports_evicted;
1287 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1289 if (doomed_exp == NULL) {
1290 CERROR("%s: can't disconnect %s: no exports found\n",
1291 obd->obd_name, uuid);
/*
 * NOTE(review): the word adminstrative in the message below is a
 * misspelling of administrative; it is runtime text, so it is left
 * untouched here.
 */
1293 CWARN("%s: evicting %s at adminstrative request\n",
1294 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
/*
 * Fail the export, then drop a reference -- presumably the one returned
 * by lustre_hash_lookup(); confirm.
 */
1295 class_fail_export(doomed_exp);
1296 class_export_put(doomed_exp);
1300 return exports_evicted;
1302 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * print_export_data() - log a one-line D_HA debug summary of an export.
 *
 * @exp:    export to describe.
 * @status: label printed right after the obd name.
 *
 * exp->exp_lock is held while exp_outstanding_replies is walked.
 *
 * NOTE(review): the loop body and the declaration of nreplies are not
 * visible in this listing; presumably the loop counts the replies into
 * nreplies and records the first one in first_reply -- confirm against
 * the full file.
 */
1304 static void print_export_data(struct obd_export *exp, const char *status)
1306 struct ptlrpc_reply_state *rs;
1307 struct ptlrpc_reply_state *first_reply = NULL;
1310 spin_lock(&exp->exp_lock);
1311 list_for_each_entry (rs, &exp->exp_outstanding_replies, rs_exp_list) {
1316 spin_unlock(&exp->exp_lock);
/*
 * Fields, in order: obd name, status label, export pointer, client UUID,
 * NID string, refcount, exp_failed, nreplies, first_reply, an ellipsis
 * marker when nreplies exceeds 3, rpc count, callback count,
 * exp_disconnected, exp_delayed and exp_last_committed.
 */
1318 CDEBUG(D_HA, "%s: %s %p %s %s %d %d %d: %p %s %d %d %d %d "LPU64"\n",
1319 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1320 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1321 exp->exp_failed, nreplies, first_reply,
1322 nreplies > 3 ? "..." : "",
1323 atomic_read(&exp->exp_rpc_count),
1324 atomic_read(&exp->exp_cb_count),
1325 exp->exp_disconnected, exp->exp_delayed,
1326 exp->exp_last_committed);
/*
 * dump_exports() - log every export associated with a device.
 *
 * @obd: device whose export lists are dumped.
 *
 * Calls print_export_data() for each entry of obd_exports,
 * obd_unlinked_exports and obd_delayed_exports while holding
 * obd->obd_dev_lock, labelling them ACTIVE, UNLINKED and DELAYED.
 * It then walks the global obd_zombie_exports list under
 * obd_zombie_impexp_lock with the label ZOMBIE; that walk is not
 * filtered by @obd in the visible code.
 */
1329 void dump_exports(struct obd_device *obd)
1331 struct obd_export *exp;
/* Per-device lists, all linked through exp_obd_chain. */
1333 spin_lock(&obd->obd_dev_lock);
1334 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1335 print_export_data(exp, "ACTIVE");
1336 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1337 print_export_data(exp, "UNLINKED");
1338 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1339 print_export_data(exp, "DELAYED");
1340 spin_unlock(&obd->obd_dev_lock);
/* Global zombie list, protected by its own lock. */
1341 spin_lock(&obd_zombie_impexp_lock);
1342 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1343 print_export_data(exp, "ZOMBIE");
1344 spin_unlock(&obd_zombie_impexp_lock);
1346 EXPORT_SYMBOL(dump_exports);
/*
 * obd_exports_barrier() - wait until obd->obd_unlinked_exports is empty.
 *
 * @obd: device whose unlinked exports are waited for.
 *
 * Asserts that obd->obd_exports is already empty, then polls the
 * unlinked list.  obd_dev_lock is held only for the list_empty() test
 * and is dropped around each sleep, which is done in CFS_TASK_UNINT
 * state for cfs_time_seconds(waited).
 *
 * NOTE(review): the declaration of waited and whatever updates it inside
 * the loop are not visible in this listing; the IS_PO2() test suggests
 * that it grows between iterations -- confirm against the full file.
 */
1348 void obd_exports_barrier(struct obd_device *obd)
1351 LASSERT(list_empty(&obd->obd_exports));
1352 spin_lock(&obd->obd_dev_lock);
1353 while (!list_empty(&obd->obd_unlinked_exports)) {
/* Never sleep with the spinlock held. */
1354 spin_unlock(&obd->obd_dev_lock);
1355 cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
/* Console warning only when waited is above 5 and IS_PO2(waited) holds. */
1356 if (waited > 5 && IS_PO2(waited)) {
1357 LCONSOLE_WARN("Waiting for obd_unlinked_exports "
1358 "more than %d seconds. "
1359 "The obd refcount = %d. Is it stuck?\n",
1360 waited, atomic_read(&obd->obd_refcount));
/* Retake the lock before the loop condition is evaluated again. */
1364 spin_lock(&obd->obd_dev_lock);
1366 spin_unlock(&obd->obd_dev_lock);
1368 EXPORT_SYMBOL(obd_exports_barrier);
1371 * kill zombie imports and exports
/*
 * obd_zombie_impexp_cull() - destroy queued zombie imports and exports.
 *
 * Each pass takes obd_zombie_impexp_lock, unlinks the first entry of
 * obd_zombie_imports and of obd_zombie_exports when the respective list
 * is not empty, drops the lock, and then calls class_import_destroy()
 * and class_export_destroy() outside of it.  The loop repeats while
 * either pointer is non-NULL.
 *
 * NOTE(review): this listing omits some short source lines; the start of
 * the loop, the per-pass reset of import and export, the remaining
 * list_entry() arguments and any NULL guards around the destroy calls
 * are not visible here -- confirm against the full file.
 */
1373 void obd_zombie_impexp_cull(void)
1375 struct obd_import *import;
1376 struct obd_export *export;
1380 spin_lock(&obd_zombie_impexp_lock);
/* Detach at most one import per pass. */
1383 if (!list_empty(&obd_zombie_imports)) {
1384 import = list_entry(obd_zombie_imports.next,
1387 list_del_init(&import->imp_zombie_chain);
/* Detach at most one export per pass. */
1391 if (!list_empty(&obd_zombie_exports)) {
1392 export = list_entry(obd_zombie_exports.next,
1395 list_del_init(&export->exp_obd_chain);
1398 spin_unlock(&obd_zombie_impexp_lock);
/* Destruction happens after the zombie lock has been released. */
1401 class_import_destroy(import);
1404 class_export_destroy(export);
1406 } while (import != NULL || export != NULL);
/*
 * Completed by obd_zombie_impexp_thread() after its cfs_daemonize_ctxt()
 * call, on both the failure and the success path;
 * obd_zombie_impexp_init() waits on it.
 */
1410 static struct completion obd_zombie_start;
/*
 * Completed by obd_zombie_impexp_thread() after its main loop;
 * obd_zombie_impexp_stop() waits on it.
 */
1411 static struct completion obd_zombie_stop;
/* Flag word accessed with set_bit() and test_bit(). */
1412 static unsigned long obd_zombie_flags;
/*
 * Wait queue shared by the zombie thread and obd_zombie_barrier();
 * signalled by obd_zombie_impexp_notify() and after each cull pass.
 */
1413 static cfs_waitq_t obd_zombie_waitq;
/*
 * Asks the zombie thread to leave its main loop.
 * NOTE(review): the value 1 << 1 is passed to set_bit() and test_bit()
 * as the bit argument; if those take a bit index rather than a mask the
 * flag lives in bit 2.  Set and test use the same constant, so they
 * agree either way -- confirm the intent.
 */
1416 OBD_ZOMBIE_STOP = 1 << 1
1420 * check whether there is work for the zombie import/export kill thread.
/*
 * obd_zombie_impexp_check() - compute whether the zombie thread is idle.
 *
 * @arg: not referenced in the visible code.
 *
 * Under obd_zombie_impexp_lock, rc becomes true only when both zombie
 * lists are empty and OBD_ZOMBIE_STOP is not set.  The zombie thread
 * sleeps until the negation of this function holds.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned -- confirm.
 */
1422 static int obd_zombie_impexp_check(void *arg)
1426 spin_lock(&obd_zombie_impexp_lock);
1427 rc = list_empty(&obd_zombie_imports) &&
1428 list_empty(&obd_zombie_exports) &&
1429 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1431 spin_unlock(&obd_zombie_impexp_lock);
1437 * Add export to the obd_zombie thread and notify it.
/*
 * obd_zombie_export_add() - move an export onto the zombie export list.
 *
 * @exp: export to queue; asserted to be linked through exp_obd_chain.
 *
 * The export is unlinked while holding obd_dev_lock of its device, then
 * linked onto obd_zombie_exports under obd_zombie_impexp_lock.  The two
 * locks are taken one after the other, never nested.  Finally the zombie
 * wait queue is signalled through obd_zombie_impexp_notify().
 */
1439 static void obd_zombie_export_add(struct obd_export *exp) {
1440 spin_lock(&exp->exp_obd->obd_dev_lock);
1441 LASSERT(!list_empty(&exp->exp_obd_chain));
1442 list_del_init(&exp->exp_obd_chain);
1443 spin_unlock(&exp->exp_obd->obd_dev_lock);
1444 spin_lock(&obd_zombie_impexp_lock);
1445 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1446 spin_unlock(&obd_zombie_impexp_lock);
/*
 * NOTE(review): obd_zombie_impexp_notify is declared as a static
 * function near the top of this file, so comparing its address with
 * NULL looks always true -- confirm whether the test can be dropped.
 */
1448 if (obd_zombie_impexp_notify != NULL)
1449 obd_zombie_impexp_notify();
1453 * Add import to the obd_zombie thread and notify it.
/*
 * obd_zombie_import_add() - queue an import on the zombie import list.
 *
 * @imp: import to queue; asserted to have imp_sec equal to NULL and not
 *       to be linked through imp_zombie_chain yet.
 *
 * The import is linked onto obd_zombie_imports under
 * obd_zombie_impexp_lock, then the zombie wait queue is signalled
 * through obd_zombie_impexp_notify().
 */
1455 static void obd_zombie_import_add(struct obd_import *imp) {
1456 LASSERT(imp->imp_sec == NULL);
1457 spin_lock(&obd_zombie_impexp_lock);
1458 LASSERT(list_empty(&imp->imp_zombie_chain));
1459 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1460 spin_unlock(&obd_zombie_impexp_lock);
/*
 * NOTE(review): obd_zombie_impexp_notify is declared as a static
 * function near the top of this file, so comparing its address with
 * NULL looks always true -- confirm whether the test can be dropped.
 */
1462 if (obd_zombie_impexp_notify != NULL)
1463 obd_zombie_impexp_notify();
1467 * notify import/export destroy thread about new zombie.
/*
 * obd_zombie_impexp_notify() - signal obd_zombie_waitq.
 *
 * Called after a zombie has been queued and by obd_zombie_impexp_stop()
 * after OBD_ZOMBIE_STOP has been set.
 */
1469 static void obd_zombie_impexp_notify(void)
1471 cfs_waitq_signal(&obd_zombie_waitq);
1475 * check whether obd_zombie is idle
/*
 * obd_zombie_is_idle() - compute whether both zombie queues are empty.
 *
 * Asserts that OBD_ZOMBIE_STOP is not set, then samples both lists under
 * obd_zombie_impexp_lock.  Used as the wait condition of
 * obd_zombie_barrier().
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned -- confirm.
 */
1477 static int obd_zombie_is_idle(void)
1481 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1482 spin_lock(&obd_zombie_impexp_lock);
1483 rc = list_empty(&obd_zombie_imports) &&
1484 list_empty(&obd_zombie_exports);
1485 spin_unlock(&obd_zombie_impexp_lock);
1490 * wait until the obd_zombie import/export queues become empty
/*
 * obd_zombie_barrier() - block until the zombie queues are drained.
 *
 * Sleeps on obd_zombie_waitq with a zero-initialised l_wait_info until
 * obd_zombie_is_idle() holds.  obd_zombie_is_idle() asserts that
 * OBD_ZOMBIE_STOP is not set, so this must not be called once
 * obd_zombie_impexp_stop() has set that flag.
 */
1492 void obd_zombie_barrier(void)
1494 struct l_wait_info lwi = { 0 };
1495 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1497 EXPORT_SYMBOL(obd_zombie_barrier);
1502 * destroy zombie export/import thread.
/*
 * obd_zombie_impexp_thread() - main function of the zombie reaper thread.
 *
 * @unused: not referenced in the visible code.
 *
 * Calls cfs_daemonize_ctxt() with the name obd_zombid and completes
 * obd_zombie_start whether or not that call succeeded.  The main loop
 * runs until OBD_ZOMBIE_STOP is set: it sleeps on obd_zombie_waitq until
 * obd_zombie_impexp_check() returns zero, runs obd_zombie_impexp_cull()
 * and signals the wait queue again.  After the loop obd_zombie_stop is
 * completed.
 *
 * NOTE(review): this listing omits some short source lines; the
 * declaration of rc, the exit taken after a failed cfs_daemonize_ctxt()
 * and the return statement are not visible here -- confirm against the
 * full file.
 */
1504 static int obd_zombie_impexp_thread(void *unused)
1508 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1509 complete(&obd_zombie_start);
/* Startup handshake with obd_zombie_impexp_init(). */
1513 complete(&obd_zombie_start);
1515 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1516 struct l_wait_info lwi = { 0 };
1518 l_wait_event(obd_zombie_waitq,
1519 !obd_zombie_impexp_check(NULL), &lwi);
1520 obd_zombie_impexp_cull();
1523 * Notify obd_zombie_barrier callers that queues
1526 cfs_waitq_signal(&obd_zombie_waitq);
/* Shutdown handshake with obd_zombie_impexp_stop(). */
1529 complete(&obd_zombie_stop);
1534 #else /* ! KERNEL */
/*
 * Entry counter for obd_zombie_impexp_kill(); the cull there runs only
 * when atomic_inc_return() yields 1.
 */
1536 static atomic_t zombie_recur = ATOMIC_INIT(0);
/*
 * Handle returned by liblustre_register_wait_callback() and later passed
 * to liblustre_deregister_wait_callback().
 */
1537 static void *obd_zombie_impexp_work_cb;
/*
 * Handle returned by liblustre_register_idle_callback() and later passed
 * to liblustre_deregister_idle_callback().
 */
1538 static void *obd_zombie_impexp_idle_cb;
/*
 * obd_zombie_impexp_kill() - run a cull pass unless one is in progress.
 *
 * @arg: not referenced in the visible code.
 *
 * zombie_recur is incremented on entry and decremented afterwards;
 * obd_zombie_impexp_cull() runs only for the call that raised the
 * counter to 1.  Registered as a liblustre wait callback by
 * obd_zombie_impexp_init().
 *
 * NOTE(review): the return statement and any local declarations are not
 * visible in this listing -- confirm against the full file.
 */
1540 int obd_zombie_impexp_kill(void *arg)
1544 if (atomic_inc_return(&zombie_recur) == 1) {
1545 obd_zombie_impexp_cull();
1548 atomic_dec(&zombie_recur);
1555 * start the thread that destroys zombie imports/exports
/*
 * obd_zombie_impexp_init() - set up zombie import/export processing.
 *
 * Initialises both zombie lists, obd_zombie_impexp_lock, the start and
 * stop completions and obd_zombie_waitq.  The visible code then starts
 * obd_zombie_impexp_thread() with cfs_kernel_thread() and waits for
 * obd_zombie_start, and also registers obd_zombie_impexp_kill() and
 * obd_zombie_impexp_check() as liblustre wait and idle callbacks.
 *
 * NOTE(review): this listing omits some short source lines; presumably
 * preprocessor conditionals select either the kernel thread or the
 * liblustre callbacks, and the declaration, checking and return of rc
 * are likewise not visible here -- confirm against the full file.
 */
1557 int obd_zombie_impexp_init(void)
1561 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1562 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1563 spin_lock_init(&obd_zombie_impexp_lock);
1564 init_completion(&obd_zombie_start);
1565 init_completion(&obd_zombie_stop);
1566 cfs_waitq_init(&obd_zombie_waitq);
/* Thread variant: start the reaper and wait for its startup handshake. */
1569 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1573 wait_for_completion(&obd_zombie_start);
/* Callback variant: keep the handles for obd_zombie_impexp_stop(). */
1576 obd_zombie_impexp_work_cb =
1577 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1578 &obd_zombie_impexp_kill, NULL);
1580 obd_zombie_impexp_idle_cb =
1581 liblustre_register_idle_callback("obd_zombi_impexp_check",
1582 &obd_zombie_impexp_check, NULL);
1588 * stop the thread that destroys zombie imports/exports
/*
 * obd_zombie_impexp_stop() - shut down zombie import/export processing.
 *
 * Sets OBD_ZOMBIE_STOP and signals the zombie wait queue through
 * obd_zombie_impexp_notify().  The visible code then waits for
 * obd_zombie_stop and also deregisters the liblustre wait and idle
 * callbacks.
 *
 * NOTE(review): this listing omits some short source lines; presumably
 * preprocessor conditionals select either the wait for the thread or the
 * callback deregistration -- confirm against the full file.
 */
1590 void obd_zombie_impexp_stop(void)
1592 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1593 obd_zombie_impexp_notify();
/* Thread variant: wait for the reaper to leave its main loop. */
1595 wait_for_completion(&obd_zombie_stop);
/* Callback variant: release the handles saved at init time. */
1597 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1598 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);