1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
/* List of registered obd types (defined elsewhere) and the spinlock that
 * class_search_type, class_register_type and class_unregister_type take
 * around accesses to that list. */
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
/* Memory caches set up by obd_init_caches and torn down by
 * obd_cleanup_caches. */
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
/* Zombie import/export lists and their lock.  The helpers declared below
 * are defined outside this view; class_export_put and class_import_put call
 * the *_add helpers on the final reference drop. */
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66 const char *status, int locks);
/* Function pointer invoked by class_export_destroy and class_import_destroy
 * on their connection pointers; it is assigned outside this view. */
68 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/* Allocate an obd_device from obd_device_cachep (CFS_ALLOC_IO) and stamp it
 * with OBD_DEVICE_MAGIC.
 * NOTE(review): parts of the body (braces, any NULL check, the return) are
 * not visible in this listing.
 * NOTE(review): the function is static yet passed to EXPORT_SYMBOL below --
 * confirm that is intended. */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
84 EXPORT_SYMBOL(obd_device_alloc);
/* Return an obd_device to obd_device_cachep.
 * Asserts the magic value first, logs an error if obd_namespace is still
 * set, then finalizes the lu_ref tracker before freeing.
 * NOTE(review): parts of the body are not visible in this listing. */
86 static void obd_device_free(struct obd_device *obd)
89 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A leftover namespace is reported here; whatever follows the CERROR is not
 * visible in this listing. */
91 if (obd->obd_namespace != NULL) {
92 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93 obd, obd->obd_namespace, obd->obd_force);
96 lu_ref_fini(&obd->obd_reference);
97 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* Look up a registered obd type by name.
 * Walks obd_types under obd_types_lock, comparing typ_name with strcmp, and
 * drops the lock both on a match and after an unsuccessful walk.
 * NOTE(review): the return statements are not visible in this listing. */
100 struct obd_type *class_search_type(const char *name)
102 struct list_head *tmp;
103 struct obd_type *type;
105 spin_lock(&obd_types_lock);
106 list_for_each(tmp, &obd_types) {
107 type = list_entry(tmp, struct obd_type, typ_chain);
108 if (strcmp(type->typ_name, name) == 0) {
109 spin_unlock(&obd_types_lock);
113 spin_unlock(&obd_types_lock);
/* Find an obd type by name, loading its module on demand.
 * The visible statements search for the type, call request_module() with
 * the type name and search again when that call returns 0, and then call
 * try_module_get() on the owner of typ_dt_ops under obd_type_lock.
 * NOTE(review): the conditions guarding these steps, any refcount update
 * and the return are not visible in this listing.
 * NOTE(review): the result of try_module_get() is not used on the visible
 * line -- confirm a failed module reference cannot happen here. */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
123 const char *modname = name;
124 if (!request_module("%s", modname)) {
125 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 type = class_search_type(name);
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134 spin_lock(&type->obd_type_lock);
136 try_module_get(type->typ_dt_ops->o_owner);
137 spin_unlock(&type->obd_type_lock);
/* Release a type obtained from class_get_type: under obd_type_lock, drop
 * the module reference on the owner of typ_dt_ops.
 * NOTE(review): parts of the body are not visible in this listing. */
142 void class_put_type(struct obd_type *type)
145 spin_lock(&type->obd_type_lock);
147 module_put(type->typ_dt_ops->o_owner);
148 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of a type name accepted by
 * class_register_type. */
151 #define CLASS_MAX_NAME 1024
/* Register a new obd type.
 * Visible steps: assert the name is shorter than CLASS_MAX_NAME, detect an
 * already registered name, allocate the type plus private copies of the
 * dt/md operation tables and the name, register a procfs directory named
 * after the type, initialize the lu_device_type, and link the type into
 * obd_types under obd_types_lock.  The trailing statements free whatever
 * was allocated.
 * NOTE(review): return statements, labels and several conditions are not
 * visible in this listing, so the exact error codes are not described. */
153 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
154 struct lprocfs_vars *vars, const char *name,
155 struct lu_device_type *ldt)
157 struct obd_type *type;
162 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
164 if (class_search_type(name)) {
165 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
170 OBD_ALLOC(type, sizeof(*type));
174 OBD_ALLOC_PTR(type->typ_dt_ops);
175 OBD_ALLOC_PTR(type->typ_md_ops);
176 OBD_ALLOC(type->typ_name, strlen(name) + 1);
178 if (type->typ_dt_ops == NULL ||
179 type->typ_md_ops == NULL ||
180 type->typ_name == NULL)
/* The operation tables are copied by value, so the type owns its copies. */
183 *(type->typ_dt_ops) = *dt_ops;
184 /* md_ops is optional */
186 *(type->typ_md_ops) = *md_ops;
187 strcpy(type->typ_name, name);
188 spin_lock_init(&type->obd_type_lock);
191 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
193 if (IS_ERR(type->typ_procroot)) {
194 rc = PTR_ERR(type->typ_procroot);
195 type->typ_procroot = NULL;
201 rc = lu_device_type_init(ldt);
206 spin_lock(&obd_types_lock);
207 list_add(&type->typ_chain, &obd_types);
208 spin_unlock(&obd_types_lock);
/* Cleanup: the name buffer is sized from the caller's name, which matches
 * the allocation size used above. */
213 if (type->typ_name != NULL)
214 OBD_FREE(type->typ_name, strlen(name) + 1);
215 if (type->typ_md_ops != NULL)
216 OBD_FREE_PTR(type->typ_md_ops);
217 if (type->typ_dt_ops != NULL)
218 OBD_FREE_PTR(type->typ_dt_ops);
219 OBD_FREE(type, sizeof(*type));
/* Unregister an obd type by name.
 * When the type still has a non-zero typ_refcnt, the visible code logs an
 * error and frees only the operation tables.  Otherwise it removes the
 * procfs entry, finalizes typ_lu, unlinks the type from obd_types under
 * obd_types_lock and frees the name, the operation tables and the type.
 * NOTE(review): return statements and some conditions are not visible in
 * this listing. */
223 int class_unregister_type(const char *name)
225 struct obd_type *type = class_search_type(name);
229 CERROR("unknown obd type\n");
233 if (type->typ_refcnt) {
234 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
235 /* This is a bad situation, let's make the best of it */
236 /* Remove ops, but leave the name for debugging */
237 OBD_FREE_PTR(type->typ_dt_ops);
238 OBD_FREE_PTR(type->typ_md_ops);
242 if (type->typ_procroot) {
243 lprocfs_remove(&type->typ_procroot);
247 lu_device_type_fini(type->typ_lu);
249 spin_lock(&obd_types_lock);
250 list_del(&type->typ_chain);
251 spin_unlock(&obd_types_lock);
252 OBD_FREE(type->typ_name, strlen(name) + 1);
253 if (type->typ_dt_ops != NULL)
254 OBD_FREE_PTR(type->typ_dt_ops);
255 if (type->typ_md_ops != NULL)
256 OBD_FREE_PTR(type->typ_md_ops);
257 OBD_FREE(type, sizeof(*type));
259 } /* class_unregister_type */
262 * Create a new obd device.
264 * Find an empty slot in ::obd_devs[], create a new obd device in it.
266 * \param[in] type_name obd device type string.
267 * \param[in] name obd device name.
269 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
/* Create a device of type type_name called name and place it in obd_devs[].
 * Visible failure values: ERR_PTR(-EINVAL) when name is MAX_OBD_NAME bytes
 * or longer, ERR_PTR(-ENODEV) for an unknown type, ERR_PTR(-ENOMEM) when
 * allocation fails, ERR_PTR(-EEXIST) for a duplicate name and
 * ERR_PTR(-EOVERFLOW) when no slot is free.  On those last two paths the
 * new device is freed and the type reference is dropped.
 * NOTE(review): some lines (e.g. where result is first pointed at newdev,
 * loop exits, the final return) are not visible in this listing. */
272 struct obd_device *class_newdev(const char *type_name, const char *name)
274 struct obd_device *result = NULL;
275 struct obd_device *newdev;
276 struct obd_type *type = NULL;
278 int new_obd_minor = 0;
280 if (strlen(name) >= MAX_OBD_NAME) {
281 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
282 RETURN(ERR_PTR(-EINVAL));
285 type = class_get_type(type_name);
287 CERROR("OBD: unknown type: %s\n", type_name);
288 RETURN(ERR_PTR(-ENODEV));
291 newdev = obd_device_alloc();
292 if (newdev == NULL) {
293 class_put_type(type);
294 RETURN(ERR_PTR(-ENOMEM));
296 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* The whole table is scanned under obd_dev_lock so that a duplicate name is
 * detected even after a free slot has already been claimed. */
298 spin_lock(&obd_dev_lock);
299 for (i = 0; i < class_devno_max(); i++) {
300 struct obd_device *obd = class_num2obd(i);
301 if (obd && obd->obd_name &&
302 (strcmp(name, obd->obd_name) == 0)) {
303 CERROR("Device %s already exists, won't add\n", name);
/* Undo a slot claimed earlier in this scan before reporting -EEXIST. */
305 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
306 "%p obd_magic %08x != %08x\n", result,
307 result->obd_magic, OBD_DEVICE_MAGIC);
308 LASSERTF(result->obd_minor == new_obd_minor,
309 "%p obd_minor %d != %d\n", result,
310 result->obd_minor, new_obd_minor);
312 obd_devs[result->obd_minor] = NULL;
313 result->obd_name[0]='\0';
315 result = ERR_PTR(-EEXIST);
/* First empty slot seen while nothing has been claimed yet. */
318 if (!result && !obd) {
320 result->obd_minor = i;
322 result->obd_type = type;
/* NOTE(review): strncpy with size - 1 leaves the last byte untouched;
 * termination presumably relies on the device memory being zeroed at
 * allocation -- TODO confirm. */
323 strncpy(result->obd_name, name,
324 sizeof(result->obd_name) - 1);
325 obd_devs[i] = result;
328 spin_unlock(&obd_dev_lock);
330 if (result == NULL && i >= class_devno_max()) {
331 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
333 result = ERR_PTR(-EOVERFLOW);
336 if (IS_ERR(result)) {
337 obd_device_free(newdev);
338 class_put_type(type);
340 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
341 result->obd_name, result);
/* Remove obd from obd_devs[], free it and drop its type reference.
 * The type pointer is saved up front because obd is freed before
 * class_put_type is called.
 * NOTE(review): parts of the body are not visible in this listing. */
346 void class_release_dev(struct obd_device *obd)
348 struct obd_type *obd_type = obd->obd_type;
350 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
351 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
352 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
353 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
354 LASSERT(obd_type != NULL);
356 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
357 obd->obd_name,obd->obd_type->typ_name);
359 spin_lock(&obd_dev_lock);
360 obd_devs[obd->obd_minor] = NULL;
361 spin_unlock(&obd_dev_lock);
362 obd_device_free(obd);
364 class_put_type(obd_type);
/* Search obd_devs[] under obd_dev_lock for a device whose obd_name equals
 * name; the visible match path additionally requires obd_attached.
 * NOTE(review): the return statements are not visible in this listing, so
 * the exact return values are not described. */
367 int class_name2dev(const char *name)
374 spin_lock(&obd_dev_lock);
375 for (i = 0; i < class_devno_max(); i++) {
376 struct obd_device *obd = class_num2obd(i);
377 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
378 /* Make sure we finished attaching before we give
379 out any references */
380 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
381 if (obd->obd_attached) {
382 spin_unlock(&obd_dev_lock);
388 spin_unlock(&obd_dev_lock);
/* Resolve a device name to its obd_device through class_name2dev and
 * class_num2obd.
 * NOTE(review): the range test uses '>' so dev == class_devno_max() passes
 * it; class_num2obd applies its own 'num < class_devno_max()' check, which
 * appears to cover that value -- confirm whether '>=' was intended.
 * NOTE(review): the statement guarded by the test is not visible here. */
393 struct obd_device *class_name2obd(const char *name)
395 int dev = class_name2dev(name);
397 if (dev < 0 || dev > class_devno_max())
399 return class_num2obd(dev);
/* Search obd_devs[] under obd_dev_lock for a device whose obd_uuid equals
 * uuid (obd_uuid_equals).
 * NOTE(review): the return statements are not visible in this listing. */
402 int class_uuid2dev(struct obd_uuid *uuid)
406 spin_lock(&obd_dev_lock);
407 for (i = 0; i < class_devno_max(); i++) {
408 struct obd_device *obd = class_num2obd(i);
409 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
410 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
411 spin_unlock(&obd_dev_lock);
415 spin_unlock(&obd_dev_lock);
/* Resolve a uuid to its obd_device through class_uuid2dev and
 * class_num2obd.
 * NOTE(review): any check of dev between the two visible statements is not
 * visible in this listing. */
420 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
422 int dev = class_uuid2dev(uuid);
425 return class_num2obd(dev);
429 * Get obd device from ::obd_devs[]
431 * \param num [in] array index
433 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
434 * otherwise return the obd device there.
/* Return the device stored at index num, validating its magic value and
 * that its obd_minor matches num.
 * NOTE(review): the visible bound check is only 'num < class_devno_max()';
 * no lower-bound check is visible -- confirm callers never pass a negative
 * index.  The array read and the return are not visible in this listing. */
436 struct obd_device *class_num2obd(int num)
438 struct obd_device *obd = NULL;
440 if (num < class_devno_max()) {
445 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
446 "%p obd_magic %08x != %08x\n",
447 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
448 LASSERTF(obd->obd_minor == num,
449 "%p obd_minor %0d != %0d\n",
450 obd, obd->obd_minor, num);
/* Print one console line per device in obd_devs[] while holding
 * obd_dev_lock: index, a status string chosen from the stopping / set_up /
 * attached flags, type name, device name, uuid and reference count.
 * NOTE(review): the status assignments and the skip of empty slots are not
 * visible in this listing. */
456 void class_obd_list(void)
461 spin_lock(&obd_dev_lock);
462 for (i = 0; i < class_devno_max(); i++) {
463 struct obd_device *obd = class_num2obd(i);
466 if (obd->obd_stopping)
468 else if (obd->obd_set_up)
470 else if (obd->obd_attached)
474 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
475 i, status, obd->obd_type->typ_name,
476 obd->obd_name, obd->obd_uuid.uuid,
477 atomic_read(&obd->obd_refcount));
479 spin_unlock(&obd_dev_lock);
483 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
484 specified, then only the client with that uuid is returned,
485 otherwise any client connected to the tgt is returned. */
/* Scan obd_devs[] under obd_dev_lock for a device whose type name starts
 * with typ_name (strncmp over strlen(typ_name) bytes), whose client target
 * uuid equals tgt_uuid and, when grp_uuid is non-NULL, whose own uuid equals
 * grp_uuid.
 * NOTE(review): the return statements and the skip of empty slots are not
 * visible in this listing. */
486 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
487 const char * typ_name,
488 struct obd_uuid *grp_uuid)
492 spin_lock(&obd_dev_lock);
493 for (i = 0; i < class_devno_max(); i++) {
494 struct obd_device *obd = class_num2obd(i);
497 if ((strncmp(obd->obd_type->typ_name, typ_name,
498 strlen(typ_name)) == 0)) {
499 if (obd_uuid_equals(tgt_uuid,
500 &obd->u.cli.cl_target_uuid) &&
501 ((grp_uuid)? obd_uuid_equals(grp_uuid,
502 &obd->obd_uuid) : 1)) {
503 spin_unlock(&obd_dev_lock);
508 spin_unlock(&obd_dev_lock);
513 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
514 searching at *next, and if a device is found, the next index to look
515 at is saved in *next. If next is NULL, then the first matching device
516 will always be returned. */
/* Scan obd_devs[] under obd_dev_lock, starting at an index derived from
 * *next, for a device whose obd_uuid equals grp_uuid.
 * NOTE(review): the handling of a NULL or out-of-range next, the update of
 * *next and the return statements are not visible in this listing. */
517 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
523 else if (*next >= 0 && *next < class_devno_max())
528 spin_lock(&obd_dev_lock);
529 for (; i < class_devno_max(); i++) {
530 struct obd_device *obd = class_num2obd(i);
533 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
536 spin_unlock(&obd_dev_lock);
540 spin_unlock(&obd_dev_lock);
546 * Notify that the sptlrpc log for \a fsname has changed; let every relevant OBD
547 * adjust sptlrpc settings accordingly.
/* Send KEY_SPTLRPC_CONF via obd_set_info_async to every set-up,
 * non-stopping device of type MDC, OSC, MDT or OST whose name starts with
 * the first namelen bytes of fsname.
 * NOTE(review): the loop 'continue' statements, the handling of rc2 and the
 * return are not visible in this listing. */
549 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
551 struct obd_device *obd;
555 LASSERT(namelen > 0);
557 spin_lock(&obd_dev_lock);
558 for (i = 0; i < class_devno_max(); i++) {
559 obd = class_num2obd(i);
561 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
564 /* only notify mdc, osc, mdt, ost */
565 type = obd->obd_type->typ_name;
566 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
567 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
568 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
569 strcmp(type, LUSTRE_OST_NAME) != 0)
572 if (strncmp(obd->obd_name, fsname, namelen))
/* Take a device reference, then release obd_dev_lock for the duration of
 * the obd_set_info_async call; the lock is re-taken afterwards to continue
 * the scan. */
575 class_incref(obd, __FUNCTION__, obd);
576 spin_unlock(&obd_dev_lock);
577 rc2 = obd_set_info_async(obd->obd_self_export,
578 sizeof(KEY_SPTLRPC_CONF),
579 KEY_SPTLRPC_CONF, 0, NULL, NULL);
581 class_decref(obd, __FUNCTION__, obd);
582 spin_lock(&obd_dev_lock);
584 spin_unlock(&obd_dev_lock);
587 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy the obd_device, obdo, import and capa memory caches, asserting
 * that each cfs_mem_cache_destroy call returns 0.
 * NOTE(review): the guards around the obdo/import/capa caches and some
 * pointer resets are not visible in this listing.
 * NOTE(review): the assertion messages contain spelling slips ("destropy",
 * "destory"); they are runtime strings and are left untouched here. */
589 void obd_cleanup_caches(void)
594 if (obd_device_cachep) {
595 rc = cfs_mem_cache_destroy(obd_device_cachep);
596 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
597 obd_device_cachep = NULL;
600 rc = cfs_mem_cache_destroy(obdo_cachep);
601 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
605 rc = cfs_mem_cache_destroy(import_cachep);
606 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
607 import_cachep = NULL;
610 rc = cfs_mem_cache_destroy(capa_cachep);
611 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/* Create the obd_device, obdo, import and capa memory caches, asserting
 * beforehand that each cache pointer is still NULL.  The trailing visible
 * statement calls obd_cleanup_caches.
 * NOTE(review): most failure checks, the label for the cleanup call and the
 * return values are not visible in this listing. */
617 int obd_init_caches(void)
621 LASSERT(obd_device_cachep == NULL);
622 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
623 sizeof(struct obd_device),
625 if (!obd_device_cachep)
628 LASSERT(obdo_cachep == NULL);
629 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
634 LASSERT(import_cachep == NULL);
635 import_cachep = cfs_mem_cache_create("ll_import_cache",
636 sizeof(struct obd_import),
641 LASSERT(capa_cachep == NULL);
642 capa_cachep = cfs_mem_cache_create("capa_cache",
643 sizeof(struct obd_capa), 0, 0);
649 obd_cleanup_caches();
654 /* map connection to client */
/* Translate a connection handle into its export by passing conn->cookie to
 * class_handle2object.  A cookie of -1 is treated as a request for a new
 * connection and is only logged on the visible lines.
 * NOTE(review): the NULL-handle test and all return statements are not
 * visible in this listing.  class_conn2obd calls class_export_put on the
 * result, so the lookup presumably returns a referenced export -- TODO
 * confirm. */
655 struct obd_export *class_conn2export(struct lustre_handle *conn)
657 struct obd_export *export;
661 CDEBUG(D_CACHE, "looking for null handle\n");
665 if (conn->cookie == -1) { /* this means assign a new connection */
666 CDEBUG(D_CACHE, "want a new connection\n");
670 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
671 export = class_handle2object(conn->cookie);
/* Map an export to an obd_device.
 * NOTE(review): the body of this function is not visible in this listing;
 * nothing beyond the signature is described. */
675 struct obd_device *class_exp2obd(struct obd_export *exp)
/* Resolve a connection handle to the obd_device of its export.  The export
 * obtained from class_conn2export is released with class_export_put once
 * exp_obd has been read.
 * NOTE(review): the NULL check on export and the return statements are not
 * visible in this listing. */
682 struct obd_device *class_conn2obd(struct lustre_handle *conn)
684 struct obd_export *export;
685 export = class_conn2export(conn);
687 struct obd_device *obd = export->exp_obd;
688 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the device behind exp.
 * NOTE(review): any NULL check on obd is not visible in this listing. */
694 struct obd_import *class_exp2cliimp(struct obd_export *exp)
696 struct obd_device *obd = exp->exp_obd;
699 return obd->u.cli.cl_import;
/* Return the client import (u.cli.cl_import) of the device that the
 * connection handle resolves to via class_conn2obd.
 * NOTE(review): any NULL check on obd is not visible in this listing. */
702 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
704 struct obd_device *obd = class_conn2obd(conn);
707 return obd->u.cli.cl_import;
710 /* Export management functions */
/* Final teardown of an export whose refcount has reached zero: release its
 * connection (if any), assert that its reply/replay/RPC lists are empty,
 * call obd_destroy_export, drop the "export" reference on the device and
 * free the export through OBD_FREE_RCU.
 * NOTE(review): parts of the body are not visible in this listing. */
711 static void class_export_destroy(struct obd_export *exp)
713 struct obd_device *obd = exp->exp_obd;
716 LASSERT (atomic_read(&exp->exp_refcount) == 0);
718 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
719 exp->exp_client_uuid.uuid, obd->obd_name);
/* NOTE(review): this assertion comes after obd->obd_name was already read
 * by the CDEBUG above, so it cannot catch a NULL obd before that use. */
721 LASSERT(obd != NULL);
723 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
724 if (exp->exp_connection)
725 ptlrpc_put_connection_superhack(exp->exp_connection);
727 LASSERT(list_empty(&exp->exp_outstanding_replies));
728 LASSERT(list_empty(&exp->exp_uncommitted_replies));
729 LASSERT(list_empty(&exp->exp_req_replay_queue));
730 LASSERT(list_empty(&exp->exp_queued_rpc));
731 obd_destroy_export(exp);
732 class_decref(obd, "export", exp);
734 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Add-reference callback registered with class_handle_hash by
 * class_new_export; forwards to class_export_get. */
738 static void export_handle_addref(void *export)
740 class_export_get(export);
/* Take a reference on exp by incrementing exp_refcount, and log the new
 * count.
 * NOTE(review): the return statement is not visible in this listing. */
743 struct obd_export *class_export_get(struct obd_export *exp)
745 atomic_inc(&exp->exp_refcount);
746 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
747 atomic_read(&exp->exp_refcount));
750 EXPORT_SYMBOL(class_export_get);
/* Drop a reference on exp.  When the count reaches zero the export is
 * handed to obd_zombie_export_add rather than destroyed inline.
 * The 0x5a5a5a upper bound is a sanity check on the counter value;
 * presumably it catches poisoned (freed) memory -- TODO confirm. */
752 void class_export_put(struct obd_export *exp)
754 LASSERT(exp != NULL);
755 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
756 atomic_read(&exp->exp_refcount) - 1);
757 LASSERT(atomic_read(&exp->exp_refcount) > 0);
758 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
760 if (atomic_dec_and_test(&exp->exp_refcount)) {
761 LASSERT(!list_empty(&exp->exp_obd_chain));
762 CDEBUG(D_IOCTL, "final put %p/%s\n",
763 exp, exp->exp_client_uuid.uuid);
764 obd_zombie_export_add(exp);
767 EXPORT_SYMBOL(class_export_put);
769 /* Creates a new export, adds it to the hash table, and returns a
770 * pointer to it. The refcount is 2: one for the hash reference, and
771 * one for the pointer returned by this function. */
/* Allocate and initialize an export for device obd and client cluuid.
 * The export starts with exp_refcount 2, is registered in the handle hash
 * and, under obd->obd_dev_lock, is added to the uuid hash (unless cluuid is
 * the device's own uuid) and linked onto obd_exports / obd_exports_timed.
 * Visible failures: ERR_PTR(-ENOMEM) when allocation fails; rc = -ENODEV
 * when the device is stopping; rc = -EALREADY for a duplicate uuid.  The
 * error path unhashes the handle, calls obd_destroy_export and frees the
 * export.
 * NOTE(review): the success return, the error label and the final error
 * return are not visible in this listing. */
772 struct obd_export *class_new_export(struct obd_device *obd,
773 struct obd_uuid *cluuid)
775 struct obd_export *export;
779 OBD_ALLOC_PTR(export);
781 return ERR_PTR(-ENOMEM);
783 export->exp_conn_cnt = 0;
784 export->exp_lock_hash = NULL;
785 atomic_set(&export->exp_refcount, 2);
786 atomic_set(&export->exp_rpc_count, 0);
787 atomic_set(&export->exp_cb_count, 0);
788 atomic_set(&export->exp_locks_count, 0);
789 #if LUSTRE_TRACKS_LOCK_EXP_REFS
790 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
791 spin_lock_init(&export->exp_locks_list_guard);
793 atomic_set(&export->exp_replay_count, 0);
794 export->exp_obd = obd;
795 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
796 spin_lock_init(&export->exp_uncommitted_replies_lock);
797 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
798 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
799 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
800 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
801 class_handle_hash(&export->exp_handle, export_handle_addref);
802 export->exp_last_request_time = cfs_time_current_sec();
803 spin_lock_init(&export->exp_lock);
804 INIT_HLIST_NODE(&export->exp_uuid_hash);
805 INIT_HLIST_NODE(&export->exp_nid_hash);
807 export->exp_sp_peer = LUSTRE_SP_ANY;
808 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
809 export->exp_client_uuid = *cluuid;
810 obd_init_export(export);
812 spin_lock(&obd->obd_dev_lock);
813 /* shouldn't happen, but might race */
814 if (obd->obd_stopping)
815 GOTO(exit_err, rc = -ENODEV);
/* An export whose uuid equals the device uuid is not entered into the uuid
 * hash. */
817 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
818 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
819 &export->exp_uuid_hash);
821 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
822 obd->obd_name, cluuid->uuid, rc);
823 GOTO(exit_err, rc = -EALREADY);
827 class_incref(obd, "export", export);
828 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
829 list_add_tail(&export->exp_obd_chain_timed,
830 &export->exp_obd->obd_exports_timed);
831 export->exp_obd->obd_num_exports++;
832 spin_unlock(&obd->obd_dev_lock);
/* Error path: undo the handle registration and free the export directly. */
836 spin_unlock(&obd->obd_dev_lock);
837 class_handle_unhash(&export->exp_handle);
838 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
839 obd_destroy_export(export);
840 OBD_FREE_PTR(export);
843 EXPORT_SYMBOL(class_new_export);
/* Detach an export from its device: remove it from the handle hash and,
 * under obd_dev_lock, from the uuid hash (when hashed), move it to
 * obd_unlinked_exports, take it off the timed list and decrement
 * obd_num_exports.  Finally one export reference is dropped. */
845 void class_unlink_export(struct obd_export *exp)
847 class_handle_unhash(&exp->exp_handle);
849 spin_lock(&exp->exp_obd->obd_dev_lock);
850 /* delete an uuid-export hashitem from hashtables */
851 if (!hlist_unhashed(&exp->exp_uuid_hash))
852 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
853 &exp->exp_client_uuid,
854 &exp->exp_uuid_hash);
856 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
857 list_del_init(&exp->exp_obd_chain_timed);
858 exp->exp_obd->obd_num_exports--;
859 spin_unlock(&exp->exp_obd->obd_dev_lock);
860 class_export_put(exp);
862 EXPORT_SYMBOL(class_unlink_export);
864 /* Import management functions */
/* Final teardown of an import whose refcount has reached zero: release its
 * current connection, release and free every entry on imp_conn_list, drop
 * the "import" reference on the device and free the import through
 * OBD_FREE_RCU.  imp_sec must already be NULL.
 * NOTE(review): parts of the body are not visible in this listing. */
865 void class_import_destroy(struct obd_import *imp)
869 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
870 imp->imp_obd->obd_name);
872 LASSERT(atomic_read(&imp->imp_refcount) == 0);
874 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Pop entries from the head until the connection list is empty. */
876 while (!list_empty(&imp->imp_conn_list)) {
877 struct obd_import_conn *imp_conn;
879 imp_conn = list_entry(imp->imp_conn_list.next,
880 struct obd_import_conn, oic_item);
881 list_del_init(&imp_conn->oic_item);
882 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
883 OBD_FREE(imp_conn, sizeof(*imp_conn));
886 LASSERT(imp->imp_sec == NULL);
887 class_decref(imp->imp_obd, "import", imp);
888 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Add-reference callback registered with class_handle_hash by
 * class_new_import; forwards to class_import_get. */
892 static void import_handle_addref(void *import)
894 class_import_get(import);
/* Take a reference on import after sanity-checking the current count, and
 * log the new count together with the owning device name.
 * NOTE(review): the return statement is not visible in this listing. */
897 struct obd_import *class_import_get(struct obd_import *import)
899 LASSERT(atomic_read(&import->imp_refcount) >= 0);
900 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
901 atomic_inc(&import->imp_refcount);
902 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
903 atomic_read(&import->imp_refcount),
904 import->imp_obd->obd_name);
907 EXPORT_SYMBOL(class_import_get);
/* Drop a reference on imp.  The import must not already be on a zombie
 * list; when the count reaches zero it is handed to obd_zombie_import_add.
 * NOTE(review): parts of the body are not visible in this listing. */
909 void class_import_put(struct obd_import *imp)
913 LASSERT(atomic_read(&imp->imp_refcount) > 0);
914 LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
915 LASSERT(list_empty(&imp->imp_zombie_chain));
917 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
918 atomic_read(&imp->imp_refcount) - 1,
919 imp->imp_obd->obd_name);
921 if (atomic_dec_and_test(&imp->imp_refcount)) {
922 CDEBUG(D_INFO, "final put import %p\n", imp);
923 obd_zombie_import_add(imp);
928 EXPORT_SYMBOL(class_import_put);
/* Initialize the adaptive-timeout state of an import: the network latency
 * estimator and one service estimator per portal (IMP_AT_MAX_PORTALS),
 * each service estimator seeded with INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init call is not
 * visible in this listing. */
930 static void init_imp_at(struct imp_at *at) {
932 at_init(&at->iat_net_latency, 0, 0);
933 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
934 /* max service estimates are tracked on the server side, so
935 don't use the AT history here, just use the last reported
936 val. (But keep hist for proc histogram, worst_ever) */
937 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Allocate and initialize an import for device obd.
 * The import takes an "import" reference on the device, starts in state
 * LUSTRE_IMP_NEW with imp_refcount 2, is registered in the handle hash and
 * defaults to message magic LUSTRE_MSG_MAGIC_V2.
 * NOTE(review): the allocation-failure check and the return are not visible
 * in this listing. */
942 struct obd_import *class_new_import(struct obd_device *obd)
944 struct obd_import *imp;
946 OBD_ALLOC(imp, sizeof(*imp));
950 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
951 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
952 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
953 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
954 spin_lock_init(&imp->imp_lock);
955 imp->imp_last_success_conn = 0;
956 imp->imp_state = LUSTRE_IMP_NEW;
957 imp->imp_obd = class_incref(obd, "import", imp);
958 sema_init(&imp->imp_sec_mutex, 1);
959 cfs_waitq_init(&imp->imp_recovery_waitq);
961 atomic_set(&imp->imp_refcount, 2);
962 atomic_set(&imp->imp_unregistering, 0);
963 atomic_set(&imp->imp_inflight, 0);
964 atomic_set(&imp->imp_replay_inflight, 0);
965 atomic_set(&imp->imp_inval_count, 0);
966 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
967 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
968 class_handle_hash(&imp->imp_handle, import_handle_addref);
969 init_imp_at(&imp->imp_at);
971 /* the default magic is V2, will be used in connect RPC, and
972 * then adjusted according to the flags in request/reply. */
973 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
977 EXPORT_SYMBOL(class_new_import);
/* Begin tearing down an import: remove it from the handle hash, bump
 * imp_generation under imp_lock and drop one reference. */
979 void class_destroy_import(struct obd_import *import)
981 LASSERT(import != NULL);
982 LASSERT(import != LP_POISON);
984 class_handle_unhash(&import->imp_handle);
986 spin_lock(&import->imp_lock);
987 import->imp_generation++;
988 spin_unlock(&import->imp_lock);
989 class_import_put(import);
991 EXPORT_SYMBOL(class_destroy_import);
993 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Record one more reference from lock to export exp (debug tracking).
 * Under exp_locks_list_guard: warn if the lock already points at a
 * different export, and on the 0 -> 1 transition of l_exp_refs_nr link the
 * lock onto exp_locks_list and remember exp as its target.
 * NOTE(review): parts of the body are not visible in this listing. */
995 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
997 spin_lock(&exp->exp_locks_list_guard);
999 LASSERT(lock->l_exp_refs_nr >= 0);
1001 if (lock->l_exp_refs_target != NULL &&
1002 lock->l_exp_refs_target != exp) {
1003 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1004 exp, lock, lock->l_exp_refs_target);
1006 if ((lock->l_exp_refs_nr ++) == 0) {
1007 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1008 lock->l_exp_refs_target = exp;
1010 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1011 lock, exp, lock->l_exp_refs_nr);
1012 spin_unlock(&exp->exp_locks_list_guard);
1014 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Remove one reference from lock to export exp (debug tracking).
 * Under exp_locks_list_guard: warn if the lock's recorded target differs
 * from exp, and on the 1 -> 0 transition of l_exp_refs_nr unlink the lock
 * and clear its target.
 * NOTE(review): parts of the body are not visible in this listing. */
1016 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1018 spin_lock(&exp->exp_locks_list_guard);
1019 LASSERT(lock->l_exp_refs_nr > 0);
1020 if (lock->l_exp_refs_target != exp) {
1021 LCONSOLE_WARN("lock %p, "
1022 "mismatching export pointers: %p, %p\n",
1023 lock, lock->l_exp_refs_target, exp);
1025 if (-- lock->l_exp_refs_nr == 0) {
1026 list_del_init(&lock->l_exp_refs_link);
1027 lock->l_exp_refs_target = NULL;
1029 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1030 lock, exp, lock->l_exp_refs_nr);
1031 spin_unlock(&exp->exp_locks_list_guard);
1033 EXPORT_SYMBOL(__class_export_del_lock_ref);
1036 /* A connection defines an export context in which preallocation can
1037 be managed. This releases the export pointer reference, and returns
1038 the export handle, so the export refcount is 1 when this function
/* Create an export on obd for client cluuid and return its handle cookie
 * through conn.  The pointer reference returned by class_new_export is
 * dropped before returning, so the caller holds only the cookie.
 * On failure the visible path returns PTR_ERR(export).
 * NOTE(review): the IS_ERR test and the success return are not visible in
 * this listing. */
1040 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1041 struct obd_uuid *cluuid)
1043 struct obd_export *export;
1044 LASSERT(conn != NULL);
1045 LASSERT(obd != NULL);
1046 LASSERT(cluuid != NULL);
1049 export = class_new_export(obd, cluuid);
1051 RETURN(PTR_ERR(export));
1053 conn->cookie = export->exp_handle.h_cookie;
1054 class_export_put(export);
1056 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1057 cluuid->uuid, conn->cookie);
1060 EXPORT_SYMBOL(class_connect);
1062 /* if export is involved in recovery then clean up related things */
/* Undo the recovery bookkeeping held for exp on its device.
 * Under obd_processing_task_lock (bottom halves disabled): decrement
 * obd_delayed_clients for a delayed export; clear exp_in_recovery and
 * decrement obd_connected_clients when the device is recovering; clear the
 * request-replay and lock-replay flags and decrement the matching device
 * counters.  Each export flag is cleared under exp_lock.
 * NOTE(review): parts of the body are not visible in this listing. */
1063 void class_export_recovery_cleanup(struct obd_export *exp)
1065 struct obd_device *obd = exp->exp_obd;
1067 spin_lock_bh(&obd->obd_processing_task_lock);
1068 if (exp->exp_delayed)
1069 obd->obd_delayed_clients--;
1070 if (obd->obd_recovering && exp->exp_in_recovery) {
1071 spin_lock(&exp->exp_lock);
1072 exp->exp_in_recovery = 0;
1073 spin_unlock(&exp->exp_lock);
1074 LASSERT(obd->obd_connected_clients);
1075 obd->obd_connected_clients--;
1077 /** Cleanup req replay fields */
1078 if (exp->exp_req_replay_needed) {
1079 spin_lock(&exp->exp_lock);
1080 exp->exp_req_replay_needed = 0;
1081 spin_unlock(&exp->exp_lock);
1082 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1083 atomic_dec(&obd->obd_req_replay_clients);
1085 /** Cleanup lock replay data */
1086 if (exp->exp_lock_replay_needed) {
1087 spin_lock(&exp->exp_lock);
1088 exp->exp_lock_replay_needed = 0;
1089 spin_unlock(&exp->exp_lock);
1090 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1091 atomic_dec(&obd->obd_lock_replay_clients);
1093 spin_unlock_bh(&obd->obd_processing_task_lock);
1096 /* This function removes 1-3 references from the export:
1097 * 1 - for the export pointer passed in
1098 * and, if a disconnect is really needed:
1099 * 2 - for removal from the hash
1100 * 3 - in class_unlink_export
1101 * The export pointer passed to this function may be destroyed. */
/* Disconnect an export.
 * exp_disconnected is tested and set under exp_lock so that only the first
 * caller performs the teardown: removal from the nid hash (when hashed),
 * class_export_recovery_cleanup and class_unlink_export.  The visible tail
 * drops one export reference with class_export_put.
 * NOTE(review): the NULL-export return, the no_disconn label and the final
 * return are not visible in this listing. */
1102 int class_disconnect(struct obd_export *export)
1104 int already_disconnected;
1107 if (export == NULL) {
1109 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1113 spin_lock(&export->exp_lock);
1114 already_disconnected = export->exp_disconnected;
1115 export->exp_disconnected = 1;
1116 spin_unlock(&export->exp_lock);
1118 /* class_cleanup(), abort_recovery(), and class_fail_export()
1119 * all end up in here, and if any of them race we shouldn't
1120 * call extra class_export_puts(). */
1121 if (already_disconnected) {
1122 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1123 GOTO(no_disconn, already_disconnected);
1126 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1127 export->exp_handle.h_cookie);
1129 if (!hlist_unhashed(&export->exp_nid_hash))
1130 lustre_hash_del(export->exp_obd->obd_nid_hash,
1131 &export->exp_connection->c_peer.nid,
1132 &export->exp_nid_hash);
1134 class_export_recovery_cleanup(export);
1135 class_unlink_export(export);
1137 class_export_put(export);
/* Disconnect every export on list, stamping each with flags first.
 * An extra reference is held on each export across the call so that it can
 * still be logged afterwards.  An export whose client uuid equals the
 * device uuid is only unlinked from the list, not disconnected.
 * NOTE(review): the 'continue' for the self-export case and other lines
 * are not visible in this listing. */
1141 static void class_disconnect_export_list(struct list_head *list,
1142 enum obd_option flags)
1145 struct obd_export *exp;
1148 /* It's possible that an export may disconnect itself, but
1149 * nothing else will be added to this list. */
1150 while (!list_empty(list)) {
1151 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1152 /* need for safe call CDEBUG after obd_disconnect */
1153 class_export_get(exp);
1155 spin_lock(&exp->exp_lock);
1156 exp->exp_flags = flags;
1157 spin_unlock(&exp->exp_lock);
1159 if (obd_uuid_equals(&exp->exp_client_uuid,
1160 &exp->exp_obd->obd_uuid)) {
1162 "exp %p export uuid == obd uuid, don't discon\n",
1164 /* Need to delete this now so we don't end up pointing
1165 * to work_list later when this export is cleaned up. */
1166 list_del_init(&exp->exp_obd_chain);
1167 class_export_put(exp);
/* Second reference: this one is handed to obd_disconnect below. */
1171 class_export_get(exp);
1172 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1173 "last request at "CFS_TIME_T"\n",
1174 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1175 exp, exp->exp_last_request_time);
1176 /* release one export reference anyway */
1177 rc = obd_disconnect(exp);
1179 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1180 obd_export_nid2str(exp), exp, rc);
1181 class_export_put(exp);
/* Disconnect all exports of obd.  Both obd_exports and obd_delayed_exports
 * are spliced onto a private list under obd_dev_lock, and that list is then
 * processed without the lock by class_disconnect_export_list using flags
 * from exp_flags_from_obd(obd).
 * NOTE(review): parts of the body are not visible in this listing. */
1186 void class_disconnect_exports(struct obd_device *obd)
1188 struct list_head work_list;
1191 /* Move all of the exports from obd_exports to a work list, en masse. */
1192 CFS_INIT_LIST_HEAD(&work_list);
1193 spin_lock(&obd->obd_dev_lock);
1194 list_splice_init(&obd->obd_exports, &work_list);
1195 list_splice_init(&obd->obd_delayed_exports, &work_list);
1196 spin_unlock(&obd->obd_dev_lock);
1198 if (!list_empty(&work_list)) {
1199 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1200 "disconnecting them\n", obd->obd_minor, obd);
1201 class_disconnect_export_list(&work_list,
1202 exp_flags_from_obd(obd));
1204 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1205 obd->obd_minor, obd);
1208 EXPORT_SYMBOL(class_disconnect_exports);
1210 /* Remove exports that have not completed recovery.
/* Collect and disconnect stale exports of obd.
 * Walks obd_exports under obd_dev_lock, consulting test_export(exp) and
 * skipping the self-export; selected exports are moved to a private list
 * and logged.  After the lock is dropped the count is added to
 * obd_stale_clients and the list is disconnected with
 * exp_flags_from_obd(obd) | OBD_OPT_ABORT_RECOV.
 * NOTE(review): the statements guarded by the two tests (presumably
 * 'continue'), the increment of evicted and any guard around the final
 * steps are not visible in this listing. */
1212 void class_disconnect_stale_exports(struct obd_device *obd,
1213 int (*test_export)(struct obd_export *))
1215 struct list_head work_list;
1216 struct list_head *pos, *n;
1217 struct obd_export *exp;
1221 CFS_INIT_LIST_HEAD(&work_list);
1222 spin_lock(&obd->obd_dev_lock);
1223 list_for_each_safe(pos, n, &obd->obd_exports) {
1224 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1225 if (test_export(exp))
1228 /* don't count self-export as client */
1229 if (obd_uuid_equals(&exp->exp_client_uuid,
1230 &exp->exp_obd->obd_uuid))
1233 list_move(&exp->exp_obd_chain, &work_list);
1235 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1236 obd->obd_name, exp->exp_client_uuid.uuid,
1237 exp->exp_connection == NULL ? "<unknown>" :
1238 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1239 print_export_data(exp, "EVICTING", 0);
1241 spin_unlock(&obd->obd_dev_lock);
1244 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1245 obd->obd_name, evicted);
1246 obd->obd_stale_clients += evicted;
1248 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1249 OBD_OPT_ABORT_RECOV);
1252 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark an export as failed and disconnect it.
 * exp_failed is tested and set under exp_lock so that the disconnect runs
 * only for the first caller.  The debug log is dumped first when
 * obd_dump_on_timeout is set, and an extra reference is taken for
 * obd_disconnect to consume.
 * NOTE(review): the early return for an already failed export and the test
 * on rc are not visible in this listing. */
1254 void class_fail_export(struct obd_export *exp)
1256 int rc, already_failed;
1258 spin_lock(&exp->exp_lock);
1259 already_failed = exp->exp_failed;
1260 exp->exp_failed = 1;
1261 spin_unlock(&exp->exp_lock);
1263 if (already_failed) {
1264 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1265 exp, exp->exp_client_uuid.uuid);
1269 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1270 exp, exp->exp_client_uuid.uuid);
1272 if (obd_dump_on_timeout)
1273 libcfs_debug_dumplog();
1275 /* Most callers into obd_disconnect are removing their own reference
1276 * (request, for example) in addition to the one from the hash table.
1277 * We don't have such a reference here, so make one. */
1278 class_export_get(exp);
1279 rc = obd_disconnect(exp);
1281 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1283 CDEBUG(D_HA, "disconnected export %p/%s\n",
1284 exp, exp->exp_client_uuid.uuid);
1286 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable form of the peer NID of @exp.
 *
 * When exp->exp_connection is set, the string comes from libcfs_nid2str()
 * applied to the connection's peer NID.
 *
 * NOTE(review): the value returned when exp_connection is NULL is on a line
 * that is missing from this listing.
 */
1288 char *obd_export_nid2str(struct obd_export *exp)
1290 if (exp->exp_connection != NULL)
1291 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1295 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict exports of @obd that are hashed under the NID given as text in @nid.
 *
 * @nid is converted with libcfs_str2nid() and used as the lookup key in
 * obd->obd_nid_hash. An export that is found is asserted to carry that NID
 * and not to be obd->obd_self_export, then failed with class_fail_export()
 * and released with class_export_put() (presumably dropping the reference
 * obtained by the lookup -- confirm).
 *
 * Returns exports_evicted; a D_HA message is logged when that value is 0.
 *
 * NOTE(review): the loop construct around the lookup, the statement under
 * `if (doomed_exp == NULL)`, the increment of exports_evicted and the last
 * CWARN argument are missing from this listing.
 * NOTE(review): the first LASSERTF dereferences doomed_exp->exp_connection
 * without a NULL test, unlike obd_export_nid2str(); presumably every export
 * in the NID hash has a connection -- verify.
 */
1297 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1299 struct obd_export *doomed_exp = NULL;
1300 int exports_evicted = 0;
/* The cast drops const because libcfs_str2nid() is called with a
 * non-const pointer here. */
1302 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1305 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1306 if (doomed_exp == NULL)
1309 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1310 "nid %s found, wanted nid %s, requested nid %s\n",
1311 obd_export_nid2str(doomed_exp),
1312 libcfs_nid2str(nid_key), nid);
1313 LASSERTF(doomed_exp != obd->obd_self_export,
1314 "self-export is hashed by NID?\n");
1316 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1317 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1319 class_fail_export(doomed_exp);
1320 class_export_put(doomed_exp);
1323 if (!exports_evicted)
1324 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1325 obd->obd_name, nid);
1326 return exports_evicted;
1328 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client UUID equals the text in @uuid.
 *
 * @uuid is converted with obd_str2uuid(). A request naming the device's own
 * UUID is refused with an error message and the counter (still 0) is
 * returned. Otherwise the UUID is looked up in obd->obd_uuid_hash; a miss is
 * logged with CERROR, a hit is failed with class_fail_export() and released
 * with class_export_put().
 *
 * Returns exports_evicted.
 *
 * NOTE(review): the `else` joining the not-found and found branches and the
 * increment of exports_evicted are missing from this listing; presumably the
 * function returns 1 after a successful eviction -- confirm.
 */
1330 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1332 struct obd_export *doomed_exp = NULL;
1333 struct obd_uuid doomed_uuid;
1334 int exports_evicted = 0;
1336 obd_str2uuid(&doomed_uuid, uuid);
/* Never evict the device's own UUID. */
1337 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1338 CERROR("%s: can't evict myself\n", obd->obd_name);
1339 return exports_evicted;
1342 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1344 if (doomed_exp == NULL) {
1345 CERROR("%s: can't disconnect %s: no exports found\n",
1346 obd->obd_name, uuid);
1348 CWARN("%s: evicting %s at adminstrative request\n",
1349 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1350 class_fail_export(doomed_exp);
1351 class_export_put(doomed_exp);
1355 return exports_evicted;
1357 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Optional hook, compiled in only when LUSTRE_TRACKS_LOCK_EXP_REFS is set.
 * It starts out NULL; print_export_data() calls it for an export when lock
 * dumping is requested and the pointer has been set.
 * NOTE(review): the directive closing this #if is not visible in this
 * listing.
 */
1359 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1360 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1361 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one D_HA debug line describing @exp, labelled with @status.
 *
 * The line contains, in order: the obd name, @status, the export pointer,
 * the client UUID, the NID string, exp_refcount, the rpc/callback/lock
 * counters in parentheses, the disconnected/delayed/failed flags, the reply
 * count, the first reply pointer, "..." when more than three replies are
 * counted, and exp_last_committed.
 *
 * NOTE(review): the final parameter (referred to below as `locks`), the
 * declaration of nreplies and the body of the reply loop are missing from
 * this listing.
 */
1364 static void print_export_data(struct obd_export *exp, const char *status,
1367 struct ptlrpc_reply_state *rs;
1368 struct ptlrpc_reply_state *first_reply = NULL;
/* The outstanding-reply list is walked under exp_lock. */
1371 spin_lock(&exp->exp_lock);
1372 list_for_each_entry (rs, &exp->exp_outstanding_replies, rs_exp_list) {
1377 spin_unlock(&exp->exp_lock);
1379 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1380 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1381 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1382 atomic_read(&exp->exp_rpc_count),
1383 atomic_read(&exp->exp_cb_count),
1384 atomic_read(&exp->exp_locks_count),
1385 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1386 nreplies, first_reply, nreplies > 3 ? "..." : "",
1387 exp->exp_last_committed);
/* Per-export lock dump, only when requested and a hook is installed. */
1388 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1389 if (locks && class_export_dump_hook != NULL)
1390 class_export_dump_hook(exp);
1394 void dump_exports(struct obd_device *obd, int locks)
1396 struct obd_export *exp;
1398 spin_lock(&obd->obd_dev_lock);
1399 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1400 print_export_data(exp, "ACTIVE", locks);
1401 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1402 print_export_data(exp, "UNLINKED", locks);
1403 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1404 print_export_data(exp, "DELAYED", locks);
1405 spin_unlock(&obd->obd_dev_lock);
1406 spin_lock(&obd_zombie_impexp_lock);
1407 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1408 print_export_data(exp, "ZOMBIE", locks);
1409 spin_unlock(&obd_zombie_impexp_lock);
1411 EXPORT_SYMBOL(dump_exports);
/*
 * Block until obd->obd_unlinked_exports is empty.
 *
 * obd->obd_exports is asserted to be empty on entry. While the unlinked list
 * is non-empty, obd_dev_lock is dropped, the caller sleeps in CFS_TASK_UNINT
 * state for cfs_time_seconds(waited), and the lock is retaken before the
 * list is tested again. When `waited` exceeds 5 and IS_PO2(waited) holds, a
 * console warning is printed and all exports are dumped.
 *
 * NOTE(review): the declaration, initial value and update of `waited` are
 * missing from this listing, so the actual back-off schedule cannot be
 * confirmed here.
 * NOTE(review): the LASSERT reads obd_exports before obd_dev_lock is taken.
 */
1413 void obd_exports_barrier(struct obd_device *obd)
1416 LASSERT(list_empty(&obd->obd_exports));
1417 spin_lock(&obd->obd_dev_lock);
1418 while (!list_empty(&obd->obd_unlinked_exports)) {
/* Do not sleep while holding the spinlock. */
1419 spin_unlock(&obd->obd_dev_lock);
1420 cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
1421 if (waited > 5 && IS_PO2(waited)) {
1422 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1423 "more than %d seconds. "
1424 "The obd refcount = %d. Is it stuck?\n",
1425 obd->obd_name, waited,
1426 atomic_read(&obd->obd_refcount));
1427 dump_exports(obd, 0);
1430 spin_lock(&obd->obd_dev_lock);
1432 spin_unlock(&obd->obd_dev_lock);
1434 EXPORT_SYMBOL(obd_exports_barrier);
1437 /* Kill zombie imports and exports. */
/*
 * Drain the zombie import and export lists.
 *
 * Under obd_zombie_impexp_lock, the first entry of obd_zombie_imports and
 * the first entry of obd_zombie_exports (when those lists are non-empty) are
 * unlinked. After the lock is released they are passed to
 * class_import_destroy() and class_export_destroy(). This repeats while
 * either an import or an export was obtained in the last pass.
 *
 * NOTE(review): the opening of the do/while loop, the per-pass reset of
 * `import` and `export`, the remaining list_entry() arguments and the guards
 * in front of the two destroy calls are missing from this listing.
 */
1439 void obd_zombie_impexp_cull(void)
1441 struct obd_import *import;
1442 struct obd_export *export;
1446 spin_lock(&obd_zombie_impexp_lock);
1449 if (!list_empty(&obd_zombie_imports)) {
1450 import = list_entry(obd_zombie_imports.next,
1453 list_del_init(&import->imp_zombie_chain);
1457 if (!list_empty(&obd_zombie_exports)) {
1458 export = list_entry(obd_zombie_exports.next,
1461 list_del_init(&export->exp_obd_chain);
/* Destruction happens after the spinlock has been released. */
1464 spin_unlock(&obd_zombie_impexp_lock);
1467 class_import_destroy(import);
1470 class_export_destroy(export);
1472 } while (import != NULL || export != NULL);
/* Completed by obd_zombie_impexp_thread() once it has started (or failed to
 * daemonize); obd_zombie_impexp_init() waits on it. */
1476 static struct completion obd_zombie_start;
/* Completed by obd_zombie_impexp_thread() when it leaves its loop;
 * obd_zombie_impexp_stop() waits on it. */
1477 static struct completion obd_zombie_stop;
/* Bit field holding the OBD_ZOMBIE_STOP request. */
1478 static unsigned long obd_zombie_flags;
/* Wait queue shared by the zombie thread and obd_zombie_barrier() callers. */
1479 static cfs_waitq_t obd_zombie_waitq;
/* PID recorded by the zombie thread; obd_zombie_barrier() compares it with
 * the caller's PID. */
1480 static pid_t obd_zombie_pid;
/* NOTE(review): this value is handed to set_bit()/test_bit() as the bit
 * argument; if those take a bit index (as in the Linux kernel), 1 << 1
 * selects bit 2 rather than acting as a mask -- harmless, but confirm. */
1483 OBD_ZOMBIE_STOP = 1 << 1
1487 /* Check for work for the zombie import/export kill thread. */
/*
 * Compute, under obd_zombie_impexp_lock, whether there is nothing for the
 * zombie thread to do: rc is non-zero only when both zombie lists are empty
 * and the OBD_ZOMBIE_STOP bit is not set. The zombie thread waits until this
 * evaluates to 0.
 *
 * @arg is not used in the lines visible here.
 *
 * NOTE(review): the declaration of rc and the return statement are missing
 * from this listing; presumably rc is returned.
 */
1489 static int obd_zombie_impexp_check(void *arg)
1493 spin_lock(&obd_zombie_impexp_lock);
1494 rc = list_empty(&obd_zombie_imports) &&
1495 list_empty(&obd_zombie_exports) &&
1496 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1498 spin_unlock(&obd_zombie_impexp_lock);
1504 * Add export to the obd_zombe thread and notify it.
1506 static void obd_zombie_export_add(struct obd_export *exp) {
1507 spin_lock(&exp->exp_obd->obd_dev_lock);
1508 LASSERT(!list_empty(&exp->exp_obd_chain));
1509 list_del_init(&exp->exp_obd_chain);
1510 spin_unlock(&exp->exp_obd->obd_dev_lock);
1511 spin_lock(&obd_zombie_impexp_lock);
1512 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1513 spin_unlock(&obd_zombie_impexp_lock);
1515 if (obd_zombie_impexp_notify != NULL)
1516 obd_zombie_impexp_notify();
1520 * Add import to the obd_zombe thread and notify it.
1522 static void obd_zombie_import_add(struct obd_import *imp) {
1523 LASSERT(imp->imp_sec == NULL);
1524 spin_lock(&obd_zombie_impexp_lock);
1525 LASSERT(list_empty(&imp->imp_zombie_chain));
1526 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1527 spin_unlock(&obd_zombie_impexp_lock);
1529 if (obd_zombie_impexp_notify != NULL)
1530 obd_zombie_impexp_notify();
1534 * notify import/export destroy thread about new zombie.
1536 static void obd_zombie_impexp_notify(void)
1538 cfs_waitq_signal(&obd_zombie_waitq);
1542 /* Check whether obd_zombie is idle. */
/*
 * Compute, under obd_zombie_impexp_lock, whether both zombie lists are
 * empty. Asserts that OBD_ZOMBIE_STOP has not been set, i.e. it must not be
 * called once the zombie thread has been asked to stop.
 *
 * NOTE(review): the declaration of rc and the return statement are missing
 * from this listing; presumably rc is returned.
 */
1544 static int obd_zombie_is_idle(void)
1548 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1549 spin_lock(&obd_zombie_impexp_lock);
1550 rc = list_empty(&obd_zombie_imports) &&
1551 list_empty(&obd_zombie_exports);
1552 spin_unlock(&obd_zombie_impexp_lock);
1557 /* Wait until the obd_zombie import/export queues become empty. */
/*
 * Wait on obd_zombie_waitq until obd_zombie_is_idle() reports that both
 * zombie lists are empty. The wait uses a zero-initialised l_wait_info.
 *
 * The caller's PID is first compared with obd_zombie_pid so that the zombie
 * thread does not wait for itself.
 *
 * NOTE(review): the statement controlled by that PID test is missing from
 * this listing; presumably it is an early return.
 */
1559 void obd_zombie_barrier(void)
1561 struct l_wait_info lwi = { 0 };
1563 if (obd_zombie_pid == cfs_curproc_pid())
1564 /* don't wait for myself */
1566 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1568 EXPORT_SYMBOL(obd_zombie_barrier);
1573 /* Thread that destroys zombie exports/imports. */
/*
 * Body of the zombie import/export destroy thread.
 *
 * The thread daemonizes as "obd_zombid" and completes obd_zombie_start
 * whether or not that succeeded, so the starter is always released. It then
 * records its PID in obd_zombie_pid and, until OBD_ZOMBIE_STOP is set, waits
 * on obd_zombie_waitq for obd_zombie_impexp_check() to return 0, culls the
 * zombie lists and signals the wait queue again. obd_zombie_stop is
 * completed before the thread finishes.
 *
 * @unused is not referenced in the lines visible here.
 *
 * NOTE(review): the declaration of rc, the return statements and the brace
 * structure are missing from this listing; presumably the function returns
 * rc right after a failed cfs_daemonize_ctxt().
 */
1575 static int obd_zombie_impexp_thread(void *unused)
1579 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1580 complete(&obd_zombie_start);
1584 complete(&obd_zombie_start);
1586 obd_zombie_pid = cfs_curproc_pid();
1588 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1589 struct l_wait_info lwi = { 0 };
/* Sleep until there is a zombie to destroy or a stop was requested. */
1591 l_wait_event(obd_zombie_waitq,
1592 !obd_zombie_impexp_check(NULL), &lwi);
1593 obd_zombie_impexp_cull();
/* NOTE(review): the next line is the text of a block comment whose
 * opening and closing delimiters are missing from this listing. */
1596 * Notify obd_zombie_barrier callers that queues
1599 cfs_waitq_signal(&obd_zombie_waitq);
1602 complete(&obd_zombie_stop);
/* Non-kernel variant: state used by obd_zombie_impexp_kill() and by the
 * liblustre callback registration in obd_zombie_impexp_init(). */
1607 #else /* ! KERNEL */
/* Entry counter used by obd_zombie_impexp_kill() to detect nested calls. */
1609 static atomic_t zombie_recur = ATOMIC_INIT(0);
/* Handles returned by the liblustre wait/idle callback registration; they
 * are passed back to the deregister calls in obd_zombie_impexp_stop(). */
1610 static void *obd_zombie_impexp_work_cb;
1611 static void *obd_zombie_impexp_idle_cb;
/*
 * Callback that culls the zombie lists; it is registered as a liblustre wait
 * callback in obd_zombie_impexp_init().
 *
 * zombie_recur is incremented on entry and decremented afterwards; the cull
 * runs only for the caller that raised the counter from 0 to 1, so a nested
 * invocation does not cull again.
 *
 * @arg is not referenced in the lines visible here.
 *
 * NOTE(review): the declaration of the return value and the return statement
 * are missing from this listing.
 */
1613 int obd_zombie_impexp_kill(void *arg)
1617 if (atomic_inc_return(&zombie_recur) == 1) {
1618 obd_zombie_impexp_cull();
1621 atomic_dec(&zombie_recur);
1628 /* Start the zombie import/export destroy thread. */
/*
 * Initialise the zombie import/export machinery.
 *
 * The two zombie lists, their spinlock, the start/stop completions and the
 * wait queue are initialised first. The visible lines then start
 * obd_zombie_impexp_thread() with cfs_kernel_thread() and wait for
 * obd_zombie_start, and register obd_zombie_impexp_kill() as a liblustre
 * wait callback and obd_zombie_impexp_check() as a liblustre idle callback.
 *
 * NOTE(review): the preprocessor conditionals, the declaration and checking
 * of rc and the return statement are missing from this listing; presumably
 * the thread start is the kernel path and the callback registration the
 * liblustre path -- confirm against the complete source.
 */
1630 int obd_zombie_impexp_init(void)
1634 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1635 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1636 spin_lock_init(&obd_zombie_impexp_lock);
1637 init_completion(&obd_zombie_start);
1638 init_completion(&obd_zombie_stop);
1639 cfs_waitq_init(&obd_zombie_waitq);
1643 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
/* Do not proceed until the new thread has signalled that it started. */
1647 wait_for_completion(&obd_zombie_start);
1650 obd_zombie_impexp_work_cb =
1651 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1652 &obd_zombie_impexp_kill, NULL);
1654 obd_zombie_impexp_idle_cb =
1655 liblustre_register_idle_callback("obd_zombi_impexp_check",
1656 &obd_zombie_impexp_check, NULL);
1662 /* Stop the zombie import/export destroy thread. */
/*
 * Shut down the zombie import/export machinery.
 *
 * OBD_ZOMBIE_STOP is set in obd_zombie_flags and the wait queue is signalled
 * so that the zombie thread notices the request. The visible lines then wait
 * for obd_zombie_stop and deregister the liblustre wait and idle callbacks
 * using the handles saved at init time.
 *
 * NOTE(review): the preprocessor conditionals and the end of this function
 * are not visible in this listing; presumably the completion wait is the
 * kernel path and the deregistration the liblustre path -- confirm.
 */
1664 void obd_zombie_impexp_stop(void)
1666 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1667 obd_zombie_impexp_notify();
1669 wait_for_completion(&obd_zombie_stop);
1671 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1672 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);