4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 struct obd_type *class_search_type(const char *name)
101 struct obd_type *type;
103 cfs_spin_lock(&obd_types_lock);
104 cfs_list_for_each(tmp, &obd_types) {
105 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 cfs_spin_unlock(&obd_types_lock);
111 cfs_spin_unlock(&obd_types_lock);
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
121 const char *modname = name;
122 if (!cfs_request_module("%s", modname)) {
123 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124 type = class_search_type(name);
126 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
132 cfs_spin_lock(&type->obd_type_lock);
134 cfs_try_module_get(type->typ_dt_ops->o_owner);
135 cfs_spin_unlock(&type->obd_type_lock);
139 EXPORT_SYMBOL(class_get_type);
141 void class_put_type(struct obd_type *type)
144 cfs_spin_lock(&type->obd_type_lock);
146 cfs_module_put(type->typ_dt_ops->o_owner);
147 cfs_spin_unlock(&type->obd_type_lock);
149 EXPORT_SYMBOL(class_put_type);
151 #define CLASS_MAX_NAME 1024
153 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
154 struct lprocfs_vars *vars, const char *name,
155 struct lu_device_type *ldt)
157 struct obd_type *type;
162 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
164 if (class_search_type(name)) {
165 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
170 OBD_ALLOC(type, sizeof(*type));
174 OBD_ALLOC_PTR(type->typ_dt_ops);
175 OBD_ALLOC_PTR(type->typ_md_ops);
176 OBD_ALLOC(type->typ_name, strlen(name) + 1);
178 if (type->typ_dt_ops == NULL ||
179 type->typ_md_ops == NULL ||
180 type->typ_name == NULL)
183 *(type->typ_dt_ops) = *dt_ops;
184 /* md_ops is optional */
186 *(type->typ_md_ops) = *md_ops;
187 strcpy(type->typ_name, name);
188 cfs_spin_lock_init(&type->obd_type_lock);
191 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
193 if (IS_ERR(type->typ_procroot)) {
194 rc = PTR_ERR(type->typ_procroot);
195 type->typ_procroot = NULL;
201 rc = lu_device_type_init(ldt);
206 cfs_spin_lock(&obd_types_lock);
207 cfs_list_add(&type->typ_chain, &obd_types);
208 cfs_spin_unlock(&obd_types_lock);
213 if (type->typ_name != NULL)
214 OBD_FREE(type->typ_name, strlen(name) + 1);
215 if (type->typ_md_ops != NULL)
216 OBD_FREE_PTR(type->typ_md_ops);
217 if (type->typ_dt_ops != NULL)
218 OBD_FREE_PTR(type->typ_dt_ops);
219 OBD_FREE(type, sizeof(*type));
222 EXPORT_SYMBOL(class_register_type);
224 int class_unregister_type(const char *name)
226 struct obd_type *type = class_search_type(name);
230 CERROR("unknown obd type\n");
234 if (type->typ_refcnt) {
235 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
236 /* This is a bad situation, let's make the best of it */
237 /* Remove ops, but leave the name for debugging */
238 OBD_FREE_PTR(type->typ_dt_ops);
239 OBD_FREE_PTR(type->typ_md_ops);
243 if (type->typ_procroot) {
244 lprocfs_remove(&type->typ_procroot);
248 lu_device_type_fini(type->typ_lu);
250 cfs_spin_lock(&obd_types_lock);
251 cfs_list_del(&type->typ_chain);
252 cfs_spin_unlock(&obd_types_lock);
253 OBD_FREE(type->typ_name, strlen(name) + 1);
254 if (type->typ_dt_ops != NULL)
255 OBD_FREE_PTR(type->typ_dt_ops);
256 if (type->typ_md_ops != NULL)
257 OBD_FREE_PTR(type->typ_md_ops);
258 OBD_FREE(type, sizeof(*type));
260 } /* class_unregister_type */
261 EXPORT_SYMBOL(class_unregister_type);
264 * Create a new obd device.
266 * Find an empty slot in ::obd_devs[], create a new obd device in it.
268 * \param[in] type_name obd device type string.
269 * \param[in] name obd device name.
271 * \retval NULL if create fails, otherwise return the obd device
274 struct obd_device *class_newdev(const char *type_name, const char *name)
276 struct obd_device *result = NULL;
277 struct obd_device *newdev;
278 struct obd_type *type = NULL;
280 int new_obd_minor = 0;
283 if (strlen(name) >= MAX_OBD_NAME) {
284 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
285 RETURN(ERR_PTR(-EINVAL));
288 type = class_get_type(type_name);
290 CERROR("OBD: unknown type: %s\n", type_name);
291 RETURN(ERR_PTR(-ENODEV));
294 newdev = obd_device_alloc();
295 if (newdev == NULL) {
296 class_put_type(type);
297 RETURN(ERR_PTR(-ENOMEM));
299 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
301 cfs_write_lock(&obd_dev_lock);
302 for (i = 0; i < class_devno_max(); i++) {
303 struct obd_device *obd = class_num2obd(i);
305 if (obd && obd->obd_name &&
306 (strcmp(name, obd->obd_name) == 0)) {
307 CERROR("Device %s already exists at %d, won't add\n",
310 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
311 "%p obd_magic %08x != %08x\n", result,
312 result->obd_magic, OBD_DEVICE_MAGIC);
313 LASSERTF(result->obd_minor == new_obd_minor,
314 "%p obd_minor %d != %d\n", result,
315 result->obd_minor, new_obd_minor);
317 obd_devs[result->obd_minor] = NULL;
318 result->obd_name[0]='\0';
320 result = ERR_PTR(-EEXIST);
323 if (!result && !obd) {
325 result->obd_minor = i;
327 result->obd_type = type;
328 strncpy(result->obd_name, name,
329 sizeof(result->obd_name) - 1);
330 obd_devs[i] = result;
333 cfs_write_unlock(&obd_dev_lock);
335 if (result == NULL && i >= class_devno_max()) {
336 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338 RETURN(ERR_PTR(-EOVERFLOW));
341 if (IS_ERR(result)) {
342 obd_device_free(newdev);
343 class_put_type(type);
345 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
346 result->obd_name, result);
351 void class_release_dev(struct obd_device *obd)
353 struct obd_type *obd_type = obd->obd_type;
355 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
356 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
357 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
358 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
359 LASSERT(obd_type != NULL);
361 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
362 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
364 cfs_write_lock(&obd_dev_lock);
365 obd_devs[obd->obd_minor] = NULL;
366 cfs_write_unlock(&obd_dev_lock);
367 obd_device_free(obd);
369 class_put_type(obd_type);
372 int class_name2dev(const char *name)
379 cfs_read_lock(&obd_dev_lock);
380 for (i = 0; i < class_devno_max(); i++) {
381 struct obd_device *obd = class_num2obd(i);
383 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
384 /* Make sure we finished attaching before we give
385 out any references */
386 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
387 if (obd->obd_attached) {
388 cfs_read_unlock(&obd_dev_lock);
394 cfs_read_unlock(&obd_dev_lock);
398 EXPORT_SYMBOL(class_name2dev);
400 struct obd_device *class_name2obd(const char *name)
402 int dev = class_name2dev(name);
404 if (dev < 0 || dev > class_devno_max())
406 return class_num2obd(dev);
408 EXPORT_SYMBOL(class_name2obd);
410 int class_uuid2dev(struct obd_uuid *uuid)
414 cfs_read_lock(&obd_dev_lock);
415 for (i = 0; i < class_devno_max(); i++) {
416 struct obd_device *obd = class_num2obd(i);
418 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
419 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
420 cfs_read_unlock(&obd_dev_lock);
424 cfs_read_unlock(&obd_dev_lock);
428 EXPORT_SYMBOL(class_uuid2dev);
430 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
432 int dev = class_uuid2dev(uuid);
435 return class_num2obd(dev);
437 EXPORT_SYMBOL(class_uuid2obd);
440 * Get obd device from ::obd_devs[]
442 * \param num [in] array index
444 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
445 * otherwise return the obd device there.
447 struct obd_device *class_num2obd(int num)
449 struct obd_device *obd = NULL;
451 if (num < class_devno_max()) {
456 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
457 "%p obd_magic %08x != %08x\n",
458 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459 LASSERTF(obd->obd_minor == num,
460 "%p obd_minor %0d != %0d\n",
461 obd, obd->obd_minor, num);
466 EXPORT_SYMBOL(class_num2obd);
468 void class_obd_list(void)
473 cfs_read_lock(&obd_dev_lock);
474 for (i = 0; i < class_devno_max(); i++) {
475 struct obd_device *obd = class_num2obd(i);
479 if (obd->obd_stopping)
481 else if (obd->obd_set_up)
483 else if (obd->obd_attached)
487 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
488 i, status, obd->obd_type->typ_name,
489 obd->obd_name, obd->obd_uuid.uuid,
490 cfs_atomic_read(&obd->obd_refcount));
492 cfs_read_unlock(&obd_dev_lock);
496 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
497 specified, then only the client with that uuid is returned,
498 otherwise any client connected to the tgt is returned. */
499 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
500 const char * typ_name,
501 struct obd_uuid *grp_uuid)
505 cfs_read_lock(&obd_dev_lock);
506 for (i = 0; i < class_devno_max(); i++) {
507 struct obd_device *obd = class_num2obd(i);
511 if ((strncmp(obd->obd_type->typ_name, typ_name,
512 strlen(typ_name)) == 0)) {
513 if (obd_uuid_equals(tgt_uuid,
514 &obd->u.cli.cl_target_uuid) &&
515 ((grp_uuid)? obd_uuid_equals(grp_uuid,
516 &obd->obd_uuid) : 1)) {
517 cfs_read_unlock(&obd_dev_lock);
522 cfs_read_unlock(&obd_dev_lock);
526 EXPORT_SYMBOL(class_find_client_obd);
528 /* Iterate the obd_device list looking devices have grp_uuid. Start
529 searching at *next, and if a device is found, the next index to look
530 at is saved in *next. If next is NULL, then the first matching device
531 will always be returned. */
532 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
538 else if (*next >= 0 && *next < class_devno_max())
543 cfs_read_lock(&obd_dev_lock);
544 for (; i < class_devno_max(); i++) {
545 struct obd_device *obd = class_num2obd(i);
549 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
552 cfs_read_unlock(&obd_dev_lock);
556 cfs_read_unlock(&obd_dev_lock);
560 EXPORT_SYMBOL(class_devices_in_group);
563 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
564 * adjust sptlrpc settings accordingly.
566 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
568 struct obd_device *obd;
572 LASSERT(namelen > 0);
574 cfs_read_lock(&obd_dev_lock);
575 for (i = 0; i < class_devno_max(); i++) {
576 obd = class_num2obd(i);
578 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
581 /* only notify mdc, osc, mdt, ost */
582 type = obd->obd_type->typ_name;
583 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
584 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
585 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
586 strcmp(type, LUSTRE_OST_NAME) != 0)
589 if (strncmp(obd->obd_name, fsname, namelen))
592 class_incref(obd, __FUNCTION__, obd);
593 cfs_read_unlock(&obd_dev_lock);
594 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
595 sizeof(KEY_SPTLRPC_CONF),
596 KEY_SPTLRPC_CONF, 0, NULL, NULL);
598 class_decref(obd, __FUNCTION__, obd);
599 cfs_read_lock(&obd_dev_lock);
601 cfs_read_unlock(&obd_dev_lock);
604 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
606 void obd_cleanup_caches(void)
611 if (obd_device_cachep) {
612 rc = cfs_mem_cache_destroy(obd_device_cachep);
613 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
614 obd_device_cachep = NULL;
617 rc = cfs_mem_cache_destroy(obdo_cachep);
618 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
622 rc = cfs_mem_cache_destroy(import_cachep);
623 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
624 import_cachep = NULL;
627 rc = cfs_mem_cache_destroy(capa_cachep);
628 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
634 int obd_init_caches(void)
638 LASSERT(obd_device_cachep == NULL);
639 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
640 sizeof(struct obd_device),
642 if (!obd_device_cachep)
645 LASSERT(obdo_cachep == NULL);
646 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
651 LASSERT(import_cachep == NULL);
652 import_cachep = cfs_mem_cache_create("ll_import_cache",
653 sizeof(struct obd_import),
658 LASSERT(capa_cachep == NULL);
659 capa_cachep = cfs_mem_cache_create("capa_cache",
660 sizeof(struct obd_capa), 0, 0);
666 obd_cleanup_caches();
671 /* map connection to client */
672 struct obd_export *class_conn2export(struct lustre_handle *conn)
674 struct obd_export *export;
678 CDEBUG(D_CACHE, "looking for null handle\n");
682 if (conn->cookie == -1) { /* this means assign a new connection */
683 CDEBUG(D_CACHE, "want a new connection\n");
687 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
688 export = class_handle2object(conn->cookie);
691 EXPORT_SYMBOL(class_conn2export);
693 struct obd_device *class_exp2obd(struct obd_export *exp)
699 EXPORT_SYMBOL(class_exp2obd);
701 struct obd_device *class_conn2obd(struct lustre_handle *conn)
703 struct obd_export *export;
704 export = class_conn2export(conn);
706 struct obd_device *obd = export->exp_obd;
707 class_export_put(export);
712 EXPORT_SYMBOL(class_conn2obd);
714 struct obd_import *class_exp2cliimp(struct obd_export *exp)
716 struct obd_device *obd = exp->exp_obd;
719 return obd->u.cli.cl_import;
721 EXPORT_SYMBOL(class_exp2cliimp);
723 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
725 struct obd_device *obd = class_conn2obd(conn);
728 return obd->u.cli.cl_import;
730 EXPORT_SYMBOL(class_conn2cliimp);
732 /* Export management functions */
734 /* if export is involved in recovery then clean up related things */
735 void class_export_recovery_cleanup(struct obd_export *exp)
737 struct obd_device *obd = exp->exp_obd;
739 cfs_spin_lock(&obd->obd_recovery_task_lock);
740 if (exp->exp_delayed)
741 obd->obd_delayed_clients--;
742 if (obd->obd_recovering && exp->exp_in_recovery) {
743 cfs_spin_lock(&exp->exp_lock);
744 exp->exp_in_recovery = 0;
745 cfs_spin_unlock(&exp->exp_lock);
746 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
747 cfs_atomic_dec(&obd->obd_connected_clients);
749 cfs_spin_unlock(&obd->obd_recovery_task_lock);
750 /** Cleanup req replay fields */
751 if (exp->exp_req_replay_needed) {
752 cfs_spin_lock(&exp->exp_lock);
753 exp->exp_req_replay_needed = 0;
754 cfs_spin_unlock(&exp->exp_lock);
755 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
756 cfs_atomic_dec(&obd->obd_req_replay_clients);
758 /** Cleanup lock replay data */
759 if (exp->exp_lock_replay_needed) {
760 cfs_spin_lock(&exp->exp_lock);
761 exp->exp_lock_replay_needed = 0;
762 cfs_spin_unlock(&exp->exp_lock);
763 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
764 cfs_atomic_dec(&obd->obd_lock_replay_clients);
768 static void class_export_destroy(struct obd_export *exp)
770 struct obd_device *obd = exp->exp_obd;
773 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
774 LASSERT(obd != NULL);
776 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
777 exp->exp_client_uuid.uuid, obd->obd_name);
780 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
781 if (exp->exp_connection)
782 ptlrpc_put_connection_superhack(exp->exp_connection);
784 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
785 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
786 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
787 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
788 obd_destroy_export(exp);
789 class_decref(obd, "export", exp);
791 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
795 static void export_handle_addref(void *export)
797 class_export_get(export);
800 static struct portals_handle_ops export_handle_ops = {
801 .hop_addref = export_handle_addref,
805 struct obd_export *class_export_get(struct obd_export *exp)
807 cfs_atomic_inc(&exp->exp_refcount);
808 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
809 cfs_atomic_read(&exp->exp_refcount));
812 EXPORT_SYMBOL(class_export_get);
814 void class_export_put(struct obd_export *exp)
816 LASSERT(exp != NULL);
817 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
818 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
819 cfs_atomic_read(&exp->exp_refcount) - 1);
821 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
822 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
823 CDEBUG(D_IOCTL, "final put %p/%s\n",
824 exp, exp->exp_client_uuid.uuid);
826 /* release nid stat refererence */
827 lprocfs_exp_cleanup(exp);
828 class_export_recovery_cleanup(exp);
830 obd_zombie_export_add(exp);
833 EXPORT_SYMBOL(class_export_put);
835 /* Creates a new export, adds it to the hash table, and returns a
836 * pointer to it. The refcount is 2: one for the hash reference, and
837 * one for the pointer returned by this function. */
838 struct obd_export *class_new_export(struct obd_device *obd,
839 struct obd_uuid *cluuid)
841 struct obd_export *export;
842 cfs_hash_t *hash = NULL;
846 OBD_ALLOC_PTR(export);
848 return ERR_PTR(-ENOMEM);
850 export->exp_conn_cnt = 0;
851 export->exp_lock_hash = NULL;
852 export->exp_flock_hash = NULL;
853 cfs_atomic_set(&export->exp_refcount, 2);
854 cfs_atomic_set(&export->exp_rpc_count, 0);
855 cfs_atomic_set(&export->exp_cb_count, 0);
856 cfs_atomic_set(&export->exp_locks_count, 0);
857 #if LUSTRE_TRACKS_LOCK_EXP_REFS
858 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
859 cfs_spin_lock_init(&export->exp_locks_list_guard);
861 cfs_atomic_set(&export->exp_replay_count, 0);
862 export->exp_obd = obd;
863 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
864 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
865 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
866 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
867 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
868 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
869 class_handle_hash(&export->exp_handle, &export_handle_ops);
870 export->exp_last_request_time = cfs_time_current_sec();
871 cfs_spin_lock_init(&export->exp_lock);
872 cfs_spin_lock_init(&export->exp_rpc_lock);
873 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
874 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
875 cfs_spin_lock_init(&export->exp_bl_list_lock);
876 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
878 export->exp_sp_peer = LUSTRE_SP_ANY;
879 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
880 export->exp_client_uuid = *cluuid;
881 obd_init_export(export);
883 cfs_spin_lock(&obd->obd_dev_lock);
884 /* shouldn't happen, but might race */
885 if (obd->obd_stopping)
886 GOTO(exit_unlock, rc = -ENODEV);
888 hash = cfs_hash_getref(obd->obd_uuid_hash);
890 GOTO(exit_unlock, rc = -ENODEV);
891 cfs_spin_unlock(&obd->obd_dev_lock);
893 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
894 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
896 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
897 obd->obd_name, cluuid->uuid, rc);
898 GOTO(exit_err, rc = -EALREADY);
902 cfs_spin_lock(&obd->obd_dev_lock);
903 if (obd->obd_stopping) {
904 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
905 GOTO(exit_unlock, rc = -ENODEV);
908 class_incref(obd, "export", export);
909 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
910 cfs_list_add_tail(&export->exp_obd_chain_timed,
911 &export->exp_obd->obd_exports_timed);
912 export->exp_obd->obd_num_exports++;
913 cfs_spin_unlock(&obd->obd_dev_lock);
914 cfs_hash_putref(hash);
918 cfs_spin_unlock(&obd->obd_dev_lock);
921 cfs_hash_putref(hash);
922 class_handle_unhash(&export->exp_handle);
923 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
924 obd_destroy_export(export);
925 OBD_FREE_PTR(export);
928 EXPORT_SYMBOL(class_new_export);
930 void class_unlink_export(struct obd_export *exp)
932 class_handle_unhash(&exp->exp_handle);
934 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
935 /* delete an uuid-export hashitem from hashtables */
936 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
937 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
938 &exp->exp_client_uuid,
939 &exp->exp_uuid_hash);
941 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
942 cfs_list_del_init(&exp->exp_obd_chain_timed);
943 exp->exp_obd->obd_num_exports--;
944 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
945 class_export_put(exp);
947 EXPORT_SYMBOL(class_unlink_export);
949 /* Import management functions */
950 void class_import_destroy(struct obd_import *imp)
954 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
955 imp->imp_obd->obd_name);
957 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
959 ptlrpc_put_connection_superhack(imp->imp_connection);
961 while (!cfs_list_empty(&imp->imp_conn_list)) {
962 struct obd_import_conn *imp_conn;
964 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
965 struct obd_import_conn, oic_item);
966 cfs_list_del_init(&imp_conn->oic_item);
967 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
968 OBD_FREE(imp_conn, sizeof(*imp_conn));
971 LASSERT(imp->imp_sec == NULL);
972 class_decref(imp->imp_obd, "import", imp);
973 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
977 static void import_handle_addref(void *import)
979 class_import_get(import);
982 static struct portals_handle_ops import_handle_ops = {
983 .hop_addref = import_handle_addref,
987 struct obd_import *class_import_get(struct obd_import *import)
989 cfs_atomic_inc(&import->imp_refcount);
990 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
991 cfs_atomic_read(&import->imp_refcount),
992 import->imp_obd->obd_name);
995 EXPORT_SYMBOL(class_import_get);
997 void class_import_put(struct obd_import *imp)
1001 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1002 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1004 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1005 cfs_atomic_read(&imp->imp_refcount) - 1,
1006 imp->imp_obd->obd_name);
1008 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1009 CDEBUG(D_INFO, "final put import %p\n", imp);
1010 obd_zombie_import_add(imp);
1015 EXPORT_SYMBOL(class_import_put);
1017 static void init_imp_at(struct imp_at *at) {
1019 at_init(&at->iat_net_latency, 0, 0);
1020 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1021 /* max service estimates are tracked on the server side, so
1022 don't use the AT history here, just use the last reported
1023 val. (But keep hist for proc histogram, worst_ever) */
1024 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1029 struct obd_import *class_new_import(struct obd_device *obd)
1031 struct obd_import *imp;
1033 OBD_ALLOC(imp, sizeof(*imp));
1037 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1038 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1039 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1040 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1041 cfs_spin_lock_init(&imp->imp_lock);
1042 imp->imp_last_success_conn = 0;
1043 imp->imp_state = LUSTRE_IMP_NEW;
1044 imp->imp_obd = class_incref(obd, "import", imp);
1045 cfs_mutex_init(&imp->imp_sec_mutex);
1046 cfs_waitq_init(&imp->imp_recovery_waitq);
1048 cfs_atomic_set(&imp->imp_refcount, 2);
1049 cfs_atomic_set(&imp->imp_unregistering, 0);
1050 cfs_atomic_set(&imp->imp_inflight, 0);
1051 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1052 cfs_atomic_set(&imp->imp_inval_count, 0);
1053 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1054 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1055 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1056 init_imp_at(&imp->imp_at);
1058 /* the default magic is V2, will be used in connect RPC, and
1059 * then adjusted according to the flags in request/reply. */
1060 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1064 EXPORT_SYMBOL(class_new_import);
1066 void class_destroy_import(struct obd_import *import)
1068 LASSERT(import != NULL);
1069 LASSERT(import != LP_POISON);
1071 class_handle_unhash(&import->imp_handle);
1073 cfs_spin_lock(&import->imp_lock);
1074 import->imp_generation++;
1075 cfs_spin_unlock(&import->imp_lock);
1076 class_import_put(import);
1078 EXPORT_SYMBOL(class_destroy_import);
1080 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1082 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1084 cfs_spin_lock(&exp->exp_locks_list_guard);
1086 LASSERT(lock->l_exp_refs_nr >= 0);
1088 if (lock->l_exp_refs_target != NULL &&
1089 lock->l_exp_refs_target != exp) {
1090 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1091 exp, lock, lock->l_exp_refs_target);
1093 if ((lock->l_exp_refs_nr ++) == 0) {
1094 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1095 lock->l_exp_refs_target = exp;
1097 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1098 lock, exp, lock->l_exp_refs_nr);
1099 cfs_spin_unlock(&exp->exp_locks_list_guard);
1101 EXPORT_SYMBOL(__class_export_add_lock_ref);
1103 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1105 cfs_spin_lock(&exp->exp_locks_list_guard);
1106 LASSERT(lock->l_exp_refs_nr > 0);
1107 if (lock->l_exp_refs_target != exp) {
1108 LCONSOLE_WARN("lock %p, "
1109 "mismatching export pointers: %p, %p\n",
1110 lock, lock->l_exp_refs_target, exp);
1112 if (-- lock->l_exp_refs_nr == 0) {
1113 cfs_list_del_init(&lock->l_exp_refs_link);
1114 lock->l_exp_refs_target = NULL;
1116 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1117 lock, exp, lock->l_exp_refs_nr);
1118 cfs_spin_unlock(&exp->exp_locks_list_guard);
1120 EXPORT_SYMBOL(__class_export_del_lock_ref);
1123 /* A connection defines an export context in which preallocation can
1124 be managed. This releases the export pointer reference, and returns
1125 the export handle, so the export refcount is 1 when this function
1127 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1128 struct obd_uuid *cluuid)
1130 struct obd_export *export;
1131 LASSERT(conn != NULL);
1132 LASSERT(obd != NULL);
1133 LASSERT(cluuid != NULL);
1136 export = class_new_export(obd, cluuid);
1138 RETURN(PTR_ERR(export));
1140 conn->cookie = export->exp_handle.h_cookie;
1141 class_export_put(export);
1143 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1144 cluuid->uuid, conn->cookie);
1147 EXPORT_SYMBOL(class_connect);
1150 /* This function removes 1-3 references from the export:
1151 * 1 - for export pointer passed
1152 * and if disconnect really need
1153 * 2 - removing from hash
1154 * 3 - in client_unlink_export
1155 * The export pointer passed to this function can destroyed */
1156 int class_disconnect(struct obd_export *export)
1158 int already_disconnected;
1161 if (export == NULL) {
1162 CWARN("attempting to free NULL export %p\n", export);
1166 cfs_spin_lock(&export->exp_lock);
1167 already_disconnected = export->exp_disconnected;
1168 export->exp_disconnected = 1;
1169 cfs_spin_unlock(&export->exp_lock);
1171 /* class_cleanup(), abort_recovery(), and class_fail_export()
1172 * all end up in here, and if any of them race we shouldn't
1173 * call extra class_export_puts(). */
1174 if (already_disconnected) {
1175 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1176 GOTO(no_disconn, already_disconnected);
1179 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1180 export->exp_handle.h_cookie);
1182 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1183 cfs_hash_del(export->exp_obd->obd_nid_hash,
1184 &export->exp_connection->c_peer.nid,
1185 &export->exp_nid_hash);
1187 class_unlink_export(export);
1189 class_export_put(export);
1192 EXPORT_SYMBOL(class_disconnect);
1194 /* Return non-zero for a fully connected export */
1195 int class_connected_export(struct obd_export *exp)
1199 cfs_spin_lock(&exp->exp_lock);
1200 connected = (exp->exp_conn_cnt > 0);
1201 cfs_spin_unlock(&exp->exp_lock);
1206 EXPORT_SYMBOL(class_connected_export);
1208 static void class_disconnect_export_list(cfs_list_t *list,
1209 enum obd_option flags)
1212 struct obd_export *exp;
1215 /* It's possible that an export may disconnect itself, but
1216 * nothing else will be added to this list. */
1217 while (!cfs_list_empty(list)) {
1218 exp = cfs_list_entry(list->next, struct obd_export,
1220 /* need for safe call CDEBUG after obd_disconnect */
1221 class_export_get(exp);
1223 cfs_spin_lock(&exp->exp_lock);
1224 exp->exp_flags = flags;
1225 cfs_spin_unlock(&exp->exp_lock);
1227 if (obd_uuid_equals(&exp->exp_client_uuid,
1228 &exp->exp_obd->obd_uuid)) {
1230 "exp %p export uuid == obd uuid, don't discon\n",
1232 /* Need to delete this now so we don't end up pointing
1233 * to work_list later when this export is cleaned up. */
1234 cfs_list_del_init(&exp->exp_obd_chain);
1235 class_export_put(exp);
1239 class_export_get(exp);
1240 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1241 "last request at "CFS_TIME_T"\n",
1242 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1243 exp, exp->exp_last_request_time);
1244 /* release one export reference anyway */
1245 rc = obd_disconnect(exp);
1247 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1248 obd_export_nid2str(exp), exp, rc);
1249 class_export_put(exp);
1254 void class_disconnect_exports(struct obd_device *obd)
1256 cfs_list_t work_list;
1259 /* Move all of the exports from obd_exports to a work list, en masse. */
1260 CFS_INIT_LIST_HEAD(&work_list);
1261 cfs_spin_lock(&obd->obd_dev_lock);
1262 cfs_list_splice_init(&obd->obd_exports, &work_list);
1263 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1264 cfs_spin_unlock(&obd->obd_dev_lock);
1266 if (!cfs_list_empty(&work_list)) {
1267 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1268 "disconnecting them\n", obd->obd_minor, obd);
1269 class_disconnect_export_list(&work_list,
1270 exp_flags_from_obd(obd));
1272 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1273 obd->obd_minor, obd);
1276 EXPORT_SYMBOL(class_disconnect_exports);
1278 /* Remove exports that have not completed recovery.
1280 void class_disconnect_stale_exports(struct obd_device *obd,
1281 int (*test_export)(struct obd_export *))
1283 cfs_list_t work_list;
1284 cfs_list_t *pos, *n;
1285 struct obd_export *exp;
1289 CFS_INIT_LIST_HEAD(&work_list);
1290 cfs_spin_lock(&obd->obd_dev_lock);
1291 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1294 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1296 /* don't count self-export as client */
1297 if (obd_uuid_equals(&exp->exp_client_uuid,
1298 &exp->exp_obd->obd_uuid))
1301 if (test_export(exp))
1304 cfs_spin_lock(&exp->exp_lock);
1305 failed = exp->exp_failed;
1306 exp->exp_failed = 1;
1307 cfs_spin_unlock(&exp->exp_lock);
1311 cfs_list_move(&exp->exp_obd_chain, &work_list);
1313 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1314 obd->obd_name, exp->exp_client_uuid.uuid,
1315 exp->exp_connection == NULL ? "<unknown>" :
1316 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1317 print_export_data(exp, "EVICTING", 0);
1319 cfs_spin_unlock(&obd->obd_dev_lock);
1322 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1323 obd->obd_name, evicted);
1324 obd->obd_stale_clients += evicted;
1326 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1327 OBD_OPT_ABORT_RECOV);
1330 EXPORT_SYMBOL(class_disconnect_stale_exports);
1332 void class_fail_export(struct obd_export *exp)
1334 int rc, already_failed;
1336 cfs_spin_lock(&exp->exp_lock);
1337 already_failed = exp->exp_failed;
1338 exp->exp_failed = 1;
1339 cfs_spin_unlock(&exp->exp_lock);
1341 if (already_failed) {
1342 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1343 exp, exp->exp_client_uuid.uuid);
1347 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1348 exp, exp->exp_client_uuid.uuid);
1350 if (obd_dump_on_timeout)
1351 libcfs_debug_dumplog();
1353 /* need for safe call CDEBUG after obd_disconnect */
1354 class_export_get(exp);
1356 /* Most callers into obd_disconnect are removing their own reference
1357 * (request, for example) in addition to the one from the hash table.
1358 * We don't have such a reference here, so make one. */
1359 class_export_get(exp);
1360 rc = obd_disconnect(exp);
1362 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1364 CDEBUG(D_HA, "disconnected export %p/%s\n",
1365 exp, exp->exp_client_uuid.uuid);
1366 class_export_put(exp);
1368 EXPORT_SYMBOL(class_fail_export);
1370 char *obd_export_nid2str(struct obd_export *exp)
1372 if (exp->exp_connection != NULL)
1373 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1377 EXPORT_SYMBOL(obd_export_nid2str);
1379 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1381 struct obd_export *doomed_exp = NULL;
1382 int exports_evicted = 0;
1384 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1387 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1388 if (doomed_exp == NULL)
1391 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1392 "nid %s found, wanted nid %s, requested nid %s\n",
1393 obd_export_nid2str(doomed_exp),
1394 libcfs_nid2str(nid_key), nid);
1395 LASSERTF(doomed_exp != obd->obd_self_export,
1396 "self-export is hashed by NID?\n");
1398 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1399 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1401 class_fail_export(doomed_exp);
1402 class_export_put(doomed_exp);
1405 if (!exports_evicted)
1406 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1407 obd->obd_name, nid);
1408 return exports_evicted;
1410 EXPORT_SYMBOL(obd_export_evict_by_nid);
1412 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1414 struct obd_export *doomed_exp = NULL;
1415 struct obd_uuid doomed_uuid;
1416 int exports_evicted = 0;
1418 obd_str2uuid(&doomed_uuid, uuid);
1419 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1420 CERROR("%s: can't evict myself\n", obd->obd_name);
1421 return exports_evicted;
1424 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1426 if (doomed_exp == NULL) {
1427 CERROR("%s: can't disconnect %s: no exports found\n",
1428 obd->obd_name, uuid);
1430 CWARN("%s: evicting %s at adminstrative request\n",
1431 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1432 class_fail_export(doomed_exp);
1433 class_export_put(doomed_exp);
1437 return exports_evicted;
1439 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1441 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1442 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1443 EXPORT_SYMBOL(class_export_dump_hook);
1446 static void print_export_data(struct obd_export *exp, const char *status,
1449 struct ptlrpc_reply_state *rs;
1450 struct ptlrpc_reply_state *first_reply = NULL;
1453 cfs_spin_lock(&exp->exp_lock);
1454 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1460 cfs_spin_unlock(&exp->exp_lock);
1462 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1463 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1464 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1465 cfs_atomic_read(&exp->exp_rpc_count),
1466 cfs_atomic_read(&exp->exp_cb_count),
1467 cfs_atomic_read(&exp->exp_locks_count),
1468 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1469 nreplies, first_reply, nreplies > 3 ? "..." : "",
1470 exp->exp_last_committed);
1471 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1472 if (locks && class_export_dump_hook != NULL)
1473 class_export_dump_hook(exp);
1477 void dump_exports(struct obd_device *obd, int locks)
1479 struct obd_export *exp;
1481 cfs_spin_lock(&obd->obd_dev_lock);
1482 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1483 print_export_data(exp, "ACTIVE", locks);
1484 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1485 print_export_data(exp, "UNLINKED", locks);
1486 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1487 print_export_data(exp, "DELAYED", locks);
1488 cfs_spin_unlock(&obd->obd_dev_lock);
1489 cfs_spin_lock(&obd_zombie_impexp_lock);
1490 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1491 print_export_data(exp, "ZOMBIE", locks);
1492 cfs_spin_unlock(&obd_zombie_impexp_lock);
1494 EXPORT_SYMBOL(dump_exports);
1496 void obd_exports_barrier(struct obd_device *obd)
1499 LASSERT(cfs_list_empty(&obd->obd_exports));
1500 cfs_spin_lock(&obd->obd_dev_lock);
1501 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1502 cfs_spin_unlock(&obd->obd_dev_lock);
1503 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1504 cfs_time_seconds(waited));
1505 if (waited > 5 && IS_PO2(waited)) {
1506 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1507 "more than %d seconds. "
1508 "The obd refcount = %d. Is it stuck?\n",
1509 obd->obd_name, waited,
1510 cfs_atomic_read(&obd->obd_refcount));
1511 dump_exports(obd, 1);
1514 cfs_spin_lock(&obd->obd_dev_lock);
1516 cfs_spin_unlock(&obd->obd_dev_lock);
1518 EXPORT_SYMBOL(obd_exports_barrier);
1520 /* Total amount of zombies to be destroyed */
1521 static int zombies_count = 0;
1524 * kill zombie imports and exports
1526 void obd_zombie_impexp_cull(void)
1528 struct obd_import *import;
1529 struct obd_export *export;
1533 cfs_spin_lock(&obd_zombie_impexp_lock);
1536 if (!cfs_list_empty(&obd_zombie_imports)) {
1537 import = cfs_list_entry(obd_zombie_imports.next,
1540 cfs_list_del_init(&import->imp_zombie_chain);
1544 if (!cfs_list_empty(&obd_zombie_exports)) {
1545 export = cfs_list_entry(obd_zombie_exports.next,
1548 cfs_list_del_init(&export->exp_obd_chain);
1551 cfs_spin_unlock(&obd_zombie_impexp_lock);
1553 if (import != NULL) {
1554 class_import_destroy(import);
1555 cfs_spin_lock(&obd_zombie_impexp_lock);
1557 cfs_spin_unlock(&obd_zombie_impexp_lock);
1560 if (export != NULL) {
1561 class_export_destroy(export);
1562 cfs_spin_lock(&obd_zombie_impexp_lock);
1564 cfs_spin_unlock(&obd_zombie_impexp_lock);
1568 } while (import != NULL || export != NULL);
1572 static cfs_completion_t obd_zombie_start;
1573 static cfs_completion_t obd_zombie_stop;
1574 static unsigned long obd_zombie_flags;
1575 static cfs_waitq_t obd_zombie_waitq;
1576 static pid_t obd_zombie_pid;
1579 OBD_ZOMBIE_STOP = 1 << 1
1583 * check for work for kill zombie import/export thread.
1585 static int obd_zombie_impexp_check(void *arg)
1589 cfs_spin_lock(&obd_zombie_impexp_lock);
1590 rc = (zombies_count == 0) &&
1591 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1592 cfs_spin_unlock(&obd_zombie_impexp_lock);
1598 * Add export to the obd_zombe thread and notify it.
1600 static void obd_zombie_export_add(struct obd_export *exp) {
1601 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1602 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1603 cfs_list_del_init(&exp->exp_obd_chain);
1604 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1605 cfs_spin_lock(&obd_zombie_impexp_lock);
1607 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1608 cfs_spin_unlock(&obd_zombie_impexp_lock);
1610 obd_zombie_impexp_notify();
1614 * Add import to the obd_zombe thread and notify it.
1616 static void obd_zombie_import_add(struct obd_import *imp) {
1617 LASSERT(imp->imp_sec == NULL);
1618 LASSERT(imp->imp_rq_pool == NULL);
1619 cfs_spin_lock(&obd_zombie_impexp_lock);
1620 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1622 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1623 cfs_spin_unlock(&obd_zombie_impexp_lock);
1625 obd_zombie_impexp_notify();
1629 * notify import/export destroy thread about new zombie.
1631 static void obd_zombie_impexp_notify(void)
1634 * Make sure obd_zomebie_impexp_thread get this notification.
1635 * It is possible this signal only get by obd_zombie_barrier, and
1636 * barrier gulps this notification and sleeps away and hangs ensues
1638 cfs_waitq_broadcast(&obd_zombie_waitq);
1642 * check whether obd_zombie is idle
1644 static int obd_zombie_is_idle(void)
1648 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1649 cfs_spin_lock(&obd_zombie_impexp_lock);
1650 rc = (zombies_count == 0);
1651 cfs_spin_unlock(&obd_zombie_impexp_lock);
1656 * wait when obd_zombie import/export queues become empty
1658 void obd_zombie_barrier(void)
1660 struct l_wait_info lwi = { 0 };
1662 if (obd_zombie_pid == cfs_curproc_pid())
1663 /* don't wait for myself */
1665 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1667 EXPORT_SYMBOL(obd_zombie_barrier);
1672 * destroy zombie export/import thread.
1674 static int obd_zombie_impexp_thread(void *unused)
1678 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1679 cfs_complete(&obd_zombie_start);
1683 cfs_complete(&obd_zombie_start);
1685 obd_zombie_pid = cfs_curproc_pid();
1687 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1688 struct l_wait_info lwi = { 0 };
1690 l_wait_event(obd_zombie_waitq,
1691 !obd_zombie_impexp_check(NULL), &lwi);
1692 obd_zombie_impexp_cull();
1695 * Notify obd_zombie_barrier callers that queues
1698 cfs_waitq_signal(&obd_zombie_waitq);
1701 cfs_complete(&obd_zombie_stop);
1706 #else /* ! KERNEL */
1708 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1709 static void *obd_zombie_impexp_work_cb;
1710 static void *obd_zombie_impexp_idle_cb;
1712 int obd_zombie_impexp_kill(void *arg)
1716 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1717 obd_zombie_impexp_cull();
1720 cfs_atomic_dec(&zombie_recur);
1727 * start destroy zombie import/export thread
1729 int obd_zombie_impexp_init(void)
1733 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1734 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1735 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1736 cfs_init_completion(&obd_zombie_start);
1737 cfs_init_completion(&obd_zombie_stop);
1738 cfs_waitq_init(&obd_zombie_waitq);
1742 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1746 cfs_wait_for_completion(&obd_zombie_start);
1749 obd_zombie_impexp_work_cb =
1750 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1751 &obd_zombie_impexp_kill, NULL);
1753 obd_zombie_impexp_idle_cb =
1754 liblustre_register_idle_callback("obd_zombi_impexp_check",
1755 &obd_zombie_impexp_check, NULL);
1761 * stop destroy zombie import/export thread
1763 void obd_zombie_impexp_stop(void)
1765 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1766 obd_zombie_impexp_notify();
1768 cfs_wait_for_completion(&obd_zombie_stop);
1770 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1771 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1775 /***** Kernel-userspace comm helpers *******/
1777 /* Get length of entire message, including header */
1778 int kuc_len(int payload_len)
1780 return sizeof(struct kuc_hdr) + payload_len;
1782 EXPORT_SYMBOL(kuc_len);
1784 /* Get a pointer to kuc header, given a ptr to the payload
1785 * @param p Pointer to payload area
1786 * @returns Pointer to kuc header
1788 struct kuc_hdr * kuc_ptr(void *p)
1790 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1791 LASSERT(lh->kuc_magic == KUC_MAGIC);
1794 EXPORT_SYMBOL(kuc_ptr);
1796 /* Test if payload is part of kuc message
1797 * @param p Pointer to payload area
1800 int kuc_ispayload(void *p)
1802 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1804 if (kh->kuc_magic == KUC_MAGIC)
1809 EXPORT_SYMBOL(kuc_ispayload);
1811 /* Alloc space for a message, and fill in header
1812 * @return Pointer to payload area
1814 void *kuc_alloc(int payload_len, int transport, int type)
1817 int len = kuc_len(payload_len);
1821 return ERR_PTR(-ENOMEM);
1823 lh->kuc_magic = KUC_MAGIC;
1824 lh->kuc_transport = transport;
1825 lh->kuc_msgtype = type;
1826 lh->kuc_msglen = len;
1828 return (void *)(lh + 1);
1830 EXPORT_SYMBOL(kuc_alloc);
1832 /* Takes pointer to payload area */
1833 inline void kuc_free(void *p, int payload_len)
1835 struct kuc_hdr *lh = kuc_ptr(p);
1836 OBD_FREE(lh, kuc_len(payload_len));
1838 EXPORT_SYMBOL(kuc_free);