1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp, const char *status);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
83 EXPORT_SYMBOL(obd_device_alloc);
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
101 struct list_head *tmp;
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 list_for_each(tmp, &obd_types) {
106 type = list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
122 const char *modname = name;
123 if (!request_module("%s", modname)) {
124 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125 type = class_search_type(name);
127 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
133 spin_lock(&type->obd_type_lock);
135 try_module_get(type->typ_dt_ops->o_owner);
136 spin_unlock(&type->obd_type_lock);
141 void class_put_type(struct obd_type *type)
144 spin_lock(&type->obd_type_lock);
146 module_put(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
150 #define CLASS_MAX_NAME 1024
152 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
153 struct lprocfs_vars *vars, const char *name,
154 struct lu_device_type *ldt)
156 struct obd_type *type;
161 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
163 if (class_search_type(name)) {
164 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
169 OBD_ALLOC(type, sizeof(*type));
173 OBD_ALLOC_PTR(type->typ_dt_ops);
174 OBD_ALLOC_PTR(type->typ_md_ops);
175 OBD_ALLOC(type->typ_name, strlen(name) + 1);
177 if (type->typ_dt_ops == NULL ||
178 type->typ_md_ops == NULL ||
179 type->typ_name == NULL)
182 *(type->typ_dt_ops) = *dt_ops;
183 /* md_ops is optional */
185 *(type->typ_md_ops) = *md_ops;
186 strcpy(type->typ_name, name);
187 spin_lock_init(&type->obd_type_lock);
190 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
192 if (IS_ERR(type->typ_procroot)) {
193 rc = PTR_ERR(type->typ_procroot);
194 type->typ_procroot = NULL;
200 rc = lu_device_type_init(ldt);
205 spin_lock(&obd_types_lock);
206 list_add(&type->typ_chain, &obd_types);
207 spin_unlock(&obd_types_lock);
212 if (type->typ_name != NULL)
213 OBD_FREE(type->typ_name, strlen(name) + 1);
214 if (type->typ_md_ops != NULL)
215 OBD_FREE_PTR(type->typ_md_ops);
216 if (type->typ_dt_ops != NULL)
217 OBD_FREE_PTR(type->typ_dt_ops);
218 OBD_FREE(type, sizeof(*type));
222 int class_unregister_type(const char *name)
224 struct obd_type *type = class_search_type(name);
228 CERROR("unknown obd type\n");
232 if (type->typ_refcnt) {
233 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
234 /* This is a bad situation, let's make the best of it */
235 /* Remove ops, but leave the name for debugging */
236 OBD_FREE_PTR(type->typ_dt_ops);
237 OBD_FREE_PTR(type->typ_md_ops);
241 if (type->typ_procroot) {
242 lprocfs_remove(&type->typ_procroot);
246 lu_device_type_fini(type->typ_lu);
248 spin_lock(&obd_types_lock);
249 list_del(&type->typ_chain);
250 spin_unlock(&obd_types_lock);
251 OBD_FREE(type->typ_name, strlen(name) + 1);
252 if (type->typ_dt_ops != NULL)
253 OBD_FREE_PTR(type->typ_dt_ops);
254 if (type->typ_md_ops != NULL)
255 OBD_FREE_PTR(type->typ_md_ops);
256 OBD_FREE(type, sizeof(*type));
258 } /* class_unregister_type */
261 * Create a new obd device.
263 * Find an empty slot in ::obd_devs[], create a new obd device in it.
265 * \param[in] type_name obd device type string.
266 * \param[in] name obd device name.
268 * \retval NULL if create fails, otherwise return the obd device
271 struct obd_device *class_newdev(const char *type_name, const char *name)
273 struct obd_device *result = NULL;
274 struct obd_device *newdev;
275 struct obd_type *type = NULL;
277 int new_obd_minor = 0;
279 if (strlen(name) >= MAX_OBD_NAME) {
280 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
281 RETURN(ERR_PTR(-EINVAL));
284 type = class_get_type(type_name);
286 CERROR("OBD: unknown type: %s\n", type_name);
287 RETURN(ERR_PTR(-ENODEV));
290 newdev = obd_device_alloc();
291 if (newdev == NULL) {
292 class_put_type(type);
293 RETURN(ERR_PTR(-ENOMEM));
295 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
297 spin_lock(&obd_dev_lock);
298 for (i = 0; i < class_devno_max(); i++) {
299 struct obd_device *obd = class_num2obd(i);
300 if (obd && obd->obd_name &&
301 (strcmp(name, obd->obd_name) == 0)) {
302 CERROR("Device %s already exists, won't add\n", name);
304 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
305 "%p obd_magic %08x != %08x\n", result,
306 result->obd_magic, OBD_DEVICE_MAGIC);
307 LASSERTF(result->obd_minor == new_obd_minor,
308 "%p obd_minor %d != %d\n", result,
309 result->obd_minor, new_obd_minor);
311 obd_devs[result->obd_minor] = NULL;
312 result->obd_name[0]='\0';
314 result = ERR_PTR(-EEXIST);
317 if (!result && !obd) {
319 result->obd_minor = i;
321 result->obd_type = type;
322 strncpy(result->obd_name, name,
323 sizeof(result->obd_name) - 1);
324 obd_devs[i] = result;
327 spin_unlock(&obd_dev_lock);
329 if (result == NULL && i >= class_devno_max()) {
330 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
332 result = ERR_PTR(-EOVERFLOW);
335 if (IS_ERR(result)) {
336 obd_device_free(newdev);
337 class_put_type(type);
339 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
340 result->obd_name, result);
345 void class_release_dev(struct obd_device *obd)
347 struct obd_type *obd_type = obd->obd_type;
349 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
353 LASSERT(obd_type != NULL);
355 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
356 obd->obd_name,obd->obd_type->typ_name);
358 spin_lock(&obd_dev_lock);
359 obd_devs[obd->obd_minor] = NULL;
360 spin_unlock(&obd_dev_lock);
361 obd_device_free(obd);
363 class_put_type(obd_type);
366 int class_name2dev(const char *name)
373 spin_lock(&obd_dev_lock);
374 for (i = 0; i < class_devno_max(); i++) {
375 struct obd_device *obd = class_num2obd(i);
376 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
377 /* Make sure we finished attaching before we give
378 out any references */
379 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
380 if (obd->obd_attached) {
381 spin_unlock(&obd_dev_lock);
387 spin_unlock(&obd_dev_lock);
392 struct obd_device *class_name2obd(const char *name)
394 int dev = class_name2dev(name);
396 if (dev < 0 || dev > class_devno_max())
398 return class_num2obd(dev);
401 int class_uuid2dev(struct obd_uuid *uuid)
405 spin_lock(&obd_dev_lock);
406 for (i = 0; i < class_devno_max(); i++) {
407 struct obd_device *obd = class_num2obd(i);
408 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
409 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410 spin_unlock(&obd_dev_lock);
414 spin_unlock(&obd_dev_lock);
419 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
421 int dev = class_uuid2dev(uuid);
424 return class_num2obd(dev);
428 * Get obd device from ::obd_devs[]
430 * \param num [in] array index
432 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
433 * otherwise return the obd device there.
435 struct obd_device *class_num2obd(int num)
437 struct obd_device *obd = NULL;
439 if (num < class_devno_max()) {
444 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
445 "%p obd_magic %08x != %08x\n",
446 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447 LASSERTF(obd->obd_minor == num,
448 "%p obd_minor %0d != %0d\n",
449 obd, obd->obd_minor, num);
455 void class_obd_list(void)
460 spin_lock(&obd_dev_lock);
461 for (i = 0; i < class_devno_max(); i++) {
462 struct obd_device *obd = class_num2obd(i);
465 if (obd->obd_stopping)
467 else if (obd->obd_set_up)
469 else if (obd->obd_attached)
473 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
474 i, status, obd->obd_type->typ_name,
475 obd->obd_name, obd->obd_uuid.uuid,
476 atomic_read(&obd->obd_refcount));
478 spin_unlock(&obd_dev_lock);
482 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
483 specified, then only the client with that uuid is returned,
484 otherwise any client connected to the tgt is returned. */
485 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
486 const char * typ_name,
487 struct obd_uuid *grp_uuid)
491 spin_lock(&obd_dev_lock);
492 for (i = 0; i < class_devno_max(); i++) {
493 struct obd_device *obd = class_num2obd(i);
496 if ((strncmp(obd->obd_type->typ_name, typ_name,
497 strlen(typ_name)) == 0)) {
498 if (obd_uuid_equals(tgt_uuid,
499 &obd->u.cli.cl_target_uuid) &&
500 ((grp_uuid)? obd_uuid_equals(grp_uuid,
501 &obd->obd_uuid) : 1)) {
502 spin_unlock(&obd_dev_lock);
507 spin_unlock(&obd_dev_lock);
512 /* Iterate the obd_device list looking devices have grp_uuid. Start
513 searching at *next, and if a device is found, the next index to look
514 at is saved in *next. If next is NULL, then the first matching device
515 will always be returned. */
516 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
522 else if (*next >= 0 && *next < class_devno_max())
527 spin_lock(&obd_dev_lock);
528 for (; i < class_devno_max(); i++) {
529 struct obd_device *obd = class_num2obd(i);
532 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
535 spin_unlock(&obd_dev_lock);
539 spin_unlock(&obd_dev_lock);
545 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
546 * adjust sptlrpc settings accordingly.
548 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
550 struct obd_device *obd;
554 LASSERT(namelen > 0);
556 spin_lock(&obd_dev_lock);
557 for (i = 0; i < class_devno_max(); i++) {
558 obd = class_num2obd(i);
560 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
563 /* only notify mdc, osc, mdt, ost */
564 type = obd->obd_type->typ_name;
565 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
566 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
567 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
568 strcmp(type, LUSTRE_OST_NAME) != 0)
571 if (strncmp(obd->obd_name, fsname, namelen))
574 class_incref(obd, __FUNCTION__, obd);
575 spin_unlock(&obd_dev_lock);
576 rc2 = obd_set_info_async(obd->obd_self_export,
577 sizeof(KEY_SPTLRPC_CONF),
578 KEY_SPTLRPC_CONF, 0, NULL, NULL);
580 class_decref(obd, __FUNCTION__, obd);
581 spin_lock(&obd_dev_lock);
583 spin_unlock(&obd_dev_lock);
586 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
588 void obd_cleanup_caches(void)
593 if (obd_device_cachep) {
594 rc = cfs_mem_cache_destroy(obd_device_cachep);
595 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
596 obd_device_cachep = NULL;
599 rc = cfs_mem_cache_destroy(obdo_cachep);
600 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
604 rc = cfs_mem_cache_destroy(import_cachep);
605 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
606 import_cachep = NULL;
609 rc = cfs_mem_cache_destroy(capa_cachep);
610 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
616 int obd_init_caches(void)
620 LASSERT(obd_device_cachep == NULL);
621 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
622 sizeof(struct obd_device),
624 if (!obd_device_cachep)
627 LASSERT(obdo_cachep == NULL);
628 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
633 LASSERT(import_cachep == NULL);
634 import_cachep = cfs_mem_cache_create("ll_import_cache",
635 sizeof(struct obd_import),
640 LASSERT(capa_cachep == NULL);
641 capa_cachep = cfs_mem_cache_create("capa_cache",
642 sizeof(struct obd_capa), 0, 0);
648 obd_cleanup_caches();
653 /* map connection to client */
654 struct obd_export *class_conn2export(struct lustre_handle *conn)
656 struct obd_export *export;
660 CDEBUG(D_CACHE, "looking for null handle\n");
664 if (conn->cookie == -1) { /* this means assign a new connection */
665 CDEBUG(D_CACHE, "want a new connection\n");
669 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
670 export = class_handle2object(conn->cookie);
674 struct obd_device *class_exp2obd(struct obd_export *exp)
681 struct obd_device *class_conn2obd(struct lustre_handle *conn)
683 struct obd_export *export;
684 export = class_conn2export(conn);
686 struct obd_device *obd = export->exp_obd;
687 class_export_put(export);
693 struct obd_import *class_exp2cliimp(struct obd_export *exp)
695 struct obd_device *obd = exp->exp_obd;
698 return obd->u.cli.cl_import;
701 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
703 struct obd_device *obd = class_conn2obd(conn);
706 return obd->u.cli.cl_import;
709 /* Export management functions */
710 static void class_export_destroy(struct obd_export *exp)
712 struct obd_device *obd = exp->exp_obd;
715 LASSERT (atomic_read(&exp->exp_refcount) == 0);
717 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
718 exp->exp_client_uuid.uuid, obd->obd_name);
720 LASSERT(obd != NULL);
722 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
723 if (exp->exp_connection)
724 ptlrpc_put_connection_superhack(exp->exp_connection);
726 LASSERT(list_empty(&exp->exp_outstanding_replies));
727 LASSERT(list_empty(&exp->exp_uncommitted_replies));
728 LASSERT(list_empty(&exp->exp_req_replay_queue));
729 LASSERT(list_empty(&exp->exp_queued_rpc));
730 obd_destroy_export(exp);
731 class_decref(obd, "export", exp);
733 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
737 static void export_handle_addref(void *export)
739 class_export_get(export);
742 struct obd_export *class_export_get(struct obd_export *exp)
744 atomic_inc(&exp->exp_refcount);
745 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
746 atomic_read(&exp->exp_refcount));
749 EXPORT_SYMBOL(class_export_get);
751 void class_export_put(struct obd_export *exp)
753 LASSERT(exp != NULL);
754 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
755 atomic_read(&exp->exp_refcount) - 1);
756 LASSERT(atomic_read(&exp->exp_refcount) > 0);
757 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
759 if (atomic_dec_and_test(&exp->exp_refcount)) {
760 LASSERT(!list_empty(&exp->exp_obd_chain));
761 CDEBUG(D_IOCTL, "final put %p/%s\n",
762 exp, exp->exp_client_uuid.uuid);
763 obd_zombie_export_add(exp);
766 EXPORT_SYMBOL(class_export_put);
768 /* Creates a new export, adds it to the hash table, and returns a
769 * pointer to it. The refcount is 2: one for the hash reference, and
770 * one for the pointer returned by this function. */
771 struct obd_export *class_new_export(struct obd_device *obd,
772 struct obd_uuid *cluuid)
774 struct obd_export *export;
777 OBD_ALLOC_PTR(export);
779 return ERR_PTR(-ENOMEM);
781 export->exp_conn_cnt = 0;
782 export->exp_lock_hash = NULL;
783 atomic_set(&export->exp_refcount, 2);
784 atomic_set(&export->exp_rpc_count, 0);
785 atomic_set(&export->exp_cb_count, 0);
786 atomic_set(&export->exp_locks_count, 0);
787 atomic_set(&export->exp_replay_count, 0);
788 export->exp_obd = obd;
789 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
790 spin_lock_init(&export->exp_uncommitted_replies_lock);
791 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
792 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
793 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
794 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
795 class_handle_hash(&export->exp_handle, export_handle_addref);
796 export->exp_last_request_time = cfs_time_current_sec();
797 spin_lock_init(&export->exp_lock);
798 INIT_HLIST_NODE(&export->exp_uuid_hash);
799 INIT_HLIST_NODE(&export->exp_nid_hash);
801 export->exp_sp_peer = LUSTRE_SP_ANY;
802 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
803 export->exp_client_uuid = *cluuid;
804 obd_init_export(export);
806 spin_lock(&obd->obd_dev_lock);
807 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
808 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
809 &export->exp_uuid_hash);
811 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
812 obd->obd_name, cluuid->uuid, rc);
813 spin_unlock(&obd->obd_dev_lock);
814 class_handle_unhash(&export->exp_handle);
815 OBD_FREE_PTR(export);
816 return ERR_PTR(-EALREADY);
820 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
821 class_incref(obd, "export", export);
822 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
823 list_add_tail(&export->exp_obd_chain_timed,
824 &export->exp_obd->obd_exports_timed);
825 export->exp_obd->obd_num_exports++;
826 spin_unlock(&obd->obd_dev_lock);
830 EXPORT_SYMBOL(class_new_export);
832 void class_unlink_export(struct obd_export *exp)
834 class_handle_unhash(&exp->exp_handle);
836 spin_lock(&exp->exp_obd->obd_dev_lock);
837 /* delete an uuid-export hashitem from hashtables */
838 if (!hlist_unhashed(&exp->exp_uuid_hash))
839 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
840 &exp->exp_client_uuid,
841 &exp->exp_uuid_hash);
843 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
844 list_del_init(&exp->exp_obd_chain_timed);
845 exp->exp_obd->obd_num_exports--;
846 spin_unlock(&exp->exp_obd->obd_dev_lock);
847 class_export_put(exp);
849 EXPORT_SYMBOL(class_unlink_export);
851 /* Import management functions */
852 void class_import_destroy(struct obd_import *imp)
856 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
857 imp->imp_obd->obd_name);
859 LASSERT(atomic_read(&imp->imp_refcount) == 0);
861 ptlrpc_put_connection_superhack(imp->imp_connection);
863 while (!list_empty(&imp->imp_conn_list)) {
864 struct obd_import_conn *imp_conn;
866 imp_conn = list_entry(imp->imp_conn_list.next,
867 struct obd_import_conn, oic_item);
868 list_del_init(&imp_conn->oic_item);
869 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
870 OBD_FREE(imp_conn, sizeof(*imp_conn));
873 LASSERT(imp->imp_sec == NULL);
874 class_decref(imp->imp_obd, "import", imp);
875 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
879 static void import_handle_addref(void *import)
881 class_import_get(import);
884 struct obd_import *class_import_get(struct obd_import *import)
886 LASSERT(atomic_read(&import->imp_refcount) >= 0);
887 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
888 atomic_inc(&import->imp_refcount);
889 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
890 atomic_read(&import->imp_refcount),
891 import->imp_obd->obd_name);
894 EXPORT_SYMBOL(class_import_get);
896 void class_import_put(struct obd_import *imp)
900 LASSERT(atomic_read(&imp->imp_refcount) > 0);
901 LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
902 LASSERT(list_empty(&imp->imp_zombie_chain));
904 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
905 atomic_read(&imp->imp_refcount) - 1,
906 imp->imp_obd->obd_name);
908 if (atomic_dec_and_test(&imp->imp_refcount)) {
909 CDEBUG(D_INFO, "final put import %p\n", imp);
910 obd_zombie_import_add(imp);
915 EXPORT_SYMBOL(class_import_put);
917 static void init_imp_at(struct imp_at *at) {
919 at_init(&at->iat_net_latency, 0, 0);
920 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
921 /* max service estimates are tracked on the server side, so
922 don't use the AT history here, just use the last reported
923 val. (But keep hist for proc histogram, worst_ever) */
924 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
929 struct obd_import *class_new_import(struct obd_device *obd)
931 struct obd_import *imp;
933 OBD_ALLOC(imp, sizeof(*imp));
937 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
938 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
939 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
940 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
941 spin_lock_init(&imp->imp_lock);
942 imp->imp_last_success_conn = 0;
943 imp->imp_state = LUSTRE_IMP_NEW;
944 imp->imp_obd = class_incref(obd, "import", imp);
945 sema_init(&imp->imp_sec_mutex, 1);
946 cfs_waitq_init(&imp->imp_recovery_waitq);
948 atomic_set(&imp->imp_refcount, 2);
949 atomic_set(&imp->imp_unregistering, 0);
950 atomic_set(&imp->imp_inflight, 0);
951 atomic_set(&imp->imp_replay_inflight, 0);
952 atomic_set(&imp->imp_inval_count, 0);
953 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
954 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
955 class_handle_hash(&imp->imp_handle, import_handle_addref);
956 init_imp_at(&imp->imp_at);
958 /* the default magic is V2, will be used in connect RPC, and
959 * then adjusted according to the flags in request/reply. */
960 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
964 EXPORT_SYMBOL(class_new_import);
966 void class_destroy_import(struct obd_import *import)
968 LASSERT(import != NULL);
969 LASSERT(import != LP_POISON);
971 class_handle_unhash(&import->imp_handle);
973 spin_lock(&import->imp_lock);
974 import->imp_generation++;
975 spin_unlock(&import->imp_lock);
976 class_import_put(import);
978 EXPORT_SYMBOL(class_destroy_import);
980 /* A connection defines an export context in which preallocation can
981 be managed. This releases the export pointer reference, and returns
982 the export handle, so the export refcount is 1 when this function
984 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
985 struct obd_uuid *cluuid)
987 struct obd_export *export;
988 LASSERT(conn != NULL);
989 LASSERT(obd != NULL);
990 LASSERT(cluuid != NULL);
993 export = class_new_export(obd, cluuid);
995 RETURN(PTR_ERR(export));
997 conn->cookie = export->exp_handle.h_cookie;
998 class_export_put(export);
1000 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1001 cluuid->uuid, conn->cookie);
1004 EXPORT_SYMBOL(class_connect);
1006 /* if export is involved in recovery then clean up related things */
1007 void class_export_recovery_cleanup(struct obd_export *exp)
1009 struct obd_device *obd = exp->exp_obd;
1011 spin_lock_bh(&obd->obd_processing_task_lock);
1012 if (exp->exp_delayed)
1013 obd->obd_delayed_clients--;
1014 if (obd->obd_recovering && exp->exp_in_recovery) {
1015 spin_lock(&exp->exp_lock);
1016 exp->exp_in_recovery = 0;
1017 spin_unlock(&exp->exp_lock);
1018 LASSERT(obd->obd_connected_clients);
1019 obd->obd_connected_clients--;
1021 /** Cleanup req replay fields */
1022 if (exp->exp_req_replay_needed) {
1023 spin_lock(&exp->exp_lock);
1024 exp->exp_req_replay_needed = 0;
1025 spin_unlock(&exp->exp_lock);
1026 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1027 atomic_dec(&obd->obd_req_replay_clients);
1029 /** Cleanup lock replay data */
1030 if (exp->exp_lock_replay_needed) {
1031 spin_lock(&exp->exp_lock);
1032 exp->exp_lock_replay_needed = 0;
1033 spin_unlock(&exp->exp_lock);
1034 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1035 atomic_dec(&obd->obd_lock_replay_clients);
1037 spin_unlock_bh(&obd->obd_processing_task_lock);
1040 /* This function removes 1-3 references from the export:
1041 * 1 - for export pointer passed
1042 * and if disconnect really need
1043 * 2 - removing from hash
1044 * 3 - in client_unlink_export
1045 * The export pointer passed to this function can destroyed */
1046 int class_disconnect(struct obd_export *export)
1048 int already_disconnected;
1051 if (export == NULL) {
1053 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1057 spin_lock(&export->exp_lock);
1058 already_disconnected = export->exp_disconnected;
1059 export->exp_disconnected = 1;
1060 spin_unlock(&export->exp_lock);
1062 /* class_cleanup(), abort_recovery(), and class_fail_export()
1063 * all end up in here, and if any of them race we shouldn't
1064 * call extra class_export_puts(). */
1065 if (already_disconnected) {
1066 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1067 GOTO(no_disconn, already_disconnected);
1070 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1071 export->exp_handle.h_cookie);
1073 if (!hlist_unhashed(&export->exp_nid_hash))
1074 lustre_hash_del(export->exp_obd->obd_nid_hash,
1075 &export->exp_connection->c_peer.nid,
1076 &export->exp_nid_hash);
1078 class_export_recovery_cleanup(export);
1079 class_unlink_export(export);
1081 class_export_put(export);
1085 static void class_disconnect_export_list(struct list_head *list,
1086 enum obd_option flags)
1089 struct obd_export *exp;
1092 /* It's possible that an export may disconnect itself, but
1093 * nothing else will be added to this list. */
1094 while (!list_empty(list)) {
1095 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1096 /* need for safe call CDEBUG after obd_disconnect */
1097 class_export_get(exp);
1099 spin_lock(&exp->exp_lock);
1100 exp->exp_flags = flags;
1101 spin_unlock(&exp->exp_lock);
1103 if (obd_uuid_equals(&exp->exp_client_uuid,
1104 &exp->exp_obd->obd_uuid)) {
1106 "exp %p export uuid == obd uuid, don't discon\n",
1108 /* Need to delete this now so we don't end up pointing
1109 * to work_list later when this export is cleaned up. */
1110 list_del_init(&exp->exp_obd_chain);
1111 class_export_put(exp);
1115 class_export_get(exp);
1116 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1117 "last request at "CFS_TIME_T"\n",
1118 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1119 exp, exp->exp_last_request_time);
1120 /* release one export reference anyway */
1121 rc = obd_disconnect(exp);
1123 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1124 obd_export_nid2str(exp), exp, rc);
1125 class_export_put(exp);
1130 void class_disconnect_exports(struct obd_device *obd)
1132 struct list_head work_list;
1135 /* Move all of the exports from obd_exports to a work list, en masse. */
1136 CFS_INIT_LIST_HEAD(&work_list);
1137 spin_lock(&obd->obd_dev_lock);
1138 list_splice_init(&obd->obd_exports, &work_list);
1139 list_splice_init(&obd->obd_delayed_exports, &work_list);
1140 spin_unlock(&obd->obd_dev_lock);
1142 if (!list_empty(&work_list)) {
1143 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1144 "disconnecting them\n", obd->obd_minor, obd);
1145 class_disconnect_export_list(&work_list,
1146 exp_flags_from_obd(obd));
1148 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1149 obd->obd_minor, obd);
1152 EXPORT_SYMBOL(class_disconnect_exports);
1154 /* Remove exports that have not completed recovery.
1156 void class_disconnect_stale_exports(struct obd_device *obd,
1157 int (*test_export)(struct obd_export *))
1159 struct list_head work_list;
1160 struct list_head *pos, *n;
1161 struct obd_export *exp;
1165 CFS_INIT_LIST_HEAD(&work_list);
1166 spin_lock(&obd->obd_dev_lock);
1167 list_for_each_safe(pos, n, &obd->obd_exports) {
1168 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1169 if (test_export(exp))
1172 /* don't count self-export as client */
1173 if (obd_uuid_equals(&exp->exp_client_uuid,
1174 &exp->exp_obd->obd_uuid))
1177 list_move(&exp->exp_obd_chain, &work_list);
1179 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1180 obd->obd_name, exp->exp_client_uuid.uuid,
1181 exp->exp_connection == NULL ? "<unknown>" :
1182 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1183 print_export_data(exp, "EVICTING");
1185 spin_unlock(&obd->obd_dev_lock);
1188 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1189 obd->obd_name, evicted);
1190 obd->obd_stale_clients += evicted;
1192 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1193 OBD_OPT_ABORT_RECOV);
1196 EXPORT_SYMBOL(class_disconnect_stale_exports);
1198 void class_fail_export(struct obd_export *exp)
1200 int rc, already_failed;
1202 spin_lock(&exp->exp_lock);
1203 already_failed = exp->exp_failed;
1204 exp->exp_failed = 1;
1205 spin_unlock(&exp->exp_lock);
1207 if (already_failed) {
1208 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1209 exp, exp->exp_client_uuid.uuid);
1213 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1214 exp, exp->exp_client_uuid.uuid);
1216 if (obd_dump_on_timeout)
1217 libcfs_debug_dumplog();
1219 /* Most callers into obd_disconnect are removing their own reference
1220 * (request, for example) in addition to the one from the hash table.
1221 * We don't have such a reference here, so make one. */
1222 class_export_get(exp);
1223 rc = obd_disconnect(exp);
1225 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1227 CDEBUG(D_HA, "disconnected export %p/%s\n",
1228 exp, exp->exp_client_uuid.uuid);
1230 EXPORT_SYMBOL(class_fail_export);
1232 char *obd_export_nid2str(struct obd_export *exp)
1234 if (exp->exp_connection != NULL)
1235 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1239 EXPORT_SYMBOL(obd_export_nid2str);
1241 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1243 struct obd_export *doomed_exp = NULL;
1244 int exports_evicted = 0;
1246 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1249 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1250 if (doomed_exp == NULL)
1253 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1254 "nid %s found, wanted nid %s, requested nid %s\n",
1255 obd_export_nid2str(doomed_exp),
1256 libcfs_nid2str(nid_key), nid);
1257 LASSERTF(doomed_exp != obd->obd_self_export,
1258 "self-export is hashed by NID?\n");
1260 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1261 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1263 class_fail_export(doomed_exp);
1264 class_export_put(doomed_exp);
1267 if (!exports_evicted)
1268 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1269 obd->obd_name, nid);
1270 return exports_evicted;
1272 EXPORT_SYMBOL(obd_export_evict_by_nid);
1274 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1276 struct obd_export *doomed_exp = NULL;
1277 struct obd_uuid doomed_uuid;
1278 int exports_evicted = 0;
1280 obd_str2uuid(&doomed_uuid, uuid);
1281 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1282 CERROR("%s: can't evict myself\n", obd->obd_name);
1283 return exports_evicted;
1286 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1288 if (doomed_exp == NULL) {
1289 CERROR("%s: can't disconnect %s: no exports found\n",
1290 obd->obd_name, uuid);
1292 CWARN("%s: evicting %s at adminstrative request\n",
1293 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1294 class_fail_export(doomed_exp);
1295 class_export_put(doomed_exp);
1299 return exports_evicted;
1301 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1303 static void print_export_data(struct obd_export *exp, const char *status)
1305 struct ptlrpc_reply_state *rs;
1306 struct ptlrpc_reply_state *first_reply = NULL;
1309 spin_lock(&exp->exp_lock);
1310 list_for_each_entry (rs, &exp->exp_outstanding_replies, rs_exp_list) {
1315 spin_unlock(&exp->exp_lock);
1317 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1318 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1319 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1320 atomic_read(&exp->exp_rpc_count),
1321 atomic_read(&exp->exp_cb_count),
1322 atomic_read(&exp->exp_locks_count),
1323 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1324 nreplies, first_reply, nreplies > 3 ? "..." : "",
1325 exp->exp_last_committed);
1328 void dump_exports(struct obd_device *obd)
1330 struct obd_export *exp;
1332 spin_lock(&obd->obd_dev_lock);
1333 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1334 print_export_data(exp, "ACTIVE");
1335 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1336 print_export_data(exp, "UNLINKED");
1337 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1338 print_export_data(exp, "DELAYED");
1339 spin_unlock(&obd->obd_dev_lock);
1340 spin_lock(&obd_zombie_impexp_lock);
1341 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1342 print_export_data(exp, "ZOMBIE");
1343 spin_unlock(&obd_zombie_impexp_lock);
1345 EXPORT_SYMBOL(dump_exports);
1347 void obd_exports_barrier(struct obd_device *obd)
1350 LASSERT(list_empty(&obd->obd_exports));
1351 spin_lock(&obd->obd_dev_lock);
1352 while (!list_empty(&obd->obd_unlinked_exports)) {
1353 spin_unlock(&obd->obd_dev_lock);
1354 cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
1355 if (waited > 5 && IS_PO2(waited)) {
1356 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1357 "more than %d seconds. "
1358 "The obd refcount = %d. Is it stuck?\n",
1359 obd->obd_name, waited,
1360 atomic_read(&obd->obd_refcount));
1364 spin_lock(&obd->obd_dev_lock);
1366 spin_unlock(&obd->obd_dev_lock);
1368 EXPORT_SYMBOL(obd_exports_barrier);
1371 * kill zombie imports and exports
1373 void obd_zombie_impexp_cull(void)
1375 struct obd_import *import;
1376 struct obd_export *export;
1380 spin_lock(&obd_zombie_impexp_lock);
1383 if (!list_empty(&obd_zombie_imports)) {
1384 import = list_entry(obd_zombie_imports.next,
1387 list_del_init(&import->imp_zombie_chain);
1391 if (!list_empty(&obd_zombie_exports)) {
1392 export = list_entry(obd_zombie_exports.next,
1395 list_del_init(&export->exp_obd_chain);
1398 spin_unlock(&obd_zombie_impexp_lock);
1401 class_import_destroy(import);
1404 class_export_destroy(export);
1406 } while (import != NULL || export != NULL);
1410 static struct completion obd_zombie_start;
1411 static struct completion obd_zombie_stop;
1412 static unsigned long obd_zombie_flags;
1413 static cfs_waitq_t obd_zombie_waitq;
1414 static pid_t obd_zombie_pid;
1417 OBD_ZOMBIE_STOP = 1 << 1
1421 * check for work for kill zombie import/export thread.
1423 static int obd_zombie_impexp_check(void *arg)
1427 spin_lock(&obd_zombie_impexp_lock);
1428 rc = list_empty(&obd_zombie_imports) &&
1429 list_empty(&obd_zombie_exports) &&
1430 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1432 spin_unlock(&obd_zombie_impexp_lock);
1438 * Add export to the obd_zombe thread and notify it.
1440 static void obd_zombie_export_add(struct obd_export *exp) {
1441 spin_lock(&exp->exp_obd->obd_dev_lock);
1442 LASSERT(!list_empty(&exp->exp_obd_chain));
1443 list_del_init(&exp->exp_obd_chain);
1444 spin_unlock(&exp->exp_obd->obd_dev_lock);
1445 spin_lock(&obd_zombie_impexp_lock);
1446 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1447 spin_unlock(&obd_zombie_impexp_lock);
1449 if (obd_zombie_impexp_notify != NULL)
1450 obd_zombie_impexp_notify();
1454 * Add import to the obd_zombe thread and notify it.
1456 static void obd_zombie_import_add(struct obd_import *imp) {
1457 LASSERT(imp->imp_sec == NULL);
1458 spin_lock(&obd_zombie_impexp_lock);
1459 LASSERT(list_empty(&imp->imp_zombie_chain));
1460 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1461 spin_unlock(&obd_zombie_impexp_lock);
1463 if (obd_zombie_impexp_notify != NULL)
1464 obd_zombie_impexp_notify();
1468 * notify import/export destroy thread about new zombie.
1470 static void obd_zombie_impexp_notify(void)
1472 cfs_waitq_signal(&obd_zombie_waitq);
1476 * check whether obd_zombie is idle
1478 static int obd_zombie_is_idle(void)
1482 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1483 spin_lock(&obd_zombie_impexp_lock);
1484 rc = list_empty(&obd_zombie_imports) &&
1485 list_empty(&obd_zombie_exports);
1486 spin_unlock(&obd_zombie_impexp_lock);
1491 * wait when obd_zombie import/export queues become empty
1493 void obd_zombie_barrier(void)
1495 struct l_wait_info lwi = { 0 };
1497 if (obd_zombie_pid == cfs_curproc_pid())
1498 /* don't wait for myself */
1500 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1502 EXPORT_SYMBOL(obd_zombie_barrier);
1507 * destroy zombie export/import thread.
1509 static int obd_zombie_impexp_thread(void *unused)
1513 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1514 complete(&obd_zombie_start);
1518 complete(&obd_zombie_start);
1520 obd_zombie_pid = cfs_curproc_pid();
1522 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1523 struct l_wait_info lwi = { 0 };
1525 l_wait_event(obd_zombie_waitq,
1526 !obd_zombie_impexp_check(NULL), &lwi);
1527 obd_zombie_impexp_cull();
1530 * Notify obd_zombie_barrier callers that queues
1533 cfs_waitq_signal(&obd_zombie_waitq);
1536 complete(&obd_zombie_stop);
1541 #else /* ! KERNEL */
1543 static atomic_t zombie_recur = ATOMIC_INIT(0);
1544 static void *obd_zombie_impexp_work_cb;
1545 static void *obd_zombie_impexp_idle_cb;
1547 int obd_zombie_impexp_kill(void *arg)
1551 if (atomic_inc_return(&zombie_recur) == 1) {
1552 obd_zombie_impexp_cull();
1555 atomic_dec(&zombie_recur);
1562 * start destroy zombie import/export thread
1564 int obd_zombie_impexp_init(void)
1568 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1569 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1570 spin_lock_init(&obd_zombie_impexp_lock);
1571 init_completion(&obd_zombie_start);
1572 init_completion(&obd_zombie_stop);
1573 cfs_waitq_init(&obd_zombie_waitq);
1577 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1581 wait_for_completion(&obd_zombie_start);
1584 obd_zombie_impexp_work_cb =
1585 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1586 &obd_zombie_impexp_kill, NULL);
1588 obd_zombie_impexp_idle_cb =
1589 liblustre_register_idle_callback("obd_zombi_impexp_check",
1590 &obd_zombie_impexp_check, NULL);
1596 * stop destroy zombie import/export thread
1598 void obd_zombie_impexp_stop(void)
1600 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1601 obd_zombie_impexp_notify();
1603 wait_for_completion(&obd_zombie_stop);
1605 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1606 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);