1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50 #include <lustre_export.h>
52 extern struct list_head obd_types;
53 spinlock_t obd_types_lock;
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
60 struct list_head obd_zombie_imports;
61 struct list_head obd_zombie_exports;
62 spinlock_t obd_zombie_impexp_lock;
63 static void obd_zombie_impexp_notify(void);
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 * support functions: we could use inter-module communication, but this
69 * is more portable to other OS's
71 static struct obd_device *obd_device_alloc(void)
73 struct obd_device *obd;
75 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
77 obd->obd_magic = OBD_DEVICE_MAGIC;
81 EXPORT_SYMBOL(obd_device_alloc);
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic "
87 "%08x != %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up "
91 obd, obd->obd_namespace, obd->obd_force);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 EXPORT_SYMBOL(obd_device_free);
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
121 const char *modname = name;
122 if (strcmp(modname, LUSTRE_MDT_NAME) == 0)
123 modname = LUSTRE_MDS_NAME;
124 if (!request_module("%s", modname)) {
125 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 type = class_search_type(name);
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134 spin_lock(&type->obd_type_lock);
136 try_module_get(type->typ_ops->o_owner);
137 spin_unlock(&type->obd_type_lock);
142 void class_put_type(struct obd_type *type)
145 spin_lock(&type->obd_type_lock);
147 module_put(type->typ_ops->o_owner);
148 spin_unlock(&type->obd_type_lock);
151 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
154 struct obd_type *type;
158 LASSERT(strnlen(name, 1024) < 1024); /* sanity check */
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
171 OBD_ALLOC(type->typ_name, strlen(name) + 1);
172 if (type->typ_ops == NULL || type->typ_name == NULL)
175 *(type->typ_ops) = *ops;
176 strcpy(type->typ_name, name);
177 spin_lock_init(&type->obd_type_lock);
180 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
182 if (IS_ERR(type->typ_procroot)) {
183 rc = PTR_ERR(type->typ_procroot);
184 type->typ_procroot = NULL;
189 spin_lock(&obd_types_lock);
190 list_add(&type->typ_chain, &obd_types);
191 spin_unlock(&obd_types_lock);
196 if (type->typ_name != NULL)
197 OBD_FREE(type->typ_name, strlen(name) + 1);
198 if (type->typ_ops != NULL)
199 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
200 OBD_FREE(type, sizeof(*type));
204 int class_unregister_type(const char *name)
206 struct obd_type *type = class_search_type(name);
210 CERROR("unknown obd type\n");
214 if (type->typ_refcnt) {
215 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
216 /* This is a bad situation, let's make the best of it */
217 /* Remove ops, but leave the name for debugging */
218 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
222 if (type->typ_procroot)
223 lprocfs_remove(&type->typ_procroot);
225 spin_lock(&obd_types_lock);
226 list_del(&type->typ_chain);
227 spin_unlock(&obd_types_lock);
228 OBD_FREE(type->typ_name, strlen(name) + 1);
229 if (type->typ_ops != NULL)
230 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
231 OBD_FREE(type, sizeof(*type));
233 } /* class_unregister_type */
236 * Create a new obd device.
238 * Find an empty slot in ::obd_devs[], create a new obd device in it.
240 * \param typename [in] obd device type string.
241 * \param name [in] obd device name.
243 * \retval NULL if create fails, otherwise return the obd device
246 struct obd_device *class_newdev(const char *type_name, const char *name)
248 struct obd_device *result = NULL;
249 struct obd_device *newdev;
250 struct obd_type *type = NULL;
252 int new_obd_minor = 0;
254 if (strlen(name) >= MAX_OBD_NAME) {
255 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
256 RETURN(ERR_PTR(-EINVAL));
259 type = class_get_type(type_name);
261 CERROR("OBD: unknown type: %s\n", type_name);
262 RETURN(ERR_PTR(-ENODEV));
265 newdev = obd_device_alloc();
266 if (newdev == NULL) {
267 class_put_type(type);
268 RETURN(ERR_PTR(-ENOMEM));
270 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
272 spin_lock(&obd_dev_lock);
273 for (i = 0; i < class_devno_max(); i++) {
274 struct obd_device *obd = class_num2obd(i);
275 if (obd && obd->obd_name && (strcmp(name, obd->obd_name) == 0)){
276 CERROR("Device %s already exists, won't add\n", name);
278 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
279 "%p obd_magic %08x != %08x\n", result,
280 result->obd_magic, OBD_DEVICE_MAGIC);
281 LASSERTF(result->obd_minor == new_obd_minor,
282 "%p obd_minor %d != %d\n", result,
283 result->obd_minor, new_obd_minor);
285 obd_devs[result->obd_minor] = NULL;
286 result->obd_name[0]='\0';
288 result = ERR_PTR(-EEXIST);
291 if (!result && !obd) {
293 result->obd_minor = i;
295 result->obd_type = type;
296 strncpy(result->obd_name, name,
297 sizeof(result->obd_name) - 1);
298 obd_devs[i] = result;
301 spin_unlock(&obd_dev_lock);
303 if (result == NULL && i >= class_devno_max()) {
304 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
306 result = ERR_PTR(-EOVERFLOW);
309 if (IS_ERR(result)) {
310 obd_device_free(newdev);
311 class_put_type(type);
313 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
314 result->obd_name, result);
319 void class_release_dev(struct obd_device *obd)
321 struct obd_type *obd_type = obd->obd_type;
323 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
324 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
325 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
326 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
327 LASSERT(obd_type != NULL);
329 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
330 obd->obd_name,obd->obd_type->typ_name);
332 spin_lock(&obd_dev_lock);
333 obd_devs[obd->obd_minor] = NULL;
334 spin_unlock(&obd_dev_lock);
335 obd_device_free(obd);
337 class_put_type(obd_type);
340 int class_name2dev(const char *name)
347 spin_lock(&obd_dev_lock);
348 for (i = 0; i < class_devno_max(); i++) {
349 struct obd_device *obd = class_num2obd(i);
350 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
351 /* Make sure we finished attaching before we give
352 out any references */
353 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
354 if (obd->obd_attached) {
355 spin_unlock(&obd_dev_lock);
361 spin_unlock(&obd_dev_lock);
366 struct obd_device *class_name2obd(const char *name)
368 int dev = class_name2dev(name);
370 if (dev < 0 || dev > class_devno_max())
372 return class_num2obd(dev);
375 int class_uuid2dev(struct obd_uuid *uuid)
379 spin_lock(&obd_dev_lock);
380 for (i = 0; i < class_devno_max(); i++) {
381 struct obd_device *obd = class_num2obd(i);
382 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
383 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
384 spin_unlock(&obd_dev_lock);
388 spin_unlock(&obd_dev_lock);
393 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
395 int dev = class_uuid2dev(uuid);
398 return class_num2obd(dev);
402 * Get obd device from ::obd_devs[]
404 * \param num [in] array index
406 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
407 * otherwise return the obd device there.
409 struct obd_device *class_num2obd(int num)
411 struct obd_device *obd = NULL;
413 if (num < class_devno_max()) {
418 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
419 "%p obd_magic %08x != %08x\n",
420 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
421 LASSERTF(obd->obd_minor == num,
422 "%p obd_minor %0d != %0d\n",
423 obd, obd->obd_minor, num);
429 void class_obd_list(void)
434 spin_lock(&obd_dev_lock);
435 for (i = 0; i < class_devno_max(); i++) {
436 struct obd_device *obd = class_num2obd(i);
439 if (obd->obd_stopping)
441 else if (obd->obd_set_up)
443 else if (obd->obd_attached)
447 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
448 i, status, obd->obd_type->typ_name,
449 obd->obd_name, obd->obd_uuid.uuid,
450 atomic_read(&obd->obd_refcount));
452 spin_unlock(&obd_dev_lock);
456 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
457 specified, then only the client with that uuid is returned,
458 otherwise any client connected to the tgt is returned. */
459 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
460 const char * typ_name,
461 struct obd_uuid *grp_uuid)
465 spin_lock(&obd_dev_lock);
466 for (i = 0; i < class_devno_max(); i++) {
467 struct obd_device *obd = class_num2obd(i);
470 if ((strncmp(obd->obd_type->typ_name, typ_name,
471 strlen(typ_name)) == 0)) {
472 if (obd_uuid_equals(tgt_uuid,
473 &obd->u.cli.cl_target_uuid) &&
474 ((grp_uuid)? obd_uuid_equals(grp_uuid,
475 &obd->obd_uuid) : 1)) {
476 spin_unlock(&obd_dev_lock);
481 spin_unlock(&obd_dev_lock);
486 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
487 struct obd_uuid *grp_uuid)
489 struct obd_device *obd;
491 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
493 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
498 /* Iterate the obd_device list looking devices have grp_uuid. Start
499 searching at *next, and if a device is found, the next index to look
500 at is saved in *next. If next is NULL, then the first matching device
501 will always be returned. */
502 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
508 else if (*next >= 0 && *next < class_devno_max())
513 spin_lock(&obd_dev_lock);
514 for (; i < class_devno_max(); i++) {
515 struct obd_device *obd = class_num2obd(i);
518 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
521 spin_unlock(&obd_dev_lock);
525 spin_unlock(&obd_dev_lock);
531 void obd_cleanup_caches(void)
536 if (obd_device_cachep) {
537 rc = cfs_mem_cache_destroy(obd_device_cachep);
538 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
539 obd_device_cachep = NULL;
542 rc = cfs_mem_cache_destroy(obdo_cachep);
543 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
547 rc = cfs_mem_cache_destroy(import_cachep);
548 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
549 import_cachep = NULL;
554 int obd_init_caches(void)
558 LASSERT(obd_device_cachep == NULL);
559 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
560 sizeof(struct obd_device),
562 if (!obd_device_cachep)
565 LASSERT(obdo_cachep == NULL);
566 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
571 LASSERT(import_cachep == NULL);
572 import_cachep = cfs_mem_cache_create("ll_import_cache",
573 sizeof(struct obd_import),
580 obd_cleanup_caches();
585 /* map connection to client */
586 struct obd_export *class_conn2export(struct lustre_handle *conn)
588 struct obd_export *export;
592 CDEBUG(D_CACHE, "looking for null handle\n");
596 if (conn->cookie == -1) { /* this means assign a new connection */
597 CDEBUG(D_CACHE, "want a new connection\n");
601 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
602 export = class_handle2object(conn->cookie);
606 struct obd_device *class_exp2obd(struct obd_export *exp)
613 struct obd_device *class_conn2obd(struct lustre_handle *conn)
615 struct obd_export *export;
616 export = class_conn2export(conn);
618 struct obd_device *obd = export->exp_obd;
619 class_export_put(export);
625 struct obd_import *class_exp2cliimp(struct obd_export *exp)
627 struct obd_device *obd = exp->exp_obd;
630 return obd->u.cli.cl_import;
633 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
635 struct obd_device *obd = class_conn2obd(conn);
638 return obd->u.cli.cl_import;
641 /* Export management functions */
642 static void export_handle_addref(void *export)
644 class_export_get(export);
647 /* called from mds_commit_cb() in context of journal commit callback
648 * and cannot call any blocking functions. */
649 void __class_export_put(struct obd_export *exp)
651 if (atomic_dec_and_test(&exp->exp_refcount)) {
652 LASSERT (list_empty(&exp->exp_obd_chain));
654 CDEBUG(D_IOCTL, "final put %p/%s\n",
655 exp, exp->exp_client_uuid.uuid);
657 /* release nid stat refererence */
658 lprocfs_exp_cleanup(exp);
660 spin_lock(&obd_zombie_impexp_lock);
661 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
662 spin_unlock(&obd_zombie_impexp_lock);
664 obd_zombie_impexp_notify();
667 EXPORT_SYMBOL(__class_export_put);
669 void class_export_destroy(struct obd_export *exp)
671 struct obd_device *obd = exp->exp_obd;
673 LASSERT (atomic_read(&exp->exp_refcount) == 0);
675 CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
676 exp->exp_client_uuid.uuid);
678 LASSERT(obd != NULL);
680 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
681 if (exp->exp_connection)
682 ptlrpc_put_connection_superhack(exp->exp_connection);
684 LASSERT(list_empty(&exp->exp_outstanding_replies));
685 LASSERT(list_empty(&exp->exp_uncommitted_replies));
686 LASSERT(list_empty(&exp->exp_req_replay_queue));
687 LASSERT(list_empty(&exp->exp_queued_rpc));
688 obd_destroy_export(exp);
690 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
694 /* Creates a new export, adds it to the hash table, and returns a
695 * pointer to it. The refcount is 2: one for the hash reference, and
696 * one for the pointer returned by this function. */
697 struct obd_export *class_new_export(struct obd_device *obd,
698 struct obd_uuid *cluuid)
700 struct obd_export *export;
703 OBD_ALLOC(export, sizeof(*export));
705 return ERR_PTR(-ENOMEM);
707 export->exp_conn_cnt = 0;
708 export->exp_lock_hash = NULL;
709 atomic_set(&export->exp_refcount, 2);
710 atomic_set(&export->exp_rpc_count, 0);
711 export->exp_obd = obd;
712 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
713 spin_lock_init(&export->exp_uncommitted_replies_lock);
714 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
715 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
716 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
718 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
719 class_handle_hash(&export->exp_handle, export_handle_addref);
720 export->exp_last_request_time = cfs_time_current_sec();
721 spin_lock_init(&export->exp_lock);
722 INIT_HLIST_NODE(&export->exp_uuid_hash);
723 INIT_HLIST_NODE(&export->exp_nid_hash);
725 export->exp_client_uuid = *cluuid;
726 obd_init_export(export);
728 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
729 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
730 &export->exp_uuid_hash);
732 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
733 obd->obd_name, cluuid->uuid, rc);
734 class_handle_unhash(&export->exp_handle);
735 OBD_FREE_PTR(export);
736 return ERR_PTR(-EALREADY);
740 spin_lock(&obd->obd_dev_lock);
741 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
743 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
744 list_add_tail(&export->exp_obd_chain_timed,
745 &export->exp_obd->obd_exports_timed);
746 export->exp_obd->obd_num_exports++;
747 spin_unlock(&obd->obd_dev_lock);
751 EXPORT_SYMBOL(class_new_export);
753 void class_unlink_export(struct obd_export *exp)
755 class_handle_unhash(&exp->exp_handle);
757 spin_lock(&exp->exp_obd->obd_dev_lock);
758 /* delete an uuid-export hashitem from hashtables */
759 if (!hlist_unhashed(&exp->exp_uuid_hash))
760 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
761 &exp->exp_client_uuid,
762 &exp->exp_uuid_hash);
764 list_del_init(&exp->exp_obd_chain);
765 list_del_init(&exp->exp_obd_chain_timed);
766 exp->exp_obd->obd_num_exports--;
767 spin_unlock(&exp->exp_obd->obd_dev_lock);
768 /* Keep these counter valid always */
769 spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
770 if (exp->exp_delayed) {
771 spin_lock(&exp->exp_lock);
772 exp->exp_delayed = 0;
773 spin_unlock(&exp->exp_lock);
774 LASSERT(exp->exp_obd->obd_delayed_clients);
775 exp->exp_obd->obd_delayed_clients--;
776 } else if (exp->exp_replay_needed) {
777 spin_lock(&exp->exp_lock);
778 exp->exp_replay_needed = 0;
779 spin_unlock(&exp->exp_lock);
780 LASSERT(exp->exp_obd->obd_recoverable_clients);
781 exp->exp_obd->obd_recoverable_clients--;
784 if (exp->exp_obd->obd_recovering && exp->exp_in_recovery) {
785 spin_lock(&exp->exp_lock);
786 exp->exp_in_recovery = 0;
787 spin_unlock(&exp->exp_lock);
788 LASSERT(exp->exp_obd->obd_connected_clients);
789 exp->exp_obd->obd_connected_clients--;
791 spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
792 class_export_put(exp);
794 EXPORT_SYMBOL(class_unlink_export);
796 /* Import management functions */
797 static void import_handle_addref(void *import)
799 class_import_get(import);
802 struct obd_import *class_import_get(struct obd_import *import)
804 LASSERT(atomic_read(&import->imp_refcount) >= 0);
805 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
806 atomic_inc(&import->imp_refcount);
807 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
808 atomic_read(&import->imp_refcount),
809 import->imp_obd->obd_name);
812 EXPORT_SYMBOL(class_import_get);
814 void class_import_put(struct obd_import *import)
818 LASSERT(atomic_read(&import->imp_refcount) > 0);
819 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
820 LASSERT(list_empty(&import->imp_zombie_chain));
822 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
823 atomic_read(&import->imp_refcount) - 1,
824 import->imp_obd->obd_name);
826 if (atomic_dec_and_test(&import->imp_refcount)) {
827 CDEBUG(D_INFO, "final put import %p\n", import);
828 spin_lock(&obd_zombie_impexp_lock);
829 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
830 spin_unlock(&obd_zombie_impexp_lock);
832 obd_zombie_impexp_notify();
837 EXPORT_SYMBOL(class_import_put);
839 void class_import_destroy(struct obd_import *import)
843 CDEBUG(D_IOCTL, "destroying import %p\n", import);
845 LASSERT(atomic_read(&import->imp_refcount) == 0);
847 ptlrpc_put_connection_superhack(import->imp_connection);
849 while (!list_empty(&import->imp_conn_list)) {
850 struct obd_import_conn *imp_conn;
852 imp_conn = list_entry(import->imp_conn_list.next,
853 struct obd_import_conn, oic_item);
854 list_del(&imp_conn->oic_item);
855 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
856 OBD_FREE(imp_conn, sizeof(*imp_conn));
859 class_decref(import->imp_obd);
860 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
864 static void init_imp_at(struct imp_at *at) {
866 at_init(&at->iat_net_latency, 0, 0);
867 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
868 /* max service estimates are tracked on the server side, so
869 don't use the AT history here, just use the last reported
870 val. (But keep hist for proc histogram, worst_ever) */
871 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
876 struct obd_import *class_new_import(struct obd_device *obd)
878 struct obd_import *imp;
880 OBD_ALLOC(imp, sizeof(*imp));
884 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
885 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
886 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
887 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
888 spin_lock_init(&imp->imp_lock);
889 imp->imp_last_success_conn = 0;
890 imp->imp_state = LUSTRE_IMP_NEW;
891 imp->imp_obd = class_incref(obd);
892 cfs_waitq_init(&imp->imp_recovery_waitq);
894 atomic_set(&imp->imp_refcount, 2);
895 atomic_set(&imp->imp_unregistering, 0);
896 atomic_set(&imp->imp_inflight, 0);
897 atomic_set(&imp->imp_replay_inflight, 0);
898 atomic_set(&imp->imp_inval_count, 0);
899 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
900 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
901 class_handle_hash(&imp->imp_handle, import_handle_addref);
902 init_imp_at(&imp->imp_at);
904 /* b1_8 supports both v1 & v2. but HEAD only supports v2.
907 #define HAVE_DEFAULT_V2_CONNECT 1
908 #ifdef HAVE_DEFAULT_V2_CONNECT
909 /* the default magic is V2, will be used in connect RPC, and
910 * then adjusted according to the flags in request/reply. */
911 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
913 /* the default magic is V1, will be used in connect RPC, and
914 * then adjusted according to the flags in request/reply. */
915 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V1;
920 EXPORT_SYMBOL(class_new_import);
922 void class_destroy_import(struct obd_import *import)
924 LASSERT(import != NULL);
925 LASSERT(import != LP_POISON);
927 class_handle_unhash(&import->imp_handle);
929 spin_lock(&import->imp_lock);
930 import->imp_generation++;
931 spin_unlock(&import->imp_lock);
933 class_import_put(import);
935 EXPORT_SYMBOL(class_destroy_import);
937 /* A connection defines an export context in which preallocation can
938 be managed. This releases the export pointer reference, and returns
939 the export handle, so the export refcount is 1 when this function
941 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
942 struct obd_uuid *cluuid)
944 struct obd_export *export;
945 LASSERT(conn != NULL);
946 LASSERT(obd != NULL);
947 LASSERT(cluuid != NULL);
950 export = class_new_export(obd, cluuid);
952 RETURN(PTR_ERR(export));
954 conn->cookie = export->exp_handle.h_cookie;
955 class_export_put(export);
957 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
958 cluuid->uuid, conn->cookie);
961 EXPORT_SYMBOL(class_connect);
963 /* This function removes 1-3 references from the export:
964 * 1 - for export pointer passed
965 * and if disconnect really need
966 * 2 - removing from hash
967 * 3 - in client_unlink_export
968 * The export pointer passed to this function can destroyed */
969 int class_disconnect(struct obd_export *export)
971 int already_disconnected;
974 if (export == NULL) {
976 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
980 spin_lock(&export->exp_lock);
981 already_disconnected = export->exp_disconnected;
982 export->exp_disconnected = 1;
983 spin_unlock(&export->exp_lock);
986 /* class_cleanup(), abort_recovery(), and class_fail_export()
987 * all end up in here, and if any of them race we shouldn't
988 * call extra class_export_puts(). */
989 if (already_disconnected)
990 GOTO(no_disconn, already_disconnected);
992 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
993 export->exp_handle.h_cookie);
996 if (!hlist_unhashed(&export->exp_nid_hash))
997 lustre_hash_del(export->exp_obd->obd_nid_hash,
998 &export->exp_connection->c_peer.nid,
999 &export->exp_nid_hash);
1001 class_unlink_export(export);
1004 class_export_put(export);
1008 /* Return non-zero for a fully connected export */
1009 int class_connected_export(struct obd_export *exp)
1013 spin_lock(&exp->exp_lock);
1014 connected = (exp->exp_conn_cnt > 0);
1015 spin_unlock(&exp->exp_lock);
1020 EXPORT_SYMBOL(class_connected_export);
1022 static void class_disconnect_export_list(struct list_head *list,
1023 enum obd_option flags)
1026 struct obd_export *exp;
1029 /* It's possible that an export may disconnect itself, but
1030 * nothing else will be added to this list. */
1031 while (!list_empty(list)) {
1032 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1033 /* need for safe call CDEBUG after obd_disconnect */
1034 class_export_get(exp);
1036 spin_lock(&exp->exp_lock);
1037 exp->exp_flags = flags;
1038 spin_unlock(&exp->exp_lock);
1040 if (obd_uuid_equals(&exp->exp_client_uuid,
1041 &exp->exp_obd->obd_uuid)) {
1043 "exp %p export uuid == obd uuid, don't discon\n",
1045 /* Need to delete this now so we don't end up pointing
1046 * to work_list later when this export is cleaned up. */
1047 list_del_init(&exp->exp_obd_chain);
1048 class_export_put(exp);
1052 class_export_get(exp);
1053 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1054 "last request at %ld\n",
1055 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1056 exp, exp->exp_last_request_time);
1058 /* release one export reference anyway */
1059 rc = obd_disconnect(exp);
1060 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1061 obd_export_nid2str(exp), exp, rc);
1062 class_export_put(exp);
1067 void class_disconnect_exports(struct obd_device *obd)
1069 struct list_head work_list;
1072 /* Move all of the exports from obd_exports to a work list, en masse. */
1073 CFS_INIT_LIST_HEAD(&work_list);
1074 spin_lock(&obd->obd_dev_lock);
1075 list_splice_init(&obd->obd_delayed_exports, &work_list);
1076 list_splice_init(&obd->obd_exports, &work_list);
1077 spin_unlock(&obd->obd_dev_lock);
1079 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1080 "disconnecting them\n", obd->obd_minor, obd);
1081 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd));
1084 EXPORT_SYMBOL(class_disconnect_exports);
1086 /* Remove exports that have not completed recovery. */
1087 void class_disconnect_stale_exports(struct obd_device *obd,
1088 enum obd_option flags)
1090 struct list_head work_list;
1091 struct list_head *pos, *n;
1092 struct obd_export *exp;
1095 CFS_INIT_LIST_HEAD(&work_list);
1096 spin_lock(&obd->obd_dev_lock);
1097 list_for_each_safe(pos, n, &obd->obd_exports) {
1098 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1099 if (exp->exp_replay_needed) {
1100 list_move(&exp->exp_obd_chain, &work_list);
1101 obd->obd_stale_clients++;
1104 spin_unlock(&obd->obd_dev_lock);
1106 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1107 obd->obd_name, obd->obd_stale_clients);
1108 class_disconnect_export_list(&work_list, flags);
1111 EXPORT_SYMBOL(class_disconnect_stale_exports);
1113 void class_disconnect_expired_exports(struct obd_device *obd)
1115 struct list_head expired_list;
1116 struct obd_export *exp, *n;
1120 CFS_INIT_LIST_HEAD(&expired_list);
1121 spin_lock(&obd->obd_dev_lock);
1122 list_for_each_entry_safe(exp, n, &obd->obd_delayed_exports,
1124 if (exp_expired(exp, obd->u.obt.obt_stale_export_age)) {
1125 list_move(&exp->exp_obd_chain, &expired_list);
1129 spin_unlock(&obd->obd_dev_lock);
1134 CDEBUG(D_INFO, "%s: disconnecting %d expired exports\n",
1135 obd->obd_name, cnt);
1136 class_disconnect_export_list(&expired_list, exp_flags_from_obd(obd));
1140 EXPORT_SYMBOL(class_disconnect_expired_exports);
1142 void class_set_export_delayed(struct obd_export *exp)
1144 struct obd_device *obd = class_exp2obd(exp);
1146 LASSERT(!exp->exp_delayed);
1148 /* no need to ping delayed exports */
1149 spin_lock(&obd->obd_dev_lock);
1150 list_del_init(&exp->exp_obd_chain_timed);
1151 list_move_tail(&exp->exp_obd_chain, &obd->obd_delayed_exports);
1152 spin_unlock(&obd->obd_dev_lock);
1154 LASSERT(obd->obd_recoverable_clients > 0);
1156 spin_lock_bh(&obd->obd_processing_task_lock);
1157 /* race with target_queue_last_replay_reply? */
1158 if (exp->exp_replay_needed) {
1159 spin_lock(&exp->exp_lock);
1160 exp->exp_delayed = 1;
1161 spin_unlock(&exp->exp_lock);
1163 obd->obd_delayed_clients++;
1164 obd->obd_recoverable_clients--;
1166 spin_unlock_bh(&obd->obd_processing_task_lock);
1168 CDEBUG(D_HA, "%s: set client %s as delayed\n",
1169 obd->obd_name, exp->exp_client_uuid.uuid);
1171 EXPORT_SYMBOL(class_set_export_delayed);
1174 * Manage exports that have not completed recovery.
1176 void class_handle_stale_exports(struct obd_device *obd)
1178 struct list_head delay_list, evict_list;
1179 struct obd_export *exp, *n;
1183 CFS_INIT_LIST_HEAD(&delay_list);
1184 CFS_INIT_LIST_HEAD(&evict_list);
1185 spin_lock(&obd->obd_dev_lock);
1186 list_for_each_entry_safe(exp, n, &obd->obd_exports, exp_obd_chain) {
1187 LASSERT(!exp->exp_delayed);
1188 /* clients finished recovery */
1189 if (!exp->exp_replay_needed)
1191 /* connected non-vbr clients are evicted */
1192 if (exp->exp_in_recovery && !exp_connect_vbr(exp)) {
1193 obd->obd_stale_clients++;
1194 list_move_tail(&exp->exp_obd_chain, &evict_list);
1197 if (obd->obd_version_recov || !exp->exp_in_recovery) {
1198 list_move_tail(&exp->exp_obd_chain, &delay_list);
1202 #ifndef HAVE_DELAYED_RECOVERY
1203 /* delayed recovery is turned off, evict all delayed exports */
1204 list_splice_init(&delay_list, &evict_list);
1205 list_splice_init(&obd->obd_delayed_exports, &evict_list);
1206 obd->obd_stale_clients += delayed;
1208 spin_unlock(&obd->obd_dev_lock);
1210 list_for_each_entry_safe(exp, n, &delay_list, exp_obd_chain) {
1211 class_set_export_delayed(exp);
1212 exp->exp_last_request_time = cfs_time_current_sec();
1214 LASSERT(list_empty(&delay_list));
1216 /* evict clients without VBR support */
1217 class_disconnect_export_list(&evict_list, exp_flags_from_obd(obd));
1221 EXPORT_SYMBOL(class_handle_stale_exports);
1223 int oig_init(struct obd_io_group **oig_out)
1225 struct obd_io_group *oig;
1228 OBD_ALLOC(oig, sizeof(*oig));
1232 spin_lock_init(&oig->oig_lock);
1234 oig->oig_pending = 0;
1235 atomic_set(&oig->oig_refcount, 1);
1236 cfs_waitq_init(&oig->oig_waitq);
1237 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1242 EXPORT_SYMBOL(oig_init);
1244 static inline void oig_grab(struct obd_io_group *oig)
1246 atomic_inc(&oig->oig_refcount);
1249 void oig_release(struct obd_io_group *oig)
1251 if (atomic_dec_and_test(&oig->oig_refcount))
1252 OBD_FREE(oig, sizeof(*oig));
1254 EXPORT_SYMBOL(oig_release);
1256 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1259 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1260 spin_lock(&oig->oig_lock);
1266 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1268 spin_unlock(&oig->oig_lock);
1273 EXPORT_SYMBOL(oig_add_one);
1275 void oig_complete_one(struct obd_io_group *oig,
1276 struct oig_callback_context *occ, int rc)
1278 cfs_waitq_t *wake = NULL;
1281 spin_lock(&oig->oig_lock);
1284 list_del_init(&occ->occ_oig_item);
1286 old_rc = oig->oig_rc;
1287 if (oig->oig_rc == 0 && rc != 0)
1290 if (--oig->oig_pending <= 0)
1291 wake = &oig->oig_waitq;
1293 spin_unlock(&oig->oig_lock);
1295 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1296 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1299 cfs_waitq_signal(wake);
1302 EXPORT_SYMBOL(oig_complete_one);
1304 static int oig_done(struct obd_io_group *oig)
1307 spin_lock(&oig->oig_lock);
1308 if (oig->oig_pending <= 0)
1310 spin_unlock(&oig->oig_lock);
1314 static void interrupted_oig(void *data)
1316 struct obd_io_group *oig = data;
1317 struct oig_callback_context *occ;
1319 spin_lock(&oig->oig_lock);
1320 /* We need to restart the processing each time we drop the lock, as
1321 * it is possible other threads called oig_complete_one() to remove
1322 * an entry elsewhere in the list while we dropped lock. We need to
1323 * drop the lock because osc_ap_completion() calls oig_complete_one()
1324 * which re-gets this lock ;-) as well as a lock ordering issue. */
1326 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1327 if (occ->interrupted)
1329 occ->interrupted = 1;
1330 spin_unlock(&oig->oig_lock);
1331 occ->occ_interrupted(occ);
1332 spin_lock(&oig->oig_lock);
1335 spin_unlock(&oig->oig_lock);
1338 int oig_wait(struct obd_io_group *oig)
1340 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1343 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1346 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1347 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1348 /* we can't continue until the oig has emptied and stopped
1349 * referencing state that the caller will free upon return */
1351 lwi = (struct l_wait_info){ 0, };
1352 } while (rc == -EINTR);
1354 LASSERTF(oig->oig_pending == 0,
1355 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1358 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1361 EXPORT_SYMBOL(oig_wait);
1363 void class_fail_export(struct obd_export *exp)
1365 int rc, already_failed;
1367 spin_lock(&exp->exp_lock);
1368 already_failed = exp->exp_failed;
1369 exp->exp_failed = 1;
1370 spin_unlock(&exp->exp_lock);
1372 if (already_failed) {
1373 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1374 exp, exp->exp_client_uuid.uuid);
1378 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1379 exp, exp->exp_client_uuid.uuid);
1381 if (obd_dump_on_timeout)
1382 libcfs_debug_dumplog();
1384 /* Most callers into obd_disconnect are removing their own reference
1385 * (request, for example) in addition to the one from the hash table.
1386 * We don't have such a reference here, so make one. */
1387 class_export_get(exp);
1388 rc = obd_disconnect(exp);
1390 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1392 CDEBUG(D_HA, "disconnected export %p/%s\n",
1393 exp, exp->exp_client_uuid.uuid);
1395 EXPORT_SYMBOL(class_fail_export);
1397 char *obd_export_nid2str(struct obd_export *exp)
1399 if (exp->exp_connection != NULL)
1400 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1404 EXPORT_SYMBOL(obd_export_nid2str);
1406 int obd_export_evict_by_nid(struct obd_device *obd, char *nid)
1408 struct obd_export *doomed_exp = NULL;
1409 int exports_evicted = 0;
1411 lnet_nid_t nid_key = libcfs_str2nid(nid);
1414 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1416 if (doomed_exp == NULL)
1419 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1420 "nid %s found, wanted nid %s, requested nid %s\n",
1421 obd_export_nid2str(doomed_exp),
1422 libcfs_nid2str(nid_key), nid);
1425 CDEBUG(D_HA, "%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1426 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1428 class_fail_export(doomed_exp);
1429 class_export_put(doomed_exp);
1432 if (!exports_evicted)
1433 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1434 obd->obd_name, nid);
1435 return exports_evicted;
1437 EXPORT_SYMBOL(obd_export_evict_by_nid);
1439 int obd_export_evict_by_uuid(struct obd_device *obd, char *uuid)
1441 struct obd_export *doomed_exp = NULL;
1442 struct obd_uuid doomed_uuid;
1443 int exports_evicted = 0;
1445 obd_str2uuid(&doomed_uuid, uuid);
1446 if(obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1447 CERROR("%s: can't evict myself\n", obd->obd_name);
1448 return exports_evicted;
1451 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1453 if (doomed_exp == NULL) {
1454 CERROR("%s: can't disconnect %s: no exports found\n",
1455 obd->obd_name, uuid);
1457 CWARN("%s: evicting %s at adminstrative request\n",
1458 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1459 class_fail_export(doomed_exp);
1460 class_export_put(doomed_exp);
1464 return exports_evicted;
1466 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1468 void obd_zombie_impexp_cull(void)
1470 struct obd_import *import;
1471 struct obd_export *export;
1474 spin_lock (&obd_zombie_impexp_lock);
1477 if (!list_empty(&obd_zombie_imports)) {
1478 import = list_entry(obd_zombie_imports.next,
1481 list_del(&import->imp_zombie_chain);
1485 if (!list_empty(&obd_zombie_exports)) {
1486 export = list_entry(obd_zombie_exports.next,
1489 list_del_init(&export->exp_obd_chain);
1492 spin_unlock(&obd_zombie_impexp_lock);
1495 class_import_destroy(import);
1498 class_export_destroy(export);
1500 } while (import != NULL || export != NULL);
1503 static struct completion obd_zombie_start;
1504 static struct completion obd_zombie_stop;
1505 static unsigned long obd_zombie_flags;
1506 static cfs_waitq_t obd_zombie_waitq;
1507 static pid_t obd_zombie_pid;
1513 int obd_zombi_impexp_check(void *arg)
1517 spin_lock(&obd_zombie_impexp_lock);
1518 rc = list_empty(&obd_zombie_imports) &&
1519 list_empty(&obd_zombie_exports) &&
1520 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1522 spin_unlock(&obd_zombie_impexp_lock);
1527 static void obd_zombie_impexp_notify(void)
1529 cfs_waitq_signal(&obd_zombie_waitq);
1533 * check whether obd_zombie is idle
1535 static int obd_zombie_is_idle(void)
1539 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1540 spin_lock(&obd_zombie_impexp_lock);
1541 rc = list_empty(&obd_zombie_imports) &&
1542 list_empty(&obd_zombie_exports);
1543 spin_unlock(&obd_zombie_impexp_lock);
1548 * wait when obd_zombie import/export queues become empty
1550 void obd_zombie_barrier(void)
1552 struct l_wait_info lwi = { 0 };
1554 if (obd_zombie_pid == cfs_curproc_pid())
1555 /* don't wait for myself */
1557 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1559 EXPORT_SYMBOL(obd_zombie_barrier);
1563 static int obd_zombie_impexp_thread(void *unused)
1567 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1568 complete(&obd_zombie_start);
1572 complete(&obd_zombie_start);
1574 obd_zombie_pid = cfs_curproc_pid();
1576 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1577 struct l_wait_info lwi = { 0 };
1579 l_wait_event(obd_zombie_waitq, !obd_zombi_impexp_check(NULL), &lwi);
1581 obd_zombie_impexp_cull();
1584 * Notify obd_zombie_barrier callers that queues
1587 cfs_waitq_signal(&obd_zombie_waitq);
1590 complete(&obd_zombie_stop);
1595 #else /* ! KERNEL */
1597 static atomic_t zombi_recur = ATOMIC_INIT(0);
1598 static void *obd_zombi_impexp_work_cb;
1599 static void *obd_zombi_impexp_idle_cb;
1601 int obd_zombi_impexp_kill(void *arg)
1605 if (atomic_inc_return(&zombi_recur) == 1) {
1606 obd_zombie_impexp_cull();
1609 atomic_dec(&zombi_recur);
1615 int obd_zombie_impexp_init(void)
1619 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1620 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1621 spin_lock_init(&obd_zombie_impexp_lock);
1622 init_completion(&obd_zombie_start);
1623 init_completion(&obd_zombie_stop);
1624 cfs_waitq_init(&obd_zombie_waitq);
1628 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1632 wait_for_completion(&obd_zombie_start);
1635 obd_zombi_impexp_work_cb =
1636 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1637 &obd_zombi_impexp_kill, NULL);
1639 obd_zombi_impexp_idle_cb =
1640 liblustre_register_idle_callback("obd_zombi_impexp_check",
1641 &obd_zombi_impexp_check, NULL);
1648 void obd_zombie_impexp_stop(void)
1650 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1651 obd_zombie_impexp_notify();
1653 wait_for_completion(&obd_zombie_stop);
1655 liblustre_deregister_wait_callback(obd_zombi_impexp_work_cb);
1656 liblustre_deregister_idle_callback(obd_zombi_impexp_idle_cb);