1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50 #include <lustre_export.h>
/* List of registered obd types (defined elsewhere) and the spinlock taken
 * by this file whenever that list is walked or modified. */
52 extern struct list_head obd_types;
53 spinlock_t obd_types_lock;
/* Memory caches for obd_device, obdo and obd_import objects; they are
 * created in obd_init_caches() and destroyed in obd_cleanup_caches(). */
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
/* Imports and exports whose last reference has been dropped are queued on
 * these lists under obd_zombie_impexp_lock, after which
 * obd_zombie_impexp_notify() is called. */
60 struct list_head obd_zombie_imports;
61 struct list_head obd_zombie_exports;
62 spinlock_t obd_zombie_impexp_lock;
63 static void obd_zombie_impexp_notify(void);
/* Function pointer used by this file to release ptlrpc connections;
 * NOTE(review): presumably installed by the ptlrpc module - confirm. */
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 * support functions: we could use inter-module communication, but this
69 * is more portable to other OS's
71 static struct obd_device *obd_device_alloc(void)
73 struct obd_device *obd;
75 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
77 obd->obd_magic = OBD_DEVICE_MAGIC;
81 EXPORT_SYMBOL(obd_device_alloc);
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts that the device still carries OBD_DEVICE_MAGIC, and reports an
 * error if obd_namespace is still set when the device is being freed.
 */
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic "
87 "%08x != %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace left behind means cleanup was skipped somewhere */
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up "
91 obd, obd->obd_namespace, obd->obd_force);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 EXPORT_SYMBOL(obd_device_free);
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
/*
 * Look up an obd type by name and take a module reference on it.
 *
 * The type is searched with class_search_type(); the code can also ask
 * request_module() to load the module and search again.  A request for
 * LUSTRE_MDT_NAME loads the LUSTRE_MDS_NAME module.  The module reference
 * is taken under the type's obd_type_lock and is dropped again by
 * class_put_type().
 */
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
121 const char *modname = name;
/* the MDT type lives in the MDS module */
122 if (strcmp(modname, LUSTRE_MDT_NAME) == 0)
123 modname = LUSTRE_MDS_NAME;
124 if (!request_module("%s", modname)) {
125 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 type = class_search_type(name);
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not checked */
136 try_module_get(type->typ_ops->o_owner);
137 spin_unlock(&type->obd_type_lock);
/*
 * Release a type obtained from class_get_type(): under the type's
 * obd_type_lock, drop the module reference held on the owner of typ_ops.
 */
142 void class_put_type(struct obd_type *type)
145 spin_lock(&type->obd_type_lock);
147 module_put(type->typ_ops->o_owner);
148 spin_unlock(&type->obd_type_lock);
/*
 * Register a new obd type.
 *
 * Allocates an obd_type, gives it private copies of \a ops and of the type
 * name, registers a proc directory for it under proc_lustre_root and
 * finally links it into ::obd_types.  The statements at the end of the
 * function free whatever was allocated for the type.
 */
151 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
154 struct obd_type *type;
158 LASSERT(strnlen(name, 1024) < 1024); /* sanity check */
/* a type may be registered only once */
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
171 OBD_ALLOC(type->typ_name, strlen(name) + 1);
172 if (type->typ_ops == NULL || type->typ_name == NULL)
/* the type keeps its own copy of the ops table and of the name */
175 *(type->typ_ops) = *ops;
176 strcpy(type->typ_name, name);
177 spin_lock_init(&type->obd_type_lock);
180 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
182 if (IS_ERR(type->typ_procroot)) {
183 rc = PTR_ERR(type->typ_procroot);
184 type->typ_procroot = NULL;
/* publish the fully initialised type */
189 spin_lock(&obd_types_lock);
190 list_add(&type->typ_chain, &obd_types);
191 spin_unlock(&obd_types_lock);
/* release whatever was allocated for the type */
196 if (type->typ_name != NULL)
197 OBD_FREE(type->typ_name, strlen(name) + 1);
198 if (type->typ_ops != NULL)
199 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
200 OBD_FREE(type, sizeof(*type));
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is reported with CERROR.  A type that still has a
 * non-zero typ_refcnt is reported and has its ops table freed.  The
 * remaining statements remove the proc directory, unlink the type from
 * ::obd_types and free its name, ops table and the type itself.
 */
204 int class_unregister_type(const char *name)
206 struct obd_type *type = class_search_type(name);
210 CERROR("unknown obd type\n");
214 if (type->typ_refcnt) {
215 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
216 /* This is a bad situation, let's make the best of it */
217 /* Remove ops, but leave the name for debugging */
218 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
222 if (type->typ_procroot)
223 lprocfs_remove(&type->typ_procroot);
/* unlink from the type list before freeing anything */
225 spin_lock(&obd_types_lock);
226 list_del(&type->typ_chain);
227 spin_unlock(&obd_types_lock);
228 OBD_FREE(type->typ_name, strlen(name) + 1);
229 if (type->typ_ops != NULL)
230 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
231 OBD_FREE(type, sizeof(*type));
233 } /* class_unregister_type */
236 * Create a new obd device.
238 * Find an empty slot in ::obd_devs[], create a new obd device in it.
240 * \param type_name [in] obd device type string.
241 * \param name [in] obd device name.
243 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
/*
 * Allocate a device of type \a type_name and install it, under
 * obd_dev_lock, in a free slot of ::obd_devs[].  The whole table is
 * scanned so that a name clash with an existing device can be detected;
 * in that case a slot that was already claimed is given back.
 * Failures are returned as ERR_PTR() values: -EINVAL for an over-long
 * name, -ENODEV for an unknown type, -ENOMEM when allocation fails,
 * -EEXIST for a duplicate name and -EOVERFLOW when no slot is free.
 */
246 struct obd_device *class_newdev(const char *type_name, const char *name)
248 struct obd_device *result = NULL;
249 struct obd_device *newdev;
250 struct obd_type *type = NULL;
252 int new_obd_minor = 0;
254 if (strlen(name) >= MAX_OBD_NAME) {
255 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
256 RETURN(ERR_PTR(-EINVAL));
/* takes a reference on the type; dropped again on every error path below */
259 type = class_get_type(type_name);
261 CERROR("OBD: unknown type: %s\n", type_name);
262 RETURN(ERR_PTR(-ENODEV));
265 newdev = obd_device_alloc();
266 if (newdev == NULL) {
267 class_put_type(type);
268 RETURN(ERR_PTR(-ENOMEM));
270 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
272 spin_lock(&obd_dev_lock);
273 for (i = 0; i < class_devno_max(); i++) {
274 struct obd_device *obd = class_num2obd(i);
275 if (obd && obd->obd_name && (strcmp(name, obd->obd_name) == 0)){
276 CERROR("Device %s already exists, won't add\n", name);
278 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
279 "%p obd_magic %08x != %08x\n", result,
280 result->obd_magic, OBD_DEVICE_MAGIC);
281 LASSERTF(result->obd_minor == new_obd_minor,
282 "%p obd_minor %d != %d\n", result,
283 result->obd_minor, new_obd_minor);
/* give back the slot claimed earlier in this scan */
285 obd_devs[result->obd_minor] = NULL;
286 result->obd_name[0]='\0';
288 result = ERR_PTR(-EEXIST);
/* first empty slot seen while nothing has been claimed yet */
291 if (!result && !obd) {
293 result->obd_minor = i;
295 result->obd_type = type;
296 strncpy(result->obd_name, name,
297 sizeof(result->obd_name) - 1);
298 obd_devs[i] = result;
301 spin_unlock(&obd_dev_lock);
303 if (result == NULL && i >= class_devno_max()) {
304 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
306 result = ERR_PTR(-EOVERFLOW);
/* on failure undo the allocation and the type reference */
309 if (IS_ERR(result)) {
310 obd_device_free(newdev);
311 class_put_type(type);
313 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
314 result->obd_name, result);
/*
 * Remove \a obd from ::obd_devs[], free it and drop the reference it held
 * on its obd type.
 *
 * The type pointer is saved first because the device memory is gone by
 * the time class_put_type() is called.
 */
319 void class_release_dev(struct obd_device *obd)
321 struct obd_type *obd_type = obd->obd_type;
323 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
324 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
325 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
326 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
327 LASSERT(obd_type != NULL);
329 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
330 obd->obd_name,obd->obd_type->typ_name);
/* clear the slot under the device-table lock */
332 spin_lock(&obd_dev_lock);
333 obd_devs[obd->obd_minor] = NULL;
334 spin_unlock(&obd_dev_lock);
335 obd_device_free(obd);
337 class_put_type(obd_type);
/*
 * Scan ::obd_devs[] under obd_dev_lock for a device named \a name.
 * A matching device is only taken into account once obd_attached is set.
 */
340 int class_name2dev(const char *name)
347 spin_lock(&obd_dev_lock);
348 for (i = 0; i < class_devno_max(); i++) {
349 struct obd_device *obd = class_num2obd(i);
350 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
351 /* Make sure we finished attaching before we give
352 out any references */
353 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
354 if (obd->obd_attached) {
355 spin_unlock(&obd_dev_lock);
361 spin_unlock(&obd_dev_lock);
366 struct obd_device *class_name2obd(const char *name)
368 int dev = class_name2dev(name);
370 if (dev < 0 || dev > class_devno_max())
372 return class_num2obd(dev);
375 int class_uuid2dev(struct obd_uuid *uuid)
379 spin_lock(&obd_dev_lock);
380 for (i = 0; i < class_devno_max(); i++) {
381 struct obd_device *obd = class_num2obd(i);
382 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
383 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
384 spin_unlock(&obd_dev_lock);
388 spin_unlock(&obd_dev_lock);
393 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
395 int dev = class_uuid2dev(uuid);
398 return class_num2obd(dev);
402 * Get obd device from ::obd_devs[]
404 * \param num [in] array index
406 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
407 * otherwise return the obd device there.
409 struct obd_device *class_num2obd(int num)
411 struct obd_device *obd = NULL;
413 if (num < class_devno_max()) {
418 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
419 "%p obd_magic %08x != %08x\n",
420 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
421 LASSERTF(obd->obd_minor == num,
422 "%p obd_minor %0d != %0d\n",
423 obd, obd->obd_minor, num);
/*
 * Print one console line per device in ::obd_devs[] (index, status, type
 * name, device name, uuid and reference count) while holding obd_dev_lock.
 * The status is chosen from the obd_stopping, obd_set_up and obd_attached
 * flags, tested in that order.
 */
429 void class_obd_list(void)
434 spin_lock(&obd_dev_lock);
435 for (i = 0; i < class_devno_max(); i++) {
436 struct obd_device *obd = class_num2obd(i);
439 if (obd->obd_stopping)
441 else if (obd->obd_set_up)
443 else if (obd->obd_attached)
447 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
448 i, status, obd->obd_type->typ_name,
449 obd->obd_name, obd->obd_uuid.uuid,
450 atomic_read(&obd->obd_refcount));
452 spin_unlock(&obd_dev_lock);
456 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
457 specified, then only the client with that uuid is returned,
458 otherwise any client connected to the tgt is returned. */
/* Scans ::obd_devs[] under obd_dev_lock.  Note that the type name is
 * compared with strncmp() over strlen(typ_name) bytes, i.e. \a typ_name
 * only has to be a prefix of the device's type name. */
459 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
460 const char * typ_name,
461 struct obd_uuid *grp_uuid)
465 spin_lock(&obd_dev_lock);
466 for (i = 0; i < class_devno_max(); i++) {
467 struct obd_device *obd = class_num2obd(i);
470 if ((strncmp(obd->obd_type->typ_name, typ_name,
471 strlen(typ_name)) == 0)) {
/* target uuid must match; the device uuid is checked only when grp_uuid
 * was supplied */
472 if (obd_uuid_equals(tgt_uuid,
473 &obd->u.cli.cl_target_uuid) &&
474 ((grp_uuid)? obd_uuid_equals(grp_uuid,
475 &obd->obd_uuid) : 1)) {
476 spin_unlock(&obd_dev_lock);
481 spin_unlock(&obd_dev_lock);
/*
 * Search for a client device connected to \a tgt_uuid without the caller
 * naming a type: MDC devices are searched (with no group filter), and OSC
 * devices are searched with \a grp_uuid.
 */
486 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
487 struct obd_uuid *grp_uuid)
489 struct obd_device *obd;
/* NOTE(review): grp_uuid is not passed to the MDC search - confirm intent */
491 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
493 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
498 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
499 searching at *next, and if a device is found, the next index to look
500 at is saved in *next. If next is NULL, then the first matching device
501 will always be returned. */
/* The scan of ::obd_devs[] runs under obd_dev_lock; a value of *next is
 * only accepted as a start index when it lies in
 * 0 .. class_devno_max() - 1. */
502 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
508 else if (*next >= 0 && *next < class_devno_max())
513 spin_lock(&obd_dev_lock);
514 for (; i < class_devno_max(); i++) {
515 struct obd_device *obd = class_num2obd(i);
/* a device belongs to the group when its own uuid equals grp_uuid */
518 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
521 spin_unlock(&obd_dev_lock);
525 spin_unlock(&obd_dev_lock);
531 void obd_cleanup_caches(void)
536 if (obd_device_cachep) {
537 rc = cfs_mem_cache_destroy(obd_device_cachep);
538 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
539 obd_device_cachep = NULL;
542 rc = cfs_mem_cache_destroy(obdo_cachep);
543 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
547 rc = cfs_mem_cache_destroy(import_cachep);
548 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
549 import_cachep = NULL;
/*
 * Create the memory caches used by this file: "ll_obd_dev_cache" for
 * struct obd_device, "ll_obdo_cache" for struct obdo and "ll_import_cache"
 * for struct obd_import.  Each cache pointer must be NULL on entry.
 * obd_cleanup_caches() is used to tear down whatever was created.
 */
554 int obd_init_caches(void)
558 LASSERT(obd_device_cachep == NULL);
559 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
560 sizeof(struct obd_device),
562 if (!obd_device_cachep)
565 LASSERT(obdo_cachep == NULL);
566 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
571 LASSERT(import_cachep == NULL);
572 import_cachep = cfs_mem_cache_create("ll_import_cache",
573 sizeof(struct obd_import),
/* undo any caches that were already created */
580 obd_cleanup_caches();
585 /* map connection to client */
/*
 * Translate a connection handle into its export by looking up the
 * handle's cookie with class_handle2object().  A null handle and the
 * special cookie value -1 ("assign a new connection") are only logged
 * here and are not looked up.
 */
586 struct obd_export *class_conn2export(struct lustre_handle *conn)
588 struct obd_export *export;
592 CDEBUG(D_CACHE, "looking for null handle\n");
596 if (conn->cookie == -1) { /* this means assign a new connection */
597 CDEBUG(D_CACHE, "want a new connection\n");
601 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
602 export = class_handle2object(conn->cookie);
/* Return the obd device associated with \a exp. */
606 struct obd_device *class_exp2obd(struct obd_export *exp)
/*
 * Translate a connection handle into the obd device of its export.
 * The export reference obtained from class_conn2export() is dropped again
 * once exp_obd has been read.
 */
613 struct obd_device *class_conn2obd(struct lustre_handle *conn)
615 struct obd_export *export;
616 export = class_conn2export(conn);
618 struct obd_device *obd = export->exp_obd;
/* only the device pointer is kept, not the export reference */
619 class_export_put(export);
625 struct obd_import *class_exp2cliimp(struct obd_export *exp)
627 struct obd_device *obd = exp->exp_obd;
630 return obd->u.cli.cl_import;
633 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
635 struct obd_device *obd = class_conn2obd(conn);
638 return obd->u.cli.cl_import;
641 /* Export management functions */
/* Handle-table callback (registered in class_new_export()): take a
 * reference on the export stored behind the handle. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
647 /* called from mds_commit_cb() in context of journal commit callback
648 * and cannot call any blocking functions. */
649 void __class_export_put(struct obd_export *exp)
651 if (atomic_dec_and_test(&exp->exp_refcount)) {
652 LASSERT (list_empty(&exp->exp_obd_chain));
654 CDEBUG(D_IOCTL, "final put %p/%s\n",
655 exp, exp->exp_client_uuid.uuid);
657 spin_lock(&obd_zombie_impexp_lock);
658 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
659 spin_unlock(&obd_zombie_impexp_lock);
661 obd_zombie_impexp_notify();
664 EXPORT_SYMBOL(__class_export_put);
/*
 * Final teardown of an export whose reference count has reached zero:
 * release its connection (if any), assert that all of its request/reply
 * lists are empty, call obd_destroy_export() and free the export via
 * OBD_FREE_RCU().
 */
666 void class_export_destroy(struct obd_export *exp)
668 struct obd_device *obd = exp->exp_obd;
670 LASSERT (atomic_read(&exp->exp_refcount) == 0);
672 CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
673 exp->exp_client_uuid.uuid);
675 LASSERT(obd != NULL);
677 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
678 if (exp->exp_connection)
679 ptlrpc_put_connection_superhack(exp->exp_connection);
/* nothing may still be queued on the export at this point */
681 LASSERT(list_empty(&exp->exp_outstanding_replies));
682 LASSERT(list_empty(&exp->exp_uncommitted_replies));
683 LASSERT(list_empty(&exp->exp_req_replay_queue));
684 LASSERT(list_empty(&exp->exp_queued_rpc));
685 obd_destroy_export(exp);
687 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
691 /* Creates a new export, adds it to the hash table, and returns a
692 * pointer to it. The refcount is 2: one for the hash reference, and
693 * one for the pointer returned by this function. */
/*
 * \param obd    [in] device the export is created for
 * \param cluuid [in] client uuid, copied into exp_client_uuid
 *
 * Failures are returned as ERR_PTR(): -ENOMEM when allocation fails and
 * -EALREADY when an export for \a cluuid is already in the uuid hash.
 */
694 struct obd_export *class_new_export(struct obd_device *obd,
695 struct obd_uuid *cluuid)
697 struct obd_export *export;
700 OBD_ALLOC(export, sizeof(*export));
702 return ERR_PTR(-ENOMEM);
704 export->exp_conn_cnt = 0;
705 export->exp_lock_hash = NULL;
/* one reference for the handle hash, one for the caller */
706 atomic_set(&export->exp_refcount, 2);
707 atomic_set(&export->exp_rpc_count, 0);
708 export->exp_obd = obd;
709 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
710 spin_lock_init(&export->exp_uncommitted_replies_lock);
711 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
712 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
713 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
715 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
716 class_handle_hash(&export->exp_handle, export_handle_addref);
717 export->exp_last_request_time = cfs_time_current_sec();
718 spin_lock_init(&export->exp_lock);
719 INIT_HLIST_NODE(&export->exp_uuid_hash);
720 INIT_HLIST_NODE(&export->exp_nid_hash);
722 export->exp_client_uuid = *cluuid;
723 obd_init_export(export);
/* exports whose client uuid is the device's own uuid are not put into the
 * uuid hash */
725 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
726 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
727 &export->exp_uuid_hash);
729 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
730 obd->obd_name, cluuid->uuid, rc);
731 class_handle_unhash(&export->exp_handle);
732 OBD_FREE_PTR(export);
733 return ERR_PTR(-EALREADY);
/* link the export into the device's export lists */
737 spin_lock(&obd->obd_dev_lock);
738 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
740 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
741 list_add_tail(&export->exp_obd_chain_timed,
742 &export->exp_obd->obd_exports_timed);
743 export->exp_obd->obd_num_exports++;
744 spin_unlock(&obd->obd_dev_lock);
748 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its device: remove it from the handle hash, the
 * uuid hash and the device's export lists, bring the device's
 * delayed/recoverable/connected client counters up to date and finally
 * drop one export reference.
 */
750 void class_unlink_export(struct obd_export *exp)
752 class_handle_unhash(&exp->exp_handle);
754 spin_lock(&exp->exp_obd->obd_dev_lock);
755 /* delete an uuid-export hashitem from hashtables */
756 if (!hlist_unhashed(&exp->exp_uuid_hash))
757 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
758 &exp->exp_client_uuid,
759 &exp->exp_uuid_hash);
761 list_del_init(&exp->exp_obd_chain);
762 list_del_init(&exp->exp_obd_chain_timed);
763 exp->exp_obd->obd_num_exports--;
764 spin_unlock(&exp->exp_obd->obd_dev_lock);
765 /* Keep these counter valid always */
766 spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
/* an export is counted either as delayed or as recoverable, never both */
767 if (exp->exp_delayed) {
768 spin_lock(&exp->exp_lock);
769 exp->exp_delayed = 0;
770 spin_unlock(&exp->exp_lock);
771 LASSERT(exp->exp_obd->obd_delayed_clients);
772 exp->exp_obd->obd_delayed_clients--;
773 } else if (exp->exp_replay_needed) {
774 spin_lock(&exp->exp_lock);
775 exp->exp_replay_needed = 0;
776 spin_unlock(&exp->exp_lock);
777 LASSERT(exp->exp_obd->obd_recoverable_clients);
778 exp->exp_obd->obd_recoverable_clients--;
781 if (exp->exp_obd->obd_recovering && exp->exp_in_recovery) {
782 spin_lock(&exp->exp_lock);
783 exp->exp_in_recovery = 0;
784 spin_unlock(&exp->exp_lock);
785 LASSERT(exp->exp_obd->obd_connected_clients);
786 exp->exp_obd->obd_connected_clients--;
788 spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
789 class_export_put(exp);
791 EXPORT_SYMBOL(class_unlink_export);
793 /* Import management functions */
/* Handle-table callback (registered in class_new_import()): take a
 * reference on the import stored behind the handle. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
/*
 * Take one reference on \a import.  The current count is sanity-checked
 * first: it must be non-negative and below 0x5a5a5a.
 * NOTE(review): 0x5a5a5a looks like a poison pattern for freed memory -
 * confirm.
 */
799 struct obd_import *class_import_get(struct obd_import *import)
801 LASSERT(atomic_read(&import->imp_refcount) >= 0);
802 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
803 atomic_inc(&import->imp_refcount);
804 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
805 atomic_read(&import->imp_refcount),
806 import->imp_obd->obd_name);
809 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a import.  When the last reference goes away the
 * import is queued on ::obd_zombie_imports and
 * obd_zombie_impexp_notify() is called.
 */
811 void class_import_put(struct obd_import *import)
815 LASSERT(atomic_read(&import->imp_refcount) > 0);
816 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
/* must not already be on the zombie list */
817 LASSERT(list_empty(&import->imp_zombie_chain));
819 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
820 atomic_read(&import->imp_refcount) - 1,
821 import->imp_obd->obd_name);
823 if (atomic_dec_and_test(&import->imp_refcount)) {
824 CDEBUG(D_INFO, "final put import %p\n", import);
825 spin_lock(&obd_zombie_impexp_lock);
826 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
827 spin_unlock(&obd_zombie_impexp_lock);
829 obd_zombie_impexp_notify();
834 EXPORT_SYMBOL(class_import_put);
/*
 * Final teardown of an import whose reference count has reached zero:
 * release its current connection and every entry of imp_conn_list, drop
 * the reference held on the owning device and free the import via
 * OBD_FREE_RCU().
 */
836 void class_import_destroy(struct obd_import *import)
840 CDEBUG(D_IOCTL, "destroying import %p\n", import);
842 LASSERT(atomic_read(&import->imp_refcount) == 0);
844 ptlrpc_put_connection_superhack(import->imp_connection);
/* pop and free connection entries until the list is empty */
846 while (!list_empty(&import->imp_conn_list)) {
847 struct obd_import_conn *imp_conn;
849 imp_conn = list_entry(import->imp_conn_list.next,
850 struct obd_import_conn, oic_item);
851 list_del(&imp_conn->oic_item);
852 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
853 OBD_FREE(imp_conn, sizeof(*imp_conn));
/* pairs with class_incref() in class_new_import() */
856 class_decref(import->imp_obd);
857 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate and one service estimate per portal (IMP_AT_MAX_PORTALS of
 * them), the latter starting from INITIAL_CONNECT_TIMEOUT.
 */
861 static void init_imp_at(struct imp_at *at) {
863 at_init(&at->iat_net_latency, 0, 0);
864 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
865 /* max service estimates are tracked on the server side, so
866 don't use the AT history here, just use the last reported
867 val. (But keep hist for proc histogram, worst_ever) */
868 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise a new import for \a obd.
 *
 * The import takes a reference on \a obd, starts in state LUSTRE_IMP_NEW
 * with a reference count of 2 and is entered into the handle hash.  The
 * default message magic is chosen at compile time.
 */
873 struct obd_import *class_new_import(struct obd_device *obd)
875 struct obd_import *imp;
877 OBD_ALLOC(imp, sizeof(*imp));
881 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
882 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
883 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
884 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
885 spin_lock_init(&imp->imp_lock);
886 imp->imp_last_success_conn = 0;
887 imp->imp_state = LUSTRE_IMP_NEW;
/* device reference is released in class_import_destroy() */
888 imp->imp_obd = class_incref(obd);
889 cfs_waitq_init(&imp->imp_recovery_waitq);
/* NOTE(review): presumably one reference for the handle hash and one for
 * the caller, as for exports - confirm */
891 atomic_set(&imp->imp_refcount, 2);
892 atomic_set(&imp->imp_unregistering, 0);
893 atomic_set(&imp->imp_inflight, 0);
894 atomic_set(&imp->imp_replay_inflight, 0);
895 atomic_set(&imp->imp_inval_count, 0);
896 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
897 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
898 class_handle_hash(&imp->imp_handle, import_handle_addref);
899 init_imp_at(&imp->imp_at);
901 /* b1_8 supports both v1 & v2. but HEAD only supports v2.
904 #define HAVE_DEFAULT_V2_CONNECT 1
905 #ifdef HAVE_DEFAULT_V2_CONNECT
906 /* the default magic is V2, will be used in connect RPC, and
907 * then adjusted according to the flags in request/reply. */
908 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
910 /* the default magic is V1, will be used in connect RPC, and
911 * then adjusted according to the flags in request/reply. */
912 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V1;
917 EXPORT_SYMBOL(class_new_import);
/*
 * Start the teardown of an import: remove it from the handle hash, bump
 * imp_generation under imp_lock and drop one reference.  The import is
 * actually freed once its last reference is gone.
 */
919 void class_destroy_import(struct obd_import *import)
921 LASSERT(import != NULL);
922 LASSERT(import != LP_POISON);
924 class_handle_unhash(&import->imp_handle);
926 spin_lock(&import->imp_lock);
927 import->imp_generation++;
928 spin_unlock(&import->imp_lock);
930 class_import_put(import);
932 EXPORT_SYMBOL(class_destroy_import);
934 /* A connection defines an export context in which preallocation can
935 be managed. This releases the export pointer reference, and returns
936 the export handle, so the export refcount is 1 when this function
/*
 * \param conn   [out] receives the cookie of the new export's handle
 * \param obd    [in]  device to connect to
 * \param cluuid [in]  client uuid
 *
 * A failure of class_new_export() is returned as its PTR_ERR() value.
 */
938 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
939 struct obd_uuid *cluuid)
941 struct obd_export *export;
942 LASSERT(conn != NULL);
943 LASSERT(obd != NULL);
944 LASSERT(cluuid != NULL);
947 export = class_new_export(obd, cluuid);
949 RETURN(PTR_ERR(export));
951 conn->cookie = export->exp_handle.h_cookie;
/* drop the pointer reference returned by class_new_export() */
952 class_export_put(export);
954 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
955 cluuid->uuid, conn->cookie);
958 EXPORT_SYMBOL(class_connect);
960 /* This function removes 1-3 references from the export:
961 * 1 - for the export pointer passed in
962 * and, if a disconnect is really needed:
963 * 2 - for removing it from the hash
964 * 3 - in class_unlink_export
965 * The export pointer passed to this function may be destroyed */
/*
 * Disconnect \a export.  exp_disconnected is tested and set under
 * exp_lock so that, of several racing callers, only the first one removes
 * the export from the nid hash and unlinks it.
 */
966 int class_disconnect(struct obd_export *export)
968 int already_disconnected;
971 if (export == NULL) {
973 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
/* test-and-set of the disconnected flag */
977 spin_lock(&export->exp_lock);
978 already_disconnected = export->exp_disconnected;
979 export->exp_disconnected = 1;
980 spin_unlock(&export->exp_lock);
983 /* class_cleanup(), abort_recovery(), and class_fail_export()
984 * all end up in here, and if any of them race we shouldn't
985 * call extra class_export_puts(). */
986 if (already_disconnected)
987 GOTO(no_disconn, already_disconnected);
989 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
990 export->exp_handle.h_cookie);
993 if (!hlist_unhashed(&export->exp_nid_hash))
994 lustre_hash_del(export->exp_obd->obd_nid_hash,
995 &export->exp_connection->c_peer.nid,
996 &export->exp_nid_hash);
998 class_unlink_export(export);
/* drops the reference for the export pointer that was passed in */
1001 class_export_put(export);
1005 /* Return non-zero for a fully connected export */
/* An export counts as connected when exp_conn_cnt, read under exp_lock,
 * is greater than zero. */
1006 int class_connected_export(struct obd_export *exp)
1010 spin_lock(&exp->exp_lock);
1011 connected = (exp->exp_conn_cnt > 0);
1012 spin_unlock(&exp->exp_lock);
1017 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export linked on \a list through exp_obd_chain.
 *
 * Each export gets exp_flags set to \a flags.  An export whose client
 * uuid equals its device's own uuid is only taken off the list; every
 * other export is passed to obd_disconnect().  The loop keeps going until
 * the list is empty.
 */
1019 static void class_disconnect_export_list(struct list_head *list,
1020 enum obd_option flags)
1023 struct obd_export *exp;
1026 /* It's possible that an export may disconnect itself, but
1027 * nothing else will be added to this list. */
1028 while (!list_empty(list)) {
1029 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1030 /* need for safe call CDEBUG after obd_disconnect */
1031 class_export_get(exp);
1033 spin_lock(&exp->exp_lock);
1034 exp->exp_flags = flags;
1035 spin_unlock(&exp->exp_lock);
1037 if (obd_uuid_equals(&exp->exp_client_uuid,
1038 &exp->exp_obd->obd_uuid)) {
1040 "exp %p export uuid == obd uuid, don't discon\n",
1042 /* Need to delete this now so we don't end up pointing
1043 * to work_list later when this export is cleaned up. */
1044 list_del_init(&exp->exp_obd_chain);
1045 class_export_put(exp);
/* extra reference that obd_disconnect() releases */
1049 class_export_get(exp);
1050 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1051 "last request at %ld\n",
1052 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1053 exp, exp->exp_last_request_time);
1055 /* release one export reference anyway */
1056 rc = obd_disconnect(exp);
1057 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1058 obd_export_nid2str(exp), exp, rc);
1059 class_export_put(exp);
/*
 * Disconnect all exports of \a obd, both those on obd_exports and those
 * on obd_delayed_exports.  The lists are emptied under obd_dev_lock and
 * the exports are then disconnected without holding that lock.
 */
1064 void class_disconnect_exports(struct obd_device *obd)
1066 struct list_head work_list;
1069 /* Move all of the exports from obd_exports to a work list, en masse. */
1070 CFS_INIT_LIST_HEAD(&work_list);
1071 spin_lock(&obd->obd_dev_lock);
1072 list_splice_init(&obd->obd_delayed_exports, &work_list);
1073 list_splice_init(&obd->obd_exports, &work_list);
1074 spin_unlock(&obd->obd_dev_lock);
1076 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1077 "disconnecting them\n", obd->obd_minor, obd);
1078 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd));
1081 EXPORT_SYMBOL(class_disconnect_exports);
1083 /* Remove exports that have not completed recovery. */
/*
 * Every export on obd_exports that still has exp_replay_needed set is
 * moved to a private list under obd_dev_lock, counted in
 * obd_stale_clients, and then disconnected with \a flags.
 */
1084 void class_disconnect_stale_exports(struct obd_device *obd,
1085 enum obd_option flags)
1087 struct list_head work_list;
1088 struct list_head *pos, *n;
1089 struct obd_export *exp;
1092 CFS_INIT_LIST_HEAD(&work_list);
1093 spin_lock(&obd->obd_dev_lock);
/* the _safe iterator is required because entries are moved while walking */
1094 list_for_each_safe(pos, n, &obd->obd_exports) {
1095 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1096 if (exp->exp_replay_needed) {
1097 list_move(&exp->exp_obd_chain, &work_list);
1098 obd->obd_stale_clients++;
1101 spin_unlock(&obd->obd_dev_lock);
1103 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1104 obd->obd_name, obd->obd_stale_clients);
1105 class_disconnect_export_list(&work_list, flags);
1108 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Disconnect delayed exports that have expired: every export on
 * obd_delayed_exports for which exp_expired() is true with respect to
 * obt_stale_export_age is moved to a private list under obd_dev_lock and
 * then disconnected.
 */
1110 void class_disconnect_expired_exports(struct obd_device *obd)
1112 struct list_head expired_list;
1113 struct obd_export *exp, *n;
1117 CFS_INIT_LIST_HEAD(&expired_list);
1118 spin_lock(&obd->obd_dev_lock);
1119 list_for_each_entry_safe(exp, n, &obd->obd_delayed_exports,
1121 if (exp_expired(exp, obd->u.obt.obt_stale_export_age)) {
1122 list_move(&exp->exp_obd_chain, &expired_list);
1126 spin_unlock(&obd->obd_dev_lock);
1131 CDEBUG(D_INFO, "%s: disconnecting %d expired exports\n",
1132 obd->obd_name, cnt);
1133 class_disconnect_export_list(&expired_list, exp_flags_from_obd(obd));
1137 EXPORT_SYMBOL(class_disconnect_expired_exports);
/*
 * Turn \a exp into a delayed export: take it off the timed (ping) list,
 * move it to the device's obd_delayed_exports list and, if it still needs
 * replay, set exp_delayed and move it from the recoverable to the delayed
 * client counter.
 */
1139 void class_set_export_delayed(struct obd_export *exp)
1141 struct obd_device *obd = class_exp2obd(exp);
1143 LASSERT(!exp->exp_delayed);
1145 /* no need to ping delayed exports */
1146 spin_lock(&obd->obd_dev_lock);
1147 list_del_init(&exp->exp_obd_chain_timed);
1148 list_move_tail(&exp->exp_obd_chain, &obd->obd_delayed_exports);
1149 spin_unlock(&obd->obd_dev_lock);
1151 LASSERT(obd->obd_recoverable_clients > 0);
/* the client counters are protected by obd_processing_task_lock */
1153 spin_lock_bh(&obd->obd_processing_task_lock);
1154 /* race with target_queue_last_replay_reply? */
1155 if (exp->exp_replay_needed) {
1156 spin_lock(&exp->exp_lock);
1157 exp->exp_delayed = 1;
1158 spin_unlock(&exp->exp_lock);
1160 obd->obd_delayed_clients++;
1161 obd->obd_recoverable_clients--;
1163 spin_unlock_bh(&obd->obd_processing_task_lock);
1165 CDEBUG(D_HA, "%s: set client %s as delayed\n",
1166 obd->obd_name, exp->exp_client_uuid.uuid);
1168 EXPORT_SYMBOL(class_set_export_delayed);
1171 * Manage exports that have not completed recovery.
/*
 * Under obd_dev_lock the exports on obd_exports that still need replay
 * are sorted onto two private lists: exports to evict and exports to
 * delay.  After the lock is dropped the delayed ones are handed to
 * class_set_export_delayed() and the others are disconnected.  When
 * HAVE_DELAYED_RECOVERY is not defined, all delayed exports are evicted
 * as well.
 */
1173 void class_handle_stale_exports(struct obd_device *obd)
1175 struct list_head delay_list, evict_list;
1176 struct obd_export *exp, *n;
1180 CFS_INIT_LIST_HEAD(&delay_list);
1181 CFS_INIT_LIST_HEAD(&evict_list);
1182 spin_lock(&obd->obd_dev_lock);
1183 list_for_each_entry_safe(exp, n, &obd->obd_exports, exp_obd_chain) {
1184 LASSERT(!exp->exp_delayed);
1185 /* clients finished recovery */
1186 if (!exp->exp_replay_needed)
1188 /* connected non-vbr clients are evicted */
1189 if (exp->exp_in_recovery && !exp_connect_vbr(exp)) {
1190 obd->obd_stale_clients++;
1191 list_move_tail(&exp->exp_obd_chain, &evict_list);
1194 if (obd->obd_version_recov || !exp->exp_in_recovery) {
1195 list_move_tail(&exp->exp_obd_chain, &delay_list);
1199 #ifndef HAVE_DELAYED_RECOVERY
1200 /* delayed recovery is turned off, evict all delayed exports */
1201 list_splice_init(&delay_list, &evict_list);
1202 list_splice_init(&obd->obd_delayed_exports, &evict_list);
1203 obd->obd_stale_clients += delayed;
1205 spin_unlock(&obd->obd_dev_lock);
/* class_set_export_delayed() moves each export off delay_list */
1207 list_for_each_entry_safe(exp, n, &delay_list, exp_obd_chain) {
1208 class_set_export_delayed(exp);
1209 exp->exp_last_request_time = cfs_time_current_sec();
1211 LASSERT(list_empty(&delay_list));
1213 /* evict clients without VBR support */
1214 class_disconnect_export_list(&evict_list, exp_flags_from_obd(obd));
1218 EXPORT_SYMBOL(class_handle_stale_exports);
/*
 * Allocate and initialise an obd_io_group: nothing pending, a reference
 * count of 1, an initialised wait queue and an empty callback list.
 */
1220 int oig_init(struct obd_io_group **oig_out)
1222 struct obd_io_group *oig;
1225 OBD_ALLOC(oig, sizeof(*oig));
1229 spin_lock_init(&oig->oig_lock);
1231 oig->oig_pending = 0;
/* initial reference; released through oig_release() */
1232 atomic_set(&oig->oig_refcount, 1);
1233 cfs_waitq_init(&oig->oig_waitq);
1234 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1239 EXPORT_SYMBOL(oig_init);
/* Take one reference on an obd_io_group; paired with oig_release(). */
1241 static inline void oig_grab(struct obd_io_group *oig)
1243 atomic_inc(&oig->oig_refcount);
1246 void oig_release(struct obd_io_group *oig)
1248 if (atomic_dec_and_test(&oig->oig_refcount))
1249 OBD_FREE(oig, sizeof(*oig));
1251 EXPORT_SYMBOL(oig_release);
/* Add the callback context \a occ to the tail of the group's callback
 * list, under oig_lock. */
1253 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1256 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1257 spin_lock(&oig->oig_lock);
1263 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1265 spin_unlock(&oig->oig_lock);
1270 EXPORT_SYMBOL(oig_add_one);
/*
 * Mark one callback context of the group as completed with result \a rc:
 * under oig_lock \a occ is removed from the list and oig_pending is
 * decremented; once nothing is pending any more the group's wait queue is
 * signalled after the lock has been dropped.
 */
1272 void oig_complete_one(struct obd_io_group *oig,
1273 struct oig_callback_context *occ, int rc)
1275 cfs_waitq_t *wake = NULL;
1278 spin_lock(&oig->oig_lock);
1281 list_del_init(&occ->occ_oig_item);
1283 old_rc = oig->oig_rc;
1284 if (oig->oig_rc == 0 && rc != 0)
/* last completion: remember to wake the waiter once unlocked */
1287 if (--oig->oig_pending <= 0)
1288 wake = &oig->oig_waitq;
1290 spin_unlock(&oig->oig_lock);
1292 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1293 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1296 cfs_waitq_signal(wake);
1299 EXPORT_SYMBOL(oig_complete_one);
/* Wait condition used by oig_wait(): checks, under oig_lock, whether
 * oig_pending has dropped to zero or below. */
1301 static int oig_done(struct obd_io_group *oig)
1304 spin_lock(&oig->oig_lock);
1305 if (oig->oig_pending <= 0)
1307 spin_unlock(&oig->oig_lock);
/*
 * Callback installed by oig_wait() through LWI_INTR.  \a data is the
 * obd_io_group being waited on.  Walks the group's occ list and, for
 * entries not yet flagged, sets occ->interrupted and calls the entry's
 * occ_interrupted() handler with oig_lock dropped.
 */
1311 static void interrupted_oig(void *data)
1313 struct obd_io_group *oig = data;
1314 struct oig_callback_context *occ;
1316 spin_lock(&oig->oig_lock);
1317 /* We need to restart the processing each time we drop the lock, as
1318 * it is possible other threads called oig_complete_one() to remove
1319 * an entry elsewhere in the list while we dropped lock. We need to
1320 * drop the lock because osc_ap_completion() calls oig_complete_one()
1321 * which re-gets this lock ;-) as well as a lock ordering issue. */
1323 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
/* entries already flagged were handled on an earlier pass.
 * NOTE(review): the statement controlled by this test is not visible
 * in this listing. */
1324 if (occ->interrupted)
/* flag the entry before releasing the lock so it is not picked again */
1326 occ->interrupted = 1;
1327 spin_unlock(&oig->oig_lock);
1328 occ->occ_interrupted(occ);
1329 spin_lock(&oig->oig_lock);
1332 spin_unlock(&oig->oig_lock);
/*
 * Block until oig_done() reports that \a oig has nothing pending.  The
 * first wait uses an l_wait_info built with LWI_INTR so interrupted_oig()
 * is the interrupt callback; the wait is repeated for as long as
 * l_wait_event() yields -EINTR.  On exit the pending count is asserted
 * to be exactly zero.
 * NOTE(review): the return statement is not visible in this listing.
 */
1335 int oig_wait(struct obd_io_group *oig)
1337 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1340 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1343 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
/* any other result from the wait is treated as a fatal logic error */
1344 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1345 /* we can't continue until the oig has emptied and stopped
1346 * referencing state that the caller will free upon return */
/* later passes use a zero-initialised wait info instead of the
 * LWI_INTR one set up above */
1348 lwi = (struct l_wait_info){ 0, };
1349 } while (rc == -EINTR);
1351 LASSERTF(oig->oig_pending == 0,
1352 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1355 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1358 EXPORT_SYMBOL(oig_wait);
/*
 * Mark \a exp as failed and disconnect it.  The exp_failed flag is
 * tested and set under exp_lock so that an export which was already
 * failed is only logged (see the "skipping" message); otherwise the
 * export is disconnected through obd_disconnect() and the outcome logged.
 */
1360 void class_fail_export(struct obd_export *exp)
1362 int rc, already_failed;
/* read the old flag value and set the flag in one critical section */
1364 spin_lock(&exp->exp_lock);
1365 already_failed = exp->exp_failed;
1366 exp->exp_failed = 1;
1367 spin_unlock(&exp->exp_lock);
1369 if (already_failed) {
1370 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1371 exp, exp->exp_client_uuid.uuid);
1375 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1376 exp, exp->exp_client_uuid.uuid);
/* optionally dump the debug log before disconnecting */
1378 if (obd_dump_on_timeout)
1379 libcfs_debug_dumplog();
1381 /* Most callers into obd_disconnect are removing their own reference
1382 * (request, for example) in addition to the one from the hash table.
1383 * We don't have such a reference here, so make one. */
1384 class_export_get(exp);
1385 rc = obd_disconnect(exp);
/* NOTE(review): the test on rc that selects between the two messages
 * below is not visible in this listing */
1387 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1389 CDEBUG(D_HA, "disconnected export %p/%s\n",
1390 exp, exp->exp_client_uuid.uuid);
1392 EXPORT_SYMBOL(class_fail_export);
/*
 * Return the peer NID of \a exp as text, produced by libcfs_nid2str()
 * from exp_connection->c_peer.nid, when the export has a connection.
 * NOTE(review): the value returned when exp_connection is NULL is not
 * visible in this listing.
 */
1394 char *obd_export_nid2str(struct obd_export *exp)
1396 if (exp->exp_connection != NULL)
1397 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1401 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict from \a obd the export(s) whose peer NID matches the textual
 * \a nid.  The string is converted with libcfs_str2nid() and used as the
 * key for a lookup in obd->obd_nid_hash; a match is failed via
 * class_fail_export().  Returns exports_evicted, and logs a message when
 * that count is zero.
 * NOTE(review): the loop structure and the increment of exports_evicted
 * are not visible in this listing.
 */
1403 int obd_export_evict_by_nid(struct obd_device *obd, char *nid)
1405 struct obd_export *doomed_exp = NULL;
1406 int exports_evicted = 0;
1408 lnet_nid_t nid_key = libcfs_str2nid(nid);
1411 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
/* NOTE(review): the statement controlled by this test is not visible
 * in this listing */
1413 if (doomed_exp == NULL)
/* sanity check: the export found must carry the NID that was asked for */
1416 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1417 "nid %s found, wanted nid %s, requested nid %s\n",
1418 obd_export_nid2str(doomed_exp),
1419 libcfs_nid2str(nid_key), nid);
1422 CDEBUG(D_HA, "%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1423 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1425 class_fail_export(doomed_exp);
/* presumably drops the reference obtained by the hash lookup -- confirm */
1426 class_export_put(doomed_exp);
1429 if (!exports_evicted)
1430 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1431 obd->obd_name, nid);
1432 return exports_evicted;
1434 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict from \a obd the export whose client UUID equals the textual
 * \a uuid.  A request naming the device's own UUID is refused with an
 * error message.  Otherwise the UUID is looked up in obd->obd_uuid_hash
 * and a match is failed via class_fail_export().  Returns
 * exports_evicted.
 * NOTE(review): the update of exports_evicted on success is not visible
 * in this listing.
 */
1436 int obd_export_evict_by_uuid(struct obd_device *obd, char *uuid)
1438 struct obd_export *doomed_exp = NULL;
1439 struct obd_uuid doomed_uuid;
1440 int exports_evicted = 0;
1442 obd_str2uuid(&doomed_uuid, uuid);
/* never evict the device's own UUID; exports_evicted is still 0 here */
1443 if(obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1444 CERROR("%s: can't evict myself\n", obd->obd_name);
1445 return exports_evicted;
1448 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1450 if (doomed_exp == NULL) {
1451 CERROR("%s: can't disconnect %s: no exports found\n",
1452 obd->obd_name, uuid);
1454 CWARN("%s: evicting %s at adminstrative request\n",
1455 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1456 class_fail_export(doomed_exp);
/* presumably drops the reference obtained by the hash lookup -- confirm */
1457 class_export_put(doomed_exp);
1461 return exports_evicted;
1463 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Drain the zombie lists.  On each pass, with obd_zombie_impexp_lock
 * held, the first entry of obd_zombie_imports and the first entry of
 * obd_zombie_exports (when present) are unlinked; after the lock is
 * released they are handed to class_import_destroy() and
 * class_export_destroy().  The loop repeats while either pointer is
 * non-NULL.
 * NOTE(review): the per-pass reset of import/export to NULL and the NULL
 * checks before the destroy calls are not visible in this listing.
 */
1465 void obd_zombie_impexp_cull(void)
1467 struct obd_import *import;
1468 struct obd_export *export;
1471 spin_lock (&obd_zombie_impexp_lock);
1474 if (!list_empty(&obd_zombie_imports)) {
1475 import = list_entry(obd_zombie_imports.next,
1478 list_del(&import->imp_zombie_chain);
1482 if (!list_empty(&obd_zombie_exports)) {
1483 export = list_entry(obd_zombie_exports.next,
1486 list_del_init(&export->exp_obd_chain);
/* destruction happens outside the lock */
1489 spin_unlock(&obd_zombie_impexp_lock);
1492 class_import_destroy(import);
1495 class_export_destroy(export);
1497 } while (import != NULL || export != NULL);
/* completed by obd_zombie_impexp_thread() once it has started (or failed
 * to daemonize); obd_zombie_impexp_init() waits on it */
1500 static struct completion obd_zombie_start;
/* completed by obd_zombie_impexp_thread() when its main loop ends;
 * obd_zombie_impexp_stop() waits on it */
1501 static struct completion obd_zombie_stop;
/* bit field holding the OBD_ZOMBIE_STOP request bit */
1502 static unsigned long obd_zombie_flags;
/* wait queue shared by the zombie thread and obd_zombie_barrier() */
1503 static cfs_waitq_t obd_zombie_waitq;
/* pid recorded by the zombie thread so obd_zombie_barrier() can detect
 * that it is being called from that thread */
1504 static pid_t obd_zombie_pid;
/*
 * Evaluate, under obd_zombie_impexp_lock, whether there is nothing to do:
 * rc is non-zero only when both zombie lists are empty AND the
 * OBD_ZOMBIE_STOP bit is not set.  The zombie thread waits on the
 * negation of this value; the function is also registered as a liblustre
 * idle callback.  \a arg is not used in the visible lines.
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing.
 */
1510 int obd_zombi_impexp_check(void *arg)
1514 spin_lock(&obd_zombie_impexp_lock);
1515 rc = list_empty(&obd_zombie_imports) &&
1516 list_empty(&obd_zombie_exports) &&
1517 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1519 spin_unlock(&obd_zombie_impexp_lock);
/* Signal obd_zombie_waitq so that a waiter re-evaluates its condition. */
1524 static void obd_zombie_impexp_notify(void)
1526 cfs_waitq_signal(&obd_zombie_waitq);
1530 * check whether obd_zombie is idle
/*
 * Wait condition for obd_zombie_barrier(): rc is non-zero when both
 * zombie lists are empty, sampled under obd_zombie_impexp_lock.  Must not
 * be called once OBD_ZOMBIE_STOP has been set (asserted).
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing.
 */
1532 static int obd_zombie_is_idle(void)
1536 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1537 spin_lock(&obd_zombie_impexp_lock);
1538 rc = list_empty(&obd_zombie_imports) &&
1539 list_empty(&obd_zombie_exports);
1540 spin_unlock(&obd_zombie_impexp_lock);
1545 * wait when obd_zombie import/export queues become empty
/*
 * Block the caller on obd_zombie_waitq until obd_zombie_is_idle() holds,
 * i.e. both zombie lists are empty.  The calling pid is compared with the
 * zombie thread's recorded pid so that the thread does not wait for
 * itself.
 * NOTE(review): the statement controlled by the pid test is not visible
 * in this listing.
 */
1547 void obd_zombie_barrier(void)
1549 struct l_wait_info lwi = { 0 };
1551 if (obd_zombie_pid == cfs_curproc_pid())
1552 /* don't wait for myself */
1554 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1556 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Body of the zombie reaper thread.  After daemonizing as "obd_zombid" it
 * completes obd_zombie_start, records its pid, and then loops until
 * OBD_ZOMBIE_STOP is set: sleep until obd_zombi_impexp_check() reports
 * work (or a stop request), cull the zombie lists, and signal
 * obd_zombie_waitq.  obd_zombie_stop is completed when the loop ends.
 * \a unused is not referenced in the visible lines.
 * NOTE(review): the return statements are not visible in this listing.
 */
1560 static int obd_zombie_impexp_thread(void *unused)
/* on daemonize failure the starter is still released */
1564 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1565 complete(&obd_zombie_start);
1569 complete(&obd_zombie_start);
1571 obd_zombie_pid = cfs_curproc_pid();
1573 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1574 struct l_wait_info lwi = { 0 };
/* the check is non-zero when idle, so wait for it to become zero */
1576 l_wait_event(obd_zombie_waitq, !obd_zombi_impexp_check(NULL), &lwi);
1578 obd_zombie_impexp_cull();
1581 * Notify obd_zombie_barrier callers that queues
1584 cfs_waitq_signal(&obd_zombie_waitq);
1587 complete(&obd_zombie_stop);
1592 #else /* ! KERNEL */
/* entry counter used by obd_zombi_impexp_kill() so that only the
 * outermost entry runs the cull */
1594 static atomic_t zombi_recur = ATOMIC_INIT(0);
/* handle returned by liblustre_register_wait_callback(); passed to the
 * matching deregister call in obd_zombie_impexp_stop() */
1595 static void *obd_zombi_impexp_work_cb;
/* handle returned by liblustre_register_idle_callback(); passed to the
 * matching deregister call in obd_zombie_impexp_stop() */
1596 static void *obd_zombi_impexp_idle_cb;
/*
 * liblustre wait callback: run obd_zombie_impexp_cull(), but only when
 * this is the sole active entry (zombi_recur goes from 0 to 1); nested or
 * concurrent entries just bump and drop the counter.  \a arg is not used
 * in the visible lines.
 * NOTE(review): the return statement is not visible in this listing.
 */
1598 int obd_zombi_impexp_kill(void *arg)
1602 if (atomic_inc_return(&zombi_recur) == 1) {
1603 obd_zombie_impexp_cull();
1606 atomic_dec(&zombi_recur);
/*
 * Set up zombie import/export handling: initialise both zombie lists,
 * their lock, the start/stop completions and the wait queue, then start
 * the reaper.  Two start-up paths are visible below: spawning
 * obd_zombie_impexp_thread() and waiting for it to signal start, and
 * registering obd_zombi_impexp_kill()/obd_zombi_impexp_check() as
 * liblustre wait/idle callbacks.
 * NOTE(review): the preprocessor conditionals that select between the two
 * paths, the error handling for the thread start and the return
 * statement are not visible in this listing.
 */
1612 int obd_zombie_impexp_init(void)
1616 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1617 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1618 spin_lock_init(&obd_zombie_impexp_lock);
1619 init_completion(&obd_zombie_start);
1620 init_completion(&obd_zombie_stop);
1621 cfs_waitq_init(&obd_zombie_waitq);
1625 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
/* the thread completes obd_zombie_start once it is running */
1629 wait_for_completion(&obd_zombie_start);
/* keep the returned handles for deregistration at stop time */
1632 obd_zombi_impexp_work_cb =
1633 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1634 &obd_zombi_impexp_kill, NULL);
1636 obd_zombi_impexp_idle_cb =
1637 liblustre_register_idle_callback("obd_zombi_impexp_check",
1638 &obd_zombi_impexp_check, NULL);
/*
 * Shut down zombie import/export handling: set the OBD_ZOMBIE_STOP bit
 * and signal the wait queue so the reaper notices the request.  The
 * visible lines then wait for obd_zombie_stop to complete and deregister
 * the liblustre wait/idle callbacks.
 * NOTE(review): the preprocessor conditionals that presumably separate
 * the wait from the deregistration are not visible in this listing.
 */
1645 void obd_zombie_impexp_stop(void)
/* set the flag before the wake-up so the woken thread sees it */
1647 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1648 obd_zombie_impexp_notify();
1650 wait_for_completion(&obd_zombie_stop);
1652 liblustre_deregister_wait_callback(obd_zombi_impexp_work_cb);
1653 liblustre_deregister_idle_callback(obd_zombi_impexp_idle_cb);