1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
82 EXPORT_SYMBOL(obd_device_alloc);
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
121 const char *modname = name;
122 if (!request_module(modname)) {
123 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124 type = class_search_type(name);
126 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
132 spin_lock(&type->obd_type_lock);
134 try_module_get(type->typ_dt_ops->o_owner);
135 spin_unlock(&type->obd_type_lock);
140 void class_put_type(struct obd_type *type)
143 spin_lock(&type->obd_type_lock);
145 module_put(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
149 #define CLASS_MAX_NAME 1024
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152 struct lprocfs_vars *vars, const char *name,
153 struct lu_device_type *ldt)
155 struct obd_type *type;
160 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162 if (class_search_type(name)) {
163 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
168 OBD_ALLOC(type, sizeof(*type));
172 OBD_ALLOC_PTR(type->typ_dt_ops);
173 OBD_ALLOC_PTR(type->typ_md_ops);
174 OBD_ALLOC(type->typ_name, strlen(name) + 1);
176 if (type->typ_dt_ops == NULL ||
177 type->typ_md_ops == NULL ||
178 type->typ_name == NULL)
181 *(type->typ_dt_ops) = *dt_ops;
182 /* md_ops is optional */
184 *(type->typ_md_ops) = *md_ops;
185 strcpy(type->typ_name, name);
186 spin_lock_init(&type->obd_type_lock);
189 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
191 if (IS_ERR(type->typ_procroot)) {
192 rc = PTR_ERR(type->typ_procroot);
193 type->typ_procroot = NULL;
199 rc = lu_device_type_init(ldt);
204 spin_lock(&obd_types_lock);
205 list_add(&type->typ_chain, &obd_types);
206 spin_unlock(&obd_types_lock);
211 if (type->typ_name != NULL)
212 OBD_FREE(type->typ_name, strlen(name) + 1);
213 if (type->typ_md_ops != NULL)
214 OBD_FREE_PTR(type->typ_md_ops);
215 if (type->typ_dt_ops != NULL)
216 OBD_FREE_PTR(type->typ_dt_ops);
217 OBD_FREE(type, sizeof(*type));
221 int class_unregister_type(const char *name)
223 struct obd_type *type = class_search_type(name);
227 CERROR("unknown obd type\n");
231 if (type->typ_refcnt) {
232 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233 /* This is a bad situation, let's make the best of it */
234 /* Remove ops, but leave the name for debugging */
235 OBD_FREE_PTR(type->typ_dt_ops);
236 OBD_FREE_PTR(type->typ_md_ops);
240 if (type->typ_procroot) {
241 lprocfs_remove(&type->typ_procroot);
245 lu_device_type_fini(type->typ_lu);
247 spin_lock(&obd_types_lock);
248 list_del(&type->typ_chain);
249 spin_unlock(&obd_types_lock);
250 OBD_FREE(type->typ_name, strlen(name) + 1);
251 if (type->typ_dt_ops != NULL)
252 OBD_FREE_PTR(type->typ_dt_ops);
253 if (type->typ_md_ops != NULL)
254 OBD_FREE_PTR(type->typ_md_ops);
255 OBD_FREE(type, sizeof(*type));
257 } /* class_unregister_type */
260 * Create a new obd device.
262 * Find an empty slot in ::obd_devs[], create a new obd device in it.
264 * \param typename [in] obd device type string.
265 * \param name [in] obd device name.
267 * \retval NULL if create fails, otherwise return the obd device
270 struct obd_device *class_newdev(const char *type_name, const char *name)
272 struct obd_device *result = NULL;
273 struct obd_device *newdev;
274 struct obd_type *type = NULL;
276 int new_obd_minor = 0;
278 if (strlen(name) >= MAX_OBD_NAME) {
279 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280 RETURN(ERR_PTR(-EINVAL));
283 type = class_get_type(type_name);
285 CERROR("OBD: unknown type: %s\n", type_name);
286 RETURN(ERR_PTR(-ENODEV));
289 newdev = obd_device_alloc();
290 if (newdev == NULL) {
291 class_put_type(type);
292 RETURN(ERR_PTR(-ENOMEM));
294 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
296 spin_lock(&obd_dev_lock);
297 for (i = 0; i < class_devno_max(); i++) {
298 struct obd_device *obd = class_num2obd(i);
299 if (obd && obd->obd_name &&
300 (strcmp(name, obd->obd_name) == 0)) {
301 CERROR("Device %s already exists, won't add\n", name);
303 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304 "%p obd_magic %08x != %08x\n", result,
305 result->obd_magic, OBD_DEVICE_MAGIC);
306 LASSERTF(result->obd_minor == new_obd_minor,
307 "%p obd_minor %d != %d\n", result,
308 result->obd_minor, new_obd_minor);
310 obd_devs[result->obd_minor] = NULL;
311 result->obd_name[0]='\0';
313 result = ERR_PTR(-EEXIST);
316 if (!result && !obd) {
318 result->obd_minor = i;
320 result->obd_type = type;
321 strncpy(result->obd_name, name,
322 sizeof(result->obd_name) - 1);
323 obd_devs[i] = result;
326 spin_unlock(&obd_dev_lock);
328 if (result == NULL && i >= class_devno_max()) {
329 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
331 result = ERR_PTR(-EOVERFLOW);
334 if (IS_ERR(result)) {
335 obd_device_free(newdev);
336 class_put_type(type);
338 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339 result->obd_name, result);
344 void class_release_dev(struct obd_device *obd)
346 struct obd_type *obd_type = obd->obd_type;
348 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352 LASSERT(obd_type != NULL);
354 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355 obd->obd_name,obd->obd_type->typ_name);
357 spin_lock(&obd_dev_lock);
358 obd_devs[obd->obd_minor] = NULL;
359 spin_unlock(&obd_dev_lock);
360 obd_device_free(obd);
362 class_put_type(obd_type);
365 int class_name2dev(const char *name)
372 spin_lock(&obd_dev_lock);
373 for (i = 0; i < class_devno_max(); i++) {
374 struct obd_device *obd = class_num2obd(i);
375 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376 /* Make sure we finished attaching before we give
377 out any references */
378 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379 if (obd->obd_attached) {
380 spin_unlock(&obd_dev_lock);
386 spin_unlock(&obd_dev_lock);
391 struct obd_device *class_name2obd(const char *name)
393 int dev = class_name2dev(name);
395 if (dev < 0 || dev > class_devno_max())
397 return class_num2obd(dev);
400 int class_uuid2dev(struct obd_uuid *uuid)
404 spin_lock(&obd_dev_lock);
405 for (i = 0; i < class_devno_max(); i++) {
406 struct obd_device *obd = class_num2obd(i);
407 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409 spin_unlock(&obd_dev_lock);
413 spin_unlock(&obd_dev_lock);
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
420 int dev = class_uuid2dev(uuid);
423 return class_num2obd(dev);
427 * Get obd device from ::obd_devs[]
429 * \param num [in] array index
431 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432 * otherwise return the obd device there.
434 struct obd_device *class_num2obd(int num)
436 struct obd_device *obd = NULL;
438 if (num < class_devno_max()) {
443 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444 "%p obd_magic %08x != %08x\n",
445 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446 LASSERTF(obd->obd_minor == num,
447 "%p obd_minor %0d != %0d\n",
448 obd, obd->obd_minor, num);
454 void class_obd_list(void)
459 spin_lock(&obd_dev_lock);
460 for (i = 0; i < class_devno_max(); i++) {
461 struct obd_device *obd = class_num2obd(i);
464 if (obd->obd_stopping)
466 else if (obd->obd_set_up)
468 else if (obd->obd_attached)
472 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473 i, status, obd->obd_type->typ_name,
474 obd->obd_name, obd->obd_uuid.uuid,
475 atomic_read(&obd->obd_refcount));
477 spin_unlock(&obd_dev_lock);
481 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
482 specified, then only the client with that uuid is returned,
483 otherwise any client connected to the tgt is returned. */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485 const char * typ_name,
486 struct obd_uuid *grp_uuid)
490 spin_lock(&obd_dev_lock);
491 for (i = 0; i < class_devno_max(); i++) {
492 struct obd_device *obd = class_num2obd(i);
495 if ((strncmp(obd->obd_type->typ_name, typ_name,
496 strlen(typ_name)) == 0)) {
497 if (obd_uuid_equals(tgt_uuid,
498 &obd->u.cli.cl_target_uuid) &&
499 ((grp_uuid)? obd_uuid_equals(grp_uuid,
500 &obd->obd_uuid) : 1)) {
501 spin_unlock(&obd_dev_lock);
506 spin_unlock(&obd_dev_lock);
511 /* Iterate the obd_device list looking devices have grp_uuid. Start
512 searching at *next, and if a device is found, the next index to look
513 at is saved in *next. If next is NULL, then the first matching device
514 will always be returned. */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
521 else if (*next >= 0 && *next < class_devno_max())
526 spin_lock(&obd_dev_lock);
527 for (; i < class_devno_max(); i++) {
528 struct obd_device *obd = class_num2obd(i);
531 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
534 spin_unlock(&obd_dev_lock);
538 spin_unlock(&obd_dev_lock);
544 * to notify sptlrpc log for @fsname has changed, let every relevant OBD
545 * adjust sptlrpc settings accordingly.
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
549 struct obd_device *obd;
553 LASSERT(namelen > 0);
555 spin_lock(&obd_dev_lock);
556 for (i = 0; i < class_devno_max(); i++) {
557 obd = class_num2obd(i);
559 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
562 /* only notify mdc, osc, mdt, ost */
563 type = obd->obd_type->typ_name;
564 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567 strcmp(type, LUSTRE_OST_NAME) != 0)
570 if (strncmp(obd->obd_name, fsname, namelen))
573 class_incref(obd, __FUNCTION__, obd);
574 spin_unlock(&obd_dev_lock);
575 rc2 = obd_set_info_async(obd->obd_self_export,
576 sizeof(KEY_SPTLRPC_CONF),
577 KEY_SPTLRPC_CONF, 0, NULL, NULL);
579 class_decref(obd, __FUNCTION__, obd);
580 spin_lock(&obd_dev_lock);
582 spin_unlock(&obd_dev_lock);
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
587 void obd_cleanup_caches(void)
592 if (obd_device_cachep) {
593 rc = cfs_mem_cache_destroy(obd_device_cachep);
594 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
595 obd_device_cachep = NULL;
598 rc = cfs_mem_cache_destroy(obdo_cachep);
599 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
603 rc = cfs_mem_cache_destroy(import_cachep);
604 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605 import_cachep = NULL;
608 rc = cfs_mem_cache_destroy(capa_cachep);
609 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
615 int obd_init_caches(void)
619 LASSERT(obd_device_cachep == NULL);
620 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621 sizeof(struct obd_device),
623 if (!obd_device_cachep)
626 LASSERT(obdo_cachep == NULL);
627 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
632 LASSERT(import_cachep == NULL);
633 import_cachep = cfs_mem_cache_create("ll_import_cache",
634 sizeof(struct obd_import),
639 LASSERT(capa_cachep == NULL);
640 capa_cachep = cfs_mem_cache_create("capa_cache",
641 sizeof(struct obd_capa), 0, 0);
647 obd_cleanup_caches();
652 /* map connection to client */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
655 struct obd_export *export;
659 CDEBUG(D_CACHE, "looking for null handle\n");
663 if (conn->cookie == -1) { /* this means assign a new connection */
664 CDEBUG(D_CACHE, "want a new connection\n");
668 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669 export = class_handle2object(conn->cookie);
673 struct obd_device *class_exp2obd(struct obd_export *exp)
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
682 struct obd_export *export;
683 export = class_conn2export(conn);
685 struct obd_device *obd = export->exp_obd;
686 class_export_put(export);
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
694 struct obd_device *obd = exp->exp_obd;
697 return obd->u.cli.cl_import;
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
702 struct obd_device *obd = class_conn2obd(conn);
705 return obd->u.cli.cl_import;
708 /* Export management functions */
709 static void class_export_destroy(struct obd_export *exp)
711 struct obd_device *obd = exp->exp_obd;
714 LASSERT (atomic_read(&exp->exp_refcount) == 0);
716 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717 exp->exp_client_uuid.uuid, obd->obd_name);
719 LASSERT(obd != NULL);
721 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722 if (exp->exp_connection)
723 ptlrpc_put_connection_superhack(exp->exp_connection);
725 LASSERT(list_empty(&exp->exp_outstanding_replies));
726 LASSERT(list_empty(&exp->exp_uncommitted_replies));
727 LASSERT(list_empty(&exp->exp_req_replay_queue));
728 LASSERT(list_empty(&exp->exp_queued_rpc));
729 obd_destroy_export(exp);
730 class_decref(obd, "export", exp);
732 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
736 static void export_handle_addref(void *export)
738 class_export_get(export);
741 struct obd_export *class_export_get(struct obd_export *exp)
743 atomic_inc(&exp->exp_refcount);
744 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
745 atomic_read(&exp->exp_refcount));
748 EXPORT_SYMBOL(class_export_get);
750 void class_export_put(struct obd_export *exp)
752 LASSERT(exp != NULL);
753 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
754 atomic_read(&exp->exp_refcount) - 1);
755 LASSERT(atomic_read(&exp->exp_refcount) > 0);
756 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
758 if (atomic_dec_and_test(&exp->exp_refcount)) {
759 LASSERT(!list_empty(&exp->exp_obd_chain));
760 CDEBUG(D_IOCTL, "final put %p/%s\n",
761 exp, exp->exp_client_uuid.uuid);
762 obd_zombie_export_add(exp);
765 EXPORT_SYMBOL(class_export_put);
767 /* Creates a new export, adds it to the hash table, and returns a
768 * pointer to it. The refcount is 2: one for the hash reference, and
769 * one for the pointer returned by this function. */
770 struct obd_export *class_new_export(struct obd_device *obd,
771 struct obd_uuid *cluuid)
773 struct obd_export *export;
776 OBD_ALLOC_PTR(export);
778 return ERR_PTR(-ENOMEM);
780 export->exp_conn_cnt = 0;
781 export->exp_lock_hash = NULL;
782 atomic_set(&export->exp_refcount, 2);
783 atomic_set(&export->exp_rpc_count, 0);
784 atomic_set(&export->exp_cb_count, 0);
785 atomic_set(&export->exp_locks_count, 0);
786 export->exp_obd = obd;
787 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
788 spin_lock_init(&export->exp_uncommitted_replies_lock);
789 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
790 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
791 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
792 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
793 class_handle_hash(&export->exp_handle, export_handle_addref);
794 export->exp_last_request_time = cfs_time_current_sec();
795 spin_lock_init(&export->exp_lock);
796 INIT_HLIST_NODE(&export->exp_uuid_hash);
797 INIT_HLIST_NODE(&export->exp_nid_hash);
799 export->exp_sp_peer = LUSTRE_SP_ANY;
800 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
801 export->exp_client_uuid = *cluuid;
802 obd_init_export(export);
804 spin_lock(&obd->obd_dev_lock);
805 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
806 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
807 &export->exp_uuid_hash);
809 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
810 obd->obd_name, cluuid->uuid, rc);
811 spin_unlock(&obd->obd_dev_lock);
812 class_handle_unhash(&export->exp_handle);
813 OBD_FREE_PTR(export);
814 return ERR_PTR(-EALREADY);
818 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
819 class_incref(obd, "export", export);
820 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
821 list_add_tail(&export->exp_obd_chain_timed,
822 &export->exp_obd->obd_exports_timed);
823 export->exp_obd->obd_num_exports++;
824 spin_unlock(&obd->obd_dev_lock);
828 EXPORT_SYMBOL(class_new_export);
830 void class_unlink_export(struct obd_export *exp)
832 class_handle_unhash(&exp->exp_handle);
834 spin_lock(&exp->exp_obd->obd_dev_lock);
835 /* delete an uuid-export hashitem from hashtables */
836 if (!hlist_unhashed(&exp->exp_uuid_hash))
837 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
838 &exp->exp_client_uuid,
839 &exp->exp_uuid_hash);
841 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
842 list_del_init(&exp->exp_obd_chain_timed);
843 exp->exp_obd->obd_num_exports--;
844 spin_unlock(&exp->exp_obd->obd_dev_lock);
846 /* Keep these counter valid always */
847 spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
848 if (exp->exp_delayed)
849 exp->exp_obd->obd_delayed_clients--;
850 else if (exp->exp_in_recovery)
851 exp->exp_obd->obd_recoverable_clients--;
852 else if (exp->exp_obd->obd_recovering)
853 exp->exp_obd->obd_max_recoverable_clients--;
854 spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
855 class_export_put(exp);
857 EXPORT_SYMBOL(class_unlink_export);
859 /* Import management functions */
860 void class_import_destroy(struct obd_import *imp)
864 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
865 imp->imp_obd->obd_name);
867 LASSERT(atomic_read(&imp->imp_refcount) == 0);
869 ptlrpc_put_connection_superhack(imp->imp_connection);
871 while (!list_empty(&imp->imp_conn_list)) {
872 struct obd_import_conn *imp_conn;
874 imp_conn = list_entry(imp->imp_conn_list.next,
875 struct obd_import_conn, oic_item);
876 list_del_init(&imp_conn->oic_item);
877 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
878 OBD_FREE(imp_conn, sizeof(*imp_conn));
881 LASSERT(imp->imp_sec == NULL);
882 class_decref(imp->imp_obd, "import", imp);
883 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
887 static void import_handle_addref(void *import)
889 class_import_get(import);
892 struct obd_import *class_import_get(struct obd_import *import)
894 LASSERT(atomic_read(&import->imp_refcount) >= 0);
895 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
896 atomic_inc(&import->imp_refcount);
897 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
898 atomic_read(&import->imp_refcount),
899 import->imp_obd->obd_name);
902 EXPORT_SYMBOL(class_import_get);
904 void class_import_put(struct obd_import *imp)
908 LASSERT(atomic_read(&imp->imp_refcount) > 0);
909 LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
910 LASSERT(list_empty(&imp->imp_zombie_chain));
912 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
913 atomic_read(&imp->imp_refcount) - 1,
914 imp->imp_obd->obd_name);
916 if (atomic_dec_and_test(&imp->imp_refcount)) {
917 CDEBUG(D_INFO, "final put import %p\n", imp);
918 obd_zombie_import_add(imp);
923 EXPORT_SYMBOL(class_import_put);
925 static void init_imp_at(struct imp_at *at) {
927 at_init(&at->iat_net_latency, 0, 0);
928 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
929 /* max service estimates are tracked on the server side, so
930 don't use the AT history here, just use the last reported
931 val. (But keep hist for proc histogram, worst_ever) */
932 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
937 struct obd_import *class_new_import(struct obd_device *obd)
939 struct obd_import *imp;
941 OBD_ALLOC(imp, sizeof(*imp));
945 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
946 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
947 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
948 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
949 spin_lock_init(&imp->imp_lock);
950 imp->imp_last_success_conn = 0;
951 imp->imp_state = LUSTRE_IMP_NEW;
952 imp->imp_obd = class_incref(obd, "import", imp);
953 sema_init(&imp->imp_sec_mutex, 1);
954 cfs_waitq_init(&imp->imp_recovery_waitq);
956 atomic_set(&imp->imp_refcount, 2);
957 atomic_set(&imp->imp_unregistering, 0);
958 atomic_set(&imp->imp_inflight, 0);
959 atomic_set(&imp->imp_replay_inflight, 0);
960 atomic_set(&imp->imp_inval_count, 0);
961 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
962 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
963 class_handle_hash(&imp->imp_handle, import_handle_addref);
964 init_imp_at(&imp->imp_at);
966 /* the default magic is V2, will be used in connect RPC, and
967 * then adjusted according to the flags in request/reply. */
968 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
972 EXPORT_SYMBOL(class_new_import);
974 void class_destroy_import(struct obd_import *import)
976 LASSERT(import != NULL);
977 LASSERT(import != LP_POISON);
979 class_handle_unhash(&import->imp_handle);
981 spin_lock(&import->imp_lock);
982 import->imp_generation++;
983 spin_unlock(&import->imp_lock);
984 class_import_put(import);
986 EXPORT_SYMBOL(class_destroy_import);
988 /* A connection defines an export context in which preallocation can
989 be managed. This releases the export pointer reference, and returns
990 the export handle, so the export refcount is 1 when this function
992 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
993 struct obd_uuid *cluuid)
995 struct obd_export *export;
996 LASSERT(conn != NULL);
997 LASSERT(obd != NULL);
998 LASSERT(cluuid != NULL);
1001 export = class_new_export(obd, cluuid);
1003 RETURN(PTR_ERR(export));
1005 conn->cookie = export->exp_handle.h_cookie;
1006 class_export_put(export);
1008 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1009 cluuid->uuid, conn->cookie);
1012 EXPORT_SYMBOL(class_connect);
1014 /* if export is involved in recovery then clean up related things */
1015 void class_export_recovery_cleanup(struct obd_export *exp)
1017 struct obd_device *obd = exp->exp_obd;
1019 spin_lock_bh(&obd->obd_processing_task_lock);
1020 if (obd->obd_recovering && exp->exp_in_recovery) {
1021 spin_lock(&exp->exp_lock);
1022 exp->exp_in_recovery = 0;
1023 spin_unlock(&exp->exp_lock);
1024 obd->obd_connected_clients--;
1025 /* each connected client is counted as recoverable */
1026 obd->obd_recoverable_clients--;
1027 if (exp->exp_req_replay_needed) {
1028 spin_lock(&exp->exp_lock);
1029 exp->exp_req_replay_needed = 0;
1030 spin_unlock(&exp->exp_lock);
1031 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1032 atomic_dec(&obd->obd_req_replay_clients);
1034 if (exp->exp_lock_replay_needed) {
1035 spin_lock(&exp->exp_lock);
1036 exp->exp_lock_replay_needed = 0;
1037 spin_unlock(&exp->exp_lock);
1038 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1039 atomic_dec(&obd->obd_lock_replay_clients);
1042 spin_unlock_bh(&obd->obd_processing_task_lock);
1045 /* This function removes 1-3 references from the export:
1046 * 1 - for export pointer passed
1047 * and if disconnect really need
1048 * 2 - removing from hash
1049 * 3 - in client_unlink_export
1050 * The export pointer passed to this function can destroyed */
1051 int class_disconnect(struct obd_export *export)
1053 int already_disconnected;
1056 if (export == NULL) {
1058 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1062 spin_lock(&export->exp_lock);
1063 already_disconnected = export->exp_disconnected;
1064 export->exp_disconnected = 1;
1065 spin_unlock(&export->exp_lock);
1067 /* class_cleanup(), abort_recovery(), and class_fail_export()
1068 * all end up in here, and if any of them race we shouldn't
1069 * call extra class_export_puts(). */
1070 if (already_disconnected) {
1071 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1072 GOTO(no_disconn, already_disconnected);
1075 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1076 export->exp_handle.h_cookie);
1078 if (!hlist_unhashed(&export->exp_nid_hash))
1079 lustre_hash_del(export->exp_obd->obd_nid_hash,
1080 &export->exp_connection->c_peer.nid,
1081 &export->exp_nid_hash);
1083 class_export_recovery_cleanup(export);
1084 class_unlink_export(export);
1086 class_export_put(export);
1090 static void class_disconnect_export_list(struct list_head *list,
1091 enum obd_option flags)
1094 struct obd_export *exp;
1097 /* It's possible that an export may disconnect itself, but
1098 * nothing else will be added to this list. */
1099 while (!list_empty(list)) {
1100 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1101 /* need for safe call CDEBUG after obd_disconnect */
1102 class_export_get(exp);
1104 spin_lock(&exp->exp_lock);
1105 exp->exp_flags = flags;
1106 spin_unlock(&exp->exp_lock);
1108 if (obd_uuid_equals(&exp->exp_client_uuid,
1109 &exp->exp_obd->obd_uuid)) {
1111 "exp %p export uuid == obd uuid, don't discon\n",
1113 /* Need to delete this now so we don't end up pointing
1114 * to work_list later when this export is cleaned up. */
1115 list_del_init(&exp->exp_obd_chain);
1116 class_export_put(exp);
1120 class_export_get(exp);
1121 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1122 "last request at "CFS_TIME_T"\n",
1123 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1124 exp, exp->exp_last_request_time);
1125 /* release one export reference anyway */
1126 rc = obd_disconnect(exp);
1128 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1129 obd_export_nid2str(exp), exp, rc);
1130 class_export_put(exp);
1135 void class_disconnect_exports(struct obd_device *obd)
1137 struct list_head work_list;
1140 /* Move all of the exports from obd_exports to a work list, en masse. */
1141 CFS_INIT_LIST_HEAD(&work_list);
1142 spin_lock(&obd->obd_dev_lock);
1143 list_splice_init(&obd->obd_exports, &work_list);
1144 list_splice_init(&obd->obd_delayed_exports, &work_list);
1145 spin_unlock(&obd->obd_dev_lock);
1147 if (!list_empty(&work_list)) {
1148 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1149 "disconnecting them\n", obd->obd_minor, obd);
1150 class_disconnect_export_list(&work_list,
1151 exp_flags_from_obd(obd));
1153 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1154 obd->obd_minor, obd);
1157 EXPORT_SYMBOL(class_disconnect_exports);
1159 /* Remove exports that have not completed recovery.
1161 void class_disconnect_stale_exports(struct obd_device *obd,
1162 int (*test_export)(struct obd_export *),
1163 enum obd_option flags)
1165 struct list_head work_list;
1166 struct list_head *pos, *n;
1167 struct obd_export *exp;
1170 CFS_INIT_LIST_HEAD(&work_list);
1171 spin_lock(&obd->obd_dev_lock);
1172 obd->obd_stale_clients = 0;
1173 list_for_each_safe(pos, n, &obd->obd_exports) {
1174 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1175 if (test_export(exp))
1178 list_move(&exp->exp_obd_chain, &work_list);
1179 /* don't count self-export as client */
1180 if (obd_uuid_equals(&exp->exp_client_uuid,
1181 &exp->exp_obd->obd_uuid))
1184 obd->obd_stale_clients++;
1185 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1186 obd->obd_name, exp->exp_client_uuid.uuid,
1187 exp->exp_connection == NULL ? "<unknown>" :
1188 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1190 spin_unlock(&obd->obd_dev_lock);
1192 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n", obd->obd_name,
1193 obd->obd_stale_clients);
1195 class_disconnect_export_list(&work_list, flags);
1198 EXPORT_SYMBOL(class_disconnect_stale_exports);
1200 void class_fail_export(struct obd_export *exp)
1202 int rc, already_failed;
1204 spin_lock(&exp->exp_lock);
1205 already_failed = exp->exp_failed;
1206 exp->exp_failed = 1;
1207 spin_unlock(&exp->exp_lock);
1209 if (already_failed) {
1210 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1211 exp, exp->exp_client_uuid.uuid);
1215 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1216 exp, exp->exp_client_uuid.uuid);
1218 if (obd_dump_on_timeout)
1219 libcfs_debug_dumplog();
1221 /* Most callers into obd_disconnect are removing their own reference
1222 * (request, for example) in addition to the one from the hash table.
1223 * We don't have such a reference here, so make one. */
1224 class_export_get(exp);
1225 rc = obd_disconnect(exp);
1227 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1229 CDEBUG(D_HA, "disconnected export %p/%s\n",
1230 exp, exp->exp_client_uuid.uuid);
1232 EXPORT_SYMBOL(class_fail_export);
1234 char *obd_export_nid2str(struct obd_export *exp)
1236 if (exp->exp_connection != NULL)
1237 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1241 EXPORT_SYMBOL(obd_export_nid2str);
1243 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1245 struct obd_export *doomed_exp = NULL;
1246 int exports_evicted = 0;
1248 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1251 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1252 if (doomed_exp == NULL)
1255 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1256 "nid %s found, wanted nid %s, requested nid %s\n",
1257 obd_export_nid2str(doomed_exp),
1258 libcfs_nid2str(nid_key), nid);
1259 LASSERTF(doomed_exp != obd->obd_self_export,
1260 "self-export is hashed by NID?\n");
1262 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1263 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1265 class_fail_export(doomed_exp);
1266 class_export_put(doomed_exp);
1269 if (!exports_evicted)
1270 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1271 obd->obd_name, nid);
1272 return exports_evicted;
1274 EXPORT_SYMBOL(obd_export_evict_by_nid);
1276 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1278 struct obd_export *doomed_exp = NULL;
1279 struct obd_uuid doomed_uuid;
1280 int exports_evicted = 0;
1282 obd_str2uuid(&doomed_uuid, uuid);
1283 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1284 CERROR("%s: can't evict myself\n", obd->obd_name);
1285 return exports_evicted;
1288 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1290 if (doomed_exp == NULL) {
1291 CERROR("%s: can't disconnect %s: no exports found\n",
1292 obd->obd_name, uuid);
1294 CWARN("%s: evicting %s at adminstrative request\n",
1295 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1296 class_fail_export(doomed_exp);
1297 class_export_put(doomed_exp);
1301 return exports_evicted;
1303 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1305 static void print_export_data(struct obd_export *exp, const char *status)
1307 struct ptlrpc_reply_state *rs;
1308 struct ptlrpc_reply_state *first_reply = NULL;
1311 spin_lock(&exp->exp_lock);
1312 list_for_each_entry (rs, &exp->exp_outstanding_replies, rs_exp_list) {
1317 spin_unlock(&exp->exp_lock);
1319 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1320 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1321 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1322 atomic_read(&exp->exp_rpc_count),
1323 atomic_read(&exp->exp_cb_count),
1324 atomic_read(&exp->exp_locks_count),
1325 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1326 nreplies, first_reply, nreplies > 3 ? "..." : "",
1327 exp->exp_last_committed);
1330 void dump_exports(struct obd_device *obd)
1332 struct obd_export *exp;
1334 spin_lock(&obd->obd_dev_lock);
1335 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1336 print_export_data(exp, "ACTIVE");
1337 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1338 print_export_data(exp, "UNLINKED");
1339 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1340 print_export_data(exp, "DELAYED");
1341 spin_unlock(&obd->obd_dev_lock);
1342 spin_lock(&obd_zombie_impexp_lock);
1343 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1344 print_export_data(exp, "ZOMBIE");
1345 spin_unlock(&obd_zombie_impexp_lock);
1347 EXPORT_SYMBOL(dump_exports);
1349 void obd_exports_barrier(struct obd_device *obd)
1352 LASSERT(list_empty(&obd->obd_exports));
1353 spin_lock(&obd->obd_dev_lock);
1354 while (!list_empty(&obd->obd_unlinked_exports)) {
1355 spin_unlock(&obd->obd_dev_lock);
1356 cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
1357 if (waited > 5 && IS_PO2(waited)) {
1358 LCONSOLE_WARN("Waiting for obd_unlinked_exports "
1359 "more than %d seconds. "
1360 "The obd refcount = %d. Is it stuck?\n",
1361 waited, atomic_read(&obd->obd_refcount));
1365 spin_lock(&obd->obd_dev_lock);
1367 spin_unlock(&obd->obd_dev_lock);
1369 EXPORT_SYMBOL(obd_exports_barrier);
1372 * kill zombie imports and exports
1374 void obd_zombie_impexp_cull(void)
1376 struct obd_import *import;
1377 struct obd_export *export;
1381 spin_lock(&obd_zombie_impexp_lock);
1384 if (!list_empty(&obd_zombie_imports)) {
1385 import = list_entry(obd_zombie_imports.next,
1388 list_del_init(&import->imp_zombie_chain);
1392 if (!list_empty(&obd_zombie_exports)) {
1393 export = list_entry(obd_zombie_exports.next,
1396 list_del_init(&export->exp_obd_chain);
1399 spin_unlock(&obd_zombie_impexp_lock);
1402 class_import_destroy(import);
1405 class_export_destroy(export);
1407 } while (import != NULL || export != NULL);
1411 static struct completion obd_zombie_start;
1412 static struct completion obd_zombie_stop;
1413 static unsigned long obd_zombie_flags;
1414 static cfs_waitq_t obd_zombie_waitq;
1417 OBD_ZOMBIE_STOP = 1 << 1
1421 * check for work for kill zombie import/export thread.
1423 static int obd_zombie_impexp_check(void *arg)
1427 spin_lock(&obd_zombie_impexp_lock);
1428 rc = list_empty(&obd_zombie_imports) &&
1429 list_empty(&obd_zombie_exports) &&
1430 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1432 spin_unlock(&obd_zombie_impexp_lock);
1438 * Add export to the obd_zombe thread and notify it.
1440 static void obd_zombie_export_add(struct obd_export *exp) {
1441 spin_lock(&exp->exp_obd->obd_dev_lock);
1442 LASSERT(!list_empty(&exp->exp_obd_chain));
1443 list_del_init(&exp->exp_obd_chain);
1444 spin_unlock(&exp->exp_obd->obd_dev_lock);
1445 spin_lock(&obd_zombie_impexp_lock);
1446 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1447 spin_unlock(&obd_zombie_impexp_lock);
1449 if (obd_zombie_impexp_notify != NULL)
1450 obd_zombie_impexp_notify();
1454 * Add import to the obd_zombe thread and notify it.
1456 static void obd_zombie_import_add(struct obd_import *imp) {
1457 LASSERT(imp->imp_sec == NULL);
1458 spin_lock(&obd_zombie_impexp_lock);
1459 LASSERT(list_empty(&imp->imp_zombie_chain));
1460 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1461 spin_unlock(&obd_zombie_impexp_lock);
1463 if (obd_zombie_impexp_notify != NULL)
1464 obd_zombie_impexp_notify();
1468 * notify import/export destroy thread about new zombie.
1470 static void obd_zombie_impexp_notify(void)
1472 cfs_waitq_signal(&obd_zombie_waitq);
1476 * check whether obd_zombie is idle
1478 static int obd_zombie_is_idle(void)
1482 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1483 spin_lock(&obd_zombie_impexp_lock);
1484 rc = list_empty(&obd_zombie_imports) &&
1485 list_empty(&obd_zombie_exports);
1486 spin_unlock(&obd_zombie_impexp_lock);
1491 * wait when obd_zombie import/export queues become empty
1493 void obd_zombie_barrier(void)
1495 struct l_wait_info lwi = { 0 };
1496 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1498 EXPORT_SYMBOL(obd_zombie_barrier);
1503 * destroy zombie export/import thread.
1505 static int obd_zombie_impexp_thread(void *unused)
1509 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1510 complete(&obd_zombie_start);
1514 complete(&obd_zombie_start);
1516 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1517 struct l_wait_info lwi = { 0 };
1519 l_wait_event(obd_zombie_waitq,
1520 !obd_zombie_impexp_check(NULL), &lwi);
1521 obd_zombie_impexp_cull();
1524 * Notify obd_zombie_barrier callers that queues
1527 cfs_waitq_signal(&obd_zombie_waitq);
1530 complete(&obd_zombie_stop);
1535 #else /* ! KERNEL */
1537 static atomic_t zombie_recur = ATOMIC_INIT(0);
1538 static void *obd_zombie_impexp_work_cb;
1539 static void *obd_zombie_impexp_idle_cb;
1541 int obd_zombie_impexp_kill(void *arg)
1545 if (atomic_inc_return(&zombie_recur) == 1) {
1546 obd_zombie_impexp_cull();
1549 atomic_dec(&zombie_recur);
1556 * start destroy zombie import/export thread
1558 int obd_zombie_impexp_init(void)
1562 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1563 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1564 spin_lock_init(&obd_zombie_impexp_lock);
1565 init_completion(&obd_zombie_start);
1566 init_completion(&obd_zombie_stop);
1567 cfs_waitq_init(&obd_zombie_waitq);
1570 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1574 wait_for_completion(&obd_zombie_start);
1577 obd_zombie_impexp_work_cb =
1578 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1579 &obd_zombie_impexp_kill, NULL);
1581 obd_zombie_impexp_idle_cb =
1582 liblustre_register_idle_callback("obd_zombi_impexp_check",
1583 &obd_zombie_impexp_check, NULL);
1589 * stop destroy zombie import/export thread
1591 void obd_zombie_impexp_stop(void)
1593 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1594 obd_zombie_impexp_notify();
1596 wait_for_completion(&obd_zombie_stop);
1598 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1599 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);