4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
/* List of registered obd types; class_search_type() walks it while
 * holding obd_types_lock. */
50 extern cfs_list_t obd_types;
51 cfs_spinlock_t obd_types_lock;
/* Caches for obd_device, obdo and obd_import objects; they are created in
 * obd_init_caches() and destroyed in obd_cleanup_caches(). */
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
/* NOTE(review): presumably the queues of imports/exports whose last
 * reference was dropped, guarded by obd_zombie_impexp_lock -- their users
 * are outside this view; confirm. */
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 cfs_spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of helpers defined outside this view. */
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
/* Function pointer invoked on exp_connection / imp_connection / oic_conn
 * when exports and imports are destroyed; where it is assigned is not
 * visible here. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep and stamp it with
 * OBD_DEVICE_MAGIC.
 * NOTE(review): the guard around the magic assignment and the return
 * statement are not visible in this listing; class_newdev() checks the
 * result against NULL, so presumably NULL is returned on failure -- confirm.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts that the magic is still OBD_DEVICE_MAGIC and logs an error if the
 * device still has a namespace attached.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace that survived to this point is reported */
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by name.
 * Walks obd_types under obd_types_lock and compares typ_name with strcmp();
 * the lock is released both on a match and after an unsuccessful scan.
 * NOTE(review): the return statements are not visible in this listing;
 * callers test the result for NULL, so presumably the matching type or
 * NULL is returned -- confirm.
 */
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 cfs_spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 cfs_spin_unlock(&obd_types_lock);
112 cfs_spin_unlock(&obd_types_lock);
/*
 * Find an obd type by name for use by a new device.
 * With HAVE_MODULE_LOADING_SUPPORT, a module named after the type is
 * requested through cfs_request_module() and the lookup is repeated.
 * Under the type's obd_type_lock a module reference on
 * typ_dt_ops->o_owner is then attempted.
 * NOTE(review): the guards around these steps and the return statement are
 * not visible here; presumably module loading only happens when the first
 * lookup fails -- confirm.
 */
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
123 if (!cfs_request_module("%s", modname)) {
124 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125 type = class_search_type(name);
127 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
133 cfs_spin_lock(&type->obd_type_lock);
/* the result of cfs_try_module_get() is not checked on this line */
135 cfs_try_module_get(type->typ_dt_ops->o_owner);
136 cfs_spin_unlock(&type->obd_type_lock);
140 EXPORT_SYMBOL(class_get_type);
/*
 * Counterpart of class_get_type(): under the type's obd_type_lock, drop the
 * module reference on typ_dt_ops->o_owner.
 * NOTE(review): listing lines 143-146 are not visible; any reference-count
 * bookkeeping on the type itself would be there -- confirm.
 */
142 void class_put_type(struct obd_type *type)
145 cfs_spin_lock(&type->obd_type_lock);
147 cfs_module_put(type->typ_dt_ops->o_owner);
148 cfs_spin_unlock(&type->obd_type_lock);
150 EXPORT_SYMBOL(class_put_type);
/* Upper bound on the length of a type name accepted by
 * class_register_type(). */
152 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type under \a name.
 *
 * A name that class_search_type() already finds is reported as already
 * registered. Otherwise the obd_type is allocated together with private
 * copies of the dt ops table, the (optional) md ops table and the name, a
 * proc entry is registered under proc_lustre_root, lu_device_type_init() is
 * called on \a ldt, and the type is linked into obd_types under
 * obd_types_lock.
 *
 * NOTE(review): return statements, the guards around the md_ops copy and
 * the lu_device_type_init() call, and the label in front of the cleanup
 * tail are not visible in this listing.
 */
154 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
155 struct lprocfs_vars *vars, const char *name,
156 struct lu_device_type *ldt)
158 struct obd_type *type;
163 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
165 if (class_search_type(name)) {
166 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
171 OBD_ALLOC(type, sizeof(*type));
175 OBD_ALLOC_PTR(type->typ_dt_ops);
176 OBD_ALLOC_PTR(type->typ_md_ops);
177 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
179 if (type->typ_dt_ops == NULL ||
180 type->typ_md_ops == NULL ||
181 type->typ_name == NULL)
/* the type keeps its own copies of the caller's ops tables */
184 *(type->typ_dt_ops) = *dt_ops;
185 /* md_ops is optional */
187 *(type->typ_md_ops) = *md_ops;
188 strcpy(type->typ_name, name);
189 cfs_spin_lock_init(&type->obd_type_lock);
192 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
/* a failed proc registration is turned into rc and the pointer is reset */
194 if (IS_ERR(type->typ_procroot)) {
195 rc = PTR_ERR(type->typ_procroot);
196 type->typ_procroot = NULL;
202 rc = lu_device_type_init(ldt);
207 cfs_spin_lock(&obd_types_lock);
208 cfs_list_add(&type->typ_chain, &obd_types);
209 cfs_spin_unlock(&obd_types_lock);
/* cleanup tail: free whatever was allocated, then the type itself;
 * presumably reached only on failure -- confirm */
214 if (type->typ_name != NULL)
215 OBD_FREE(type->typ_name, strlen(name) + 1);
216 if (type->typ_md_ops != NULL)
217 OBD_FREE_PTR(type->typ_md_ops);
218 if (type->typ_dt_ops != NULL)
219 OBD_FREE_PTR(type->typ_dt_ops);
220 OBD_FREE(type, sizeof(*type));
223 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is logged as an error. A type whose typ_refcnt is still
 * non-zero is not torn down: the problem is logged and only its ops tables
 * are freed, the name being kept for debugging. Otherwise the proc entry is
 * removed, lu_device_type_fini() is called on typ_lu, the type is unlinked
 * from obd_types under obd_types_lock and all of its memory is freed.
 *
 * NOTE(review): return statements and some guards are not visible in this
 * listing.
 */
225 int class_unregister_type(const char *name)
227 struct obd_type *type = class_search_type(name);
231 CERROR("unknown obd type\n");
235 if (type->typ_refcnt) {
236 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
237 /* This is a bad situation, let's make the best of it */
238 /* Remove ops, but leave the name for debugging */
239 OBD_FREE_PTR(type->typ_dt_ops);
240 OBD_FREE_PTR(type->typ_md_ops);
244 if (type->typ_procroot) {
245 lprocfs_remove(&type->typ_procroot);
249 lu_device_type_fini(type->typ_lu);
251 cfs_spin_lock(&obd_types_lock);
252 cfs_list_del(&type->typ_chain);
253 cfs_spin_unlock(&obd_types_lock);
/* the name was allocated with strlen(name) + 1 bytes at registration */
254 OBD_FREE(type->typ_name, strlen(name) + 1);
255 if (type->typ_dt_ops != NULL)
256 OBD_FREE_PTR(type->typ_dt_ops);
257 if (type->typ_md_ops != NULL)
258 OBD_FREE_PTR(type->typ_md_ops);
259 OBD_FREE(type, sizeof(*type));
261 } /* class_unregister_type */
262 EXPORT_SYMBOL(class_unregister_type);
265 * Create a new obd device.
267 * Find an empty slot in ::obd_devs[], create a new obd device in it.
269 * \param[in] type_name obd device type string.
270 * \param[in] name obd device name.
272 * \retval ERR_PTR(-errno) if creation fails, otherwise the new obd device
/*
 * Allocate an obd_device of type \a type_name and claim a slot for it in
 * obd_devs[].
 *
 * Failures visible here are reported as ERR_PTR values: -EINVAL when
 * \a name is not shorter than MAX_OBD_NAME, -ENODEV for an unknown type,
 * -ENOMEM when the device cannot be allocated, -EEXIST when a device with
 * the same name is found, and -EOVERFLOW when no slot is free.
 *
 * The slot scan runs with obd_dev_lock held for writing. The first empty
 * slot is claimed while the scan is still in progress, so a duplicate name
 * found later has to undo that claim.
 */
275 struct obd_device *class_newdev(const char *type_name, const char *name)
277 struct obd_device *result = NULL;
278 struct obd_device *newdev;
279 struct obd_type *type = NULL;
281 int new_obd_minor = 0;
284 if (strlen(name) >= MAX_OBD_NAME) {
285 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
286 RETURN(ERR_PTR(-EINVAL));
289 type = class_get_type(type_name);
291 CERROR("OBD: unknown type: %s\n", type_name);
292 RETURN(ERR_PTR(-ENODEV));
295 newdev = obd_device_alloc();
296 if (newdev == NULL) {
/* give back the type obtained from class_get_type() */
297 class_put_type(type);
298 RETURN(ERR_PTR(-ENOMEM));
300 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302 cfs_write_lock(&obd_dev_lock);
303 for (i = 0; i < class_devno_max(); i++) {
304 struct obd_device *obd = class_num2obd(i);
306 if (obd && obd->obd_name &&
307 (strcmp(name, obd->obd_name) == 0)) {
308 CERROR("Device %s already exists at %d, won't add\n",
/* a slot claimed earlier in this scan is sanity-checked and released */
311 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312 "%p obd_magic %08x != %08x\n", result,
313 result->obd_magic, OBD_DEVICE_MAGIC);
314 LASSERTF(result->obd_minor == new_obd_minor,
315 "%p obd_minor %d != %d\n", result,
316 result->obd_minor, new_obd_minor);
318 obd_devs[result->obd_minor] = NULL;
319 result->obd_name[0]='\0';
321 result = ERR_PTR(-EEXIST);
/* first empty slot seen while nothing has been claimed yet */
324 if (!result && !obd) {
326 result->obd_minor = i;
328 result->obd_type = type;
/* name length was checked against MAX_OBD_NAME above */
329 strncpy(result->obd_name, name,
330 sizeof(result->obd_name) - 1);
331 obd_devs[i] = result;
334 cfs_write_unlock(&obd_dev_lock);
336 if (result == NULL && i >= class_devno_max()) {
337 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339 RETURN(ERR_PTR(-EOVERFLOW));
/* on failure neither the device nor the type reference is kept */
342 if (IS_ERR(result)) {
343 obd_device_free(newdev);
344 class_put_type(type);
346 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347 result->obd_name, result);
/*
 * Remove \a obd from obd_devs[] and free it.
 * The device must carry a valid magic, occupy the slot named by its
 * obd_minor and have a type. The slot is cleared under obd_dev_lock (write),
 * the device is freed, and the type reference is dropped last through the
 * pointer saved on entry.
 */
352 void class_release_dev(struct obd_device *obd)
/* saved up front because obd is freed before class_put_type() is called */
354 struct obd_type *obd_type = obd->obd_type;
356 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
357 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
358 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
359 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
360 LASSERT(obd_type != NULL);
362 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
363 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
365 cfs_write_lock(&obd_dev_lock);
366 obd_devs[obd->obd_minor] = NULL;
367 cfs_write_unlock(&obd_dev_lock);
368 obd_device_free(obd);
370 class_put_type(obd_type);
/*
 * Search obd_devs[] for a device called \a name, holding obd_dev_lock for
 * reading.
 * A name match is only accepted when the device has obd_attached set.
 * NOTE(review): the return statements are not visible in this listing;
 * class_name2obd() treats a negative result as "not found", so presumably
 * the index or a negative value is returned -- confirm.
 */
373 int class_name2dev(const char *name)
380 cfs_read_lock(&obd_dev_lock);
381 for (i = 0; i < class_devno_max(); i++) {
382 struct obd_device *obd = class_num2obd(i);
384 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
385 /* Make sure we finished attaching before we give
386 out any references */
387 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
388 if (obd->obd_attached) {
389 cfs_read_unlock(&obd_dev_lock);
395 cfs_read_unlock(&obd_dev_lock);
399 EXPORT_SYMBOL(class_name2dev);
/*
 * Look up a device by \a name and return its obd_device.
 * The index comes from class_name2dev(), which only accepts devices that
 * have obd_attached set; an index outside 0 .. class_devno_max() - 1 is
 * rejected before obd_devs[] is consulted through class_num2obd().
 */
401 struct obd_device *class_name2obd(const char *name)
403 int dev = class_name2dev(name);
/* class_devno_max() itself is not a valid index: every scan in this file
 * and class_num2obd() use "< class_devno_max()" */
405 if (dev < 0 || dev >= class_devno_max())
407 return class_num2obd(dev);
409 EXPORT_SYMBOL(class_name2obd);
/*
 * Search obd_devs[] for a device whose obd_uuid equals \a uuid, holding
 * obd_dev_lock for reading.
 * NOTE(review): the return statements are not visible in this listing;
 * presumably the index of the match or a negative value -- confirm.
 */
411 int class_uuid2dev(struct obd_uuid *uuid)
415 cfs_read_lock(&obd_dev_lock);
416 for (i = 0; i < class_devno_max(); i++) {
417 struct obd_device *obd = class_num2obd(i);
419 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
420 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
421 cfs_read_unlock(&obd_dev_lock);
425 cfs_read_unlock(&obd_dev_lock);
429 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up a device by \a uuid: class_uuid2dev() supplies the index, which is
 * then resolved through class_num2obd().
 * NOTE(review): listing lines 434-435 are not visible; presumably they
 * reject a negative index -- confirm.
 */
431 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
433 int dev = class_uuid2dev(uuid);
436 return class_num2obd(dev);
438 EXPORT_SYMBOL(class_uuid2obd);
441 * Get obd device from ::obd_devs[]
443 * \param num [in] array index
445 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
446 * otherwise return the obd device there.
/*
 * Return the device stored at index \a num of obd_devs[].
 * An index outside 0 .. class_devno_max() - 1 skips the lookup, leaving the
 * initial NULL in obd. A device that is found must carry OBD_DEVICE_MAGIC
 * and an obd_minor equal to \a num.
 * NOTE(review): the array read and the return statement sit on listing
 * lines that are not visible here.
 */
448 struct obd_device *class_num2obd(int num)
450 struct obd_device *obd = NULL;
/* num is a signed int and this function is exported: reject negative
 * indices too, so obd_devs[] is never indexed below its first element */
452 if (num >= 0 && num < class_devno_max()) {
457 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
458 "%p obd_magic %08x != %08x\n",
459 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
460 LASSERTF(obd->obd_minor == num,
461 "%p obd_minor %0d != %0d\n",
462 obd, obd->obd_minor, num);
467 EXPORT_SYMBOL(class_num2obd);
/*
 * Print one console line per device in obd_devs[]: index, status, type
 * name, device name, uuid and reference count. The scan holds obd_dev_lock
 * for reading.
 * The status is chosen by testing obd_stopping, then obd_set_up, then
 * obd_attached; the status strings themselves are on listing lines that are
 * not visible here.
 */
469 void class_obd_list(void)
474 cfs_read_lock(&obd_dev_lock);
475 for (i = 0; i < class_devno_max(); i++) {
476 struct obd_device *obd = class_num2obd(i);
480 if (obd->obd_stopping)
482 else if (obd->obd_set_up)
484 else if (obd->obd_attached)
488 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
489 i, status, obd->obd_type->typ_name,
490 obd->obd_name, obd->obd_uuid.uuid,
491 cfs_atomic_read(&obd->obd_refcount));
493 cfs_read_unlock(&obd_dev_lock);
497 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
498 specified, then only the client with that uuid is returned,
499 otherwise any client connected to the tgt is returned. */
/*
 * Scan obd_devs[] under obd_dev_lock (read) for a client device of type
 * \a typ_name whose cl_target_uuid equals \a tgt_uuid; when \a grp_uuid is
 * non-NULL the device's own obd_uuid must equal it as well.
 * NOTE(review): the return statements are not visible in this listing.
 */
500 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
501 const char * typ_name,
502 struct obd_uuid *grp_uuid)
506 cfs_read_lock(&obd_dev_lock);
507 for (i = 0; i < class_devno_max(); i++) {
508 struct obd_device *obd = class_num2obd(i);
/* only the first strlen(typ_name) bytes are compared, so any type name
 * that begins with typ_name matches */
512 if ((strncmp(obd->obd_type->typ_name, typ_name,
513 strlen(typ_name)) == 0)) {
514 if (obd_uuid_equals(tgt_uuid,
515 &obd->u.cli.cl_target_uuid) &&
516 ((grp_uuid)? obd_uuid_equals(grp_uuid,
517 &obd->obd_uuid) : 1)) {
518 cfs_read_unlock(&obd_dev_lock);
523 cfs_read_unlock(&obd_dev_lock);
527 EXPORT_SYMBOL(class_find_client_obd);
529 /* Iterate the obd_device list looking for devices that have grp_uuid.
530 Start searching at *next, and if a device is found, the next index to
531 look at is saved in *next. If next is NULL, then the first matching
532 device will always be returned. */
/*
 * Scan obd_devs[] under obd_dev_lock (read), starting at an index derived
 * from \a next, for a device whose obd_uuid equals \a grp_uuid.
 * A *next value is only used as the starting index when it lies within
 * 0 .. class_devno_max() - 1.
 * NOTE(review): the other branches of the start-index selection, the update
 * of *next and the return statements are not visible in this listing.
 */
533 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
539 else if (*next >= 0 && *next < class_devno_max())
544 cfs_read_lock(&obd_dev_lock);
545 for (; i < class_devno_max(); i++) {
546 struct obd_device *obd = class_num2obd(i);
550 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
553 cfs_read_unlock(&obd_dev_lock);
557 cfs_read_unlock(&obd_dev_lock);
561 EXPORT_SYMBOL(class_devices_in_group);
564 * Notify that the sptlrpc log for \a fsname has changed, so that every
565 * relevant OBD can adjust its sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to the self-export of
 * every device that is set up, not stopping, of type mdc/osc/mdt/ost, and
 * whose name starts with the first \a namelen bytes of \a fsname.
 *
 * obd_dev_lock is held for reading during the scan but released around
 * each obd_set_info_async() call; a device reference taken with
 * class_incref() is held across the unlocked window.
 *
 * NOTE(review): how rc2 is folded into the return value is on listing
 * lines that are not visible here.
 */
567 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
569 struct obd_device *obd;
573 LASSERT(namelen > 0);
575 cfs_read_lock(&obd_dev_lock);
576 for (i = 0; i < class_devno_max(); i++) {
577 obd = class_num2obd(i);
579 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
582 /* only notify mdc, osc, mdt, ost */
583 type = obd->obd_type->typ_name;
584 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
585 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
586 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
587 strcmp(type, LUSTRE_OST_NAME) != 0)
/* device name must begin with fsname */
590 if (strncmp(obd->obd_name, fsname, namelen))
/* take a device reference before dropping the lock for the call */
593 class_incref(obd, __FUNCTION__, obd);
594 cfs_read_unlock(&obd_dev_lock);
595 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
596 sizeof(KEY_SPTLRPC_CONF),
597 KEY_SPTLRPC_CONF, 0, NULL, NULL);
599 class_decref(obd, __FUNCTION__, obd);
600 cfs_read_lock(&obd_dev_lock);
602 cfs_read_unlock(&obd_dev_lock);
605 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the caches set up by obd_init_caches(): obd_device_cachep,
 * obdo_cachep, import_cachep and capa_cachep. Each destroy is asserted to
 * succeed.
 * The assertion messages use the cache names passed to
 * cfs_mem_cache_create() in obd_init_caches().
 * NOTE(review): the guards around three of the destroy calls and two of the
 * pointer resets are on listing lines that are not visible here.
 */
607 void obd_cleanup_caches(void)
612 if (obd_device_cachep) {
613 rc = cfs_mem_cache_destroy(obd_device_cachep);
614 LASSERTF(rc == 0, "Cannot destroy ll_obd_dev_cache: rc %d\n", rc);
615 obd_device_cachep = NULL;
618 rc = cfs_mem_cache_destroy(obdo_cachep);
619 LASSERTF(rc == 0, "Cannot destroy ll_obdo_cache\n");
623 rc = cfs_mem_cache_destroy(import_cachep);
624 LASSERTF(rc == 0, "Cannot destroy ll_import_cache\n");
625 import_cachep = NULL;
628 rc = cfs_mem_cache_destroy(capa_cachep);
629 LASSERTF(rc == 0, "Cannot destroy capa_cache\n");
/*
 * Create the caches used by this file: "ll_obd_dev_cache" for obd_device,
 * "ll_obdo_cache" for obdo, "ll_import_cache" for obd_import and
 * "capa_cache" for obd_capa. Each cache pointer is asserted to be NULL
 * before it is created.
 * NOTE(review): most failure checks, the label in front of the
 * obd_cleanup_caches() call and the return statements are not visible in
 * this listing.
 */
635 int obd_init_caches(void)
639 LASSERT(obd_device_cachep == NULL);
640 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
641 sizeof(struct obd_device),
643 if (!obd_device_cachep)
646 LASSERT(obdo_cachep == NULL);
647 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
652 LASSERT(import_cachep == NULL);
653 import_cachep = cfs_mem_cache_create("ll_import_cache",
654 sizeof(struct obd_import),
659 LASSERT(capa_cachep == NULL);
660 capa_cachep = cfs_mem_cache_create("capa_cache",
661 sizeof(struct obd_capa), 0, 0);
/* presumably the failure path: tear down whatever was created -- confirm */
667 obd_cleanup_caches();
672 /* map connection to client */
/*
 * Resolve a connection handle to its export through
 * class_handle2object(conn->cookie).
 * A cookie of -1 is logged as a request for a new connection; a null handle
 * is logged as well.
 * NOTE(review): the conditions and return statements of the two special
 * cases are not visible in this listing.
 */
673 struct obd_export *class_conn2export(struct lustre_handle *conn)
675 struct obd_export *export;
679 CDEBUG(D_CACHE, "looking for null handle\n");
683 if (conn->cookie == -1) { /* this means assign a new connection */
684 CDEBUG(D_CACHE, "want a new connection\n");
688 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
689 export = class_handle2object(conn->cookie);
692 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd_device.
 * NOTE(review): the body of this function is not visible in this listing;
 * presumably it returns exp->exp_obd -- confirm against the full file.
 */
694 struct obd_device *class_exp2obd(struct obd_export *exp)
700 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve a connection handle to the obd_device of its export.
 * The export is looked up with class_conn2export(); its exp_obd is read and
 * the export is then released with class_export_put().
 * NOTE(review): the guard on a failed lookup and the return statements are
 * not visible in this listing.
 */
702 struct obd_device *class_conn2obd(struct lustre_handle *conn)
704 struct obd_export *export;
705 export = class_conn2export(conn);
707 struct obd_device *obd = export->exp_obd;
708 class_export_put(export);
713 EXPORT_SYMBOL(class_conn2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * NOTE(review): listing lines 718-719 are not visible; any NULL check on
 * the device would be there -- confirm.
 */
715 struct obd_import *class_exp2cliimp(struct obd_export *exp)
717 struct obd_device *obd = exp->exp_obd;
720 return obd->u.cli.cl_import;
722 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that
 * class_conn2obd() finds for \a conn.
 * NOTE(review): listing lines 727-728 are not visible; any NULL check on
 * the device would be there -- confirm.
 */
724 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
726 struct obd_device *obd = class_conn2obd(conn);
729 return obd->u.cli.cl_import;
731 EXPORT_SYMBOL(class_conn2cliimp);
733 /* Export management functions */
735 /* if export is involved in recovery then clean up related things */
/*
 * Undo the per-device recovery accounting of \a exp.
 * Under obd_recovery_task_lock: a delayed export decrements
 * obd_delayed_clients, and an export that is in recovery while the device
 * is recovering leaves recovery and decrements obd_connected_clients.
 * Afterwards the request-replay and lock-replay flags are cleared, each
 * decrementing its device-wide counter. Every flag is cleared under
 * exp_lock.
 */
736 void class_export_recovery_cleanup(struct obd_export *exp)
738 struct obd_device *obd = exp->exp_obd;
740 cfs_spin_lock(&obd->obd_recovery_task_lock);
741 if (exp->exp_delayed)
742 obd->obd_delayed_clients--;
743 if (obd->obd_recovering && exp->exp_in_recovery) {
744 cfs_spin_lock(&exp->exp_lock);
745 exp->exp_in_recovery = 0;
746 cfs_spin_unlock(&exp->exp_lock);
747 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
748 cfs_atomic_dec(&obd->obd_connected_clients);
750 cfs_spin_unlock(&obd->obd_recovery_task_lock);
751 /** Cleanup req replay fields */
752 if (exp->exp_req_replay_needed) {
753 cfs_spin_lock(&exp->exp_lock);
754 exp->exp_req_replay_needed = 0;
755 cfs_spin_unlock(&exp->exp_lock);
756 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
757 cfs_atomic_dec(&obd->obd_req_replay_clients);
759 /** Cleanup lock replay data */
760 if (exp->exp_lock_replay_needed) {
761 cfs_spin_lock(&exp->exp_lock);
762 exp->exp_lock_replay_needed = 0;
763 cfs_spin_unlock(&exp->exp_lock);
764 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
765 cfs_atomic_dec(&obd->obd_lock_replay_clients);
/*
 * Final teardown of an export whose reference count has reached zero.
 * Releases the connection (if any), asserts that the reply, replay and
 * high-priority RPC lists are empty, calls obd_destroy_export(), drops the
 * device reference tagged "export" and frees the export with OBD_FREE_RCU.
 */
769 static void class_export_destroy(struct obd_export *exp)
771 struct obd_device *obd = exp->exp_obd;
774 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
775 LASSERT(obd != NULL);
777 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
778 exp->exp_client_uuid.uuid, obd->obd_name);
781 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
782 if (exp->exp_connection)
783 ptlrpc_put_connection_superhack(exp->exp_connection);
785 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
786 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
787 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
788 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
789 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", export) in class_new_export() */
790 class_decref(obd, "export", exp);
792 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle callback for exports: takes an export reference through
 * class_export_get(). Installed as export_handle_ops.hop_addref. */
796 static void export_handle_addref(void *export)
798 class_export_get(export);
/* Handle operations passed to class_handle_hash() when an export is created
 * in class_new_export().
 * NOTE(review): further initializers, if any, are not visible here. */
801 static struct portals_handle_ops export_handle_ops = {
802 .hop_addref = export_handle_addref,
/*
 * Take a reference on \a exp by incrementing exp_refcount, and log the new
 * count.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably \a exp is returned -- confirm.
 */
806 struct obd_export *class_export_get(struct obd_export *exp)
808 cfs_atomic_inc(&exp->exp_refcount);
809 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
810 cfs_atomic_read(&exp->exp_refcount));
813 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp.
 * When the count reaches zero, the export's proc state is cleaned up, its
 * recovery accounting is undone, and the export is handed to
 * obd_zombie_export_add().
 */
815 void class_export_put(struct obd_export *exp)
817 LASSERT(exp != NULL);
818 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the count is logged before the decrement, hence the "- 1" */
819 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
820 cfs_atomic_read(&exp->exp_refcount) - 1);
822 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
823 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
824 CDEBUG(D_IOCTL, "final put %p/%s\n",
825 exp, exp->exp_client_uuid.uuid);
827 /* release nid stat reference */
828 lprocfs_exp_cleanup(exp);
829 class_export_recovery_cleanup(exp);
831 obd_zombie_export_add(exp);
834 EXPORT_SYMBOL(class_export_put);
836 /* Creates a new export, adds it to the hash table, and returns a
837 * pointer to it. The refcount is 2: one for the hash reference, and
838 * one for the pointer returned by this function. */
/*
 * Allocate and initialise an export of \a obd for client \a cluuid.
 *
 * The export starts with a reference count of 2 and is registered in the
 * handle hash. Unless \a cluuid equals the device's own uuid, it is also
 * added to the device's uuid hash, where a duplicate is refused with
 * -EALREADY. On success the export takes a device reference tagged
 * "export" and is linked into obd_exports and obd_exports_timed.
 *
 * obd_stopping is tested twice under obd_dev_lock (before and after the
 * uuid-hash insertion) because the lock is released in between.
 *
 * Allocation failure returns ERR_PTR(-ENOMEM). The visible tail unhashes
 * the handle, destroys and frees the export.
 * NOTE(review): the labels of that tail and the final return statements
 * are not visible in this listing; class_connect() decodes failures with
 * PTR_ERR(), so presumably ERR_PTR(rc) is returned -- confirm.
 */
839 struct obd_export *class_new_export(struct obd_device *obd,
840 struct obd_uuid *cluuid)
842 struct obd_export *export;
843 cfs_hash_t *hash = NULL;
847 OBD_ALLOC_PTR(export);
849 return ERR_PTR(-ENOMEM);
851 export->exp_conn_cnt = 0;
852 export->exp_lock_hash = NULL;
853 export->exp_flock_hash = NULL;
854 cfs_atomic_set(&export->exp_refcount, 2);
855 cfs_atomic_set(&export->exp_rpc_count, 0);
856 cfs_atomic_set(&export->exp_cb_count, 0);
857 cfs_atomic_set(&export->exp_locks_count, 0);
858 #if LUSTRE_TRACKS_LOCK_EXP_REFS
859 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
860 cfs_spin_lock_init(&export->exp_locks_list_guard);
862 cfs_atomic_set(&export->exp_replay_count, 0);
863 export->exp_obd = obd;
864 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
865 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
866 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
867 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
868 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
869 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
/* from here on the export can be found through its handle cookie */
870 class_handle_hash(&export->exp_handle, &export_handle_ops);
871 export->exp_last_request_time = cfs_time_current_sec();
872 cfs_spin_lock_init(&export->exp_lock);
873 cfs_spin_lock_init(&export->exp_rpc_lock);
874 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
875 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
876 cfs_spin_lock_init(&export->exp_bl_list_lock);
877 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
879 export->exp_sp_peer = LUSTRE_SP_ANY;
880 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881 export->exp_client_uuid = *cluuid;
882 obd_init_export(export);
884 cfs_spin_lock(&obd->obd_dev_lock);
885 /* shouldn't happen, but might race */
886 if (obd->obd_stopping)
887 GOTO(exit_unlock, rc = -ENODEV);
/* hold a reference on the uuid hash while the device lock is dropped */
889 hash = cfs_hash_getref(obd->obd_uuid_hash);
891 GOTO(exit_unlock, rc = -ENODEV);
892 cfs_spin_unlock(&obd->obd_dev_lock);
/* an export whose client uuid is the device's own uuid is not hashed */
894 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
897 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898 obd->obd_name, cluuid->uuid, rc);
899 GOTO(exit_err, rc = -EALREADY);
/* re-check obd_stopping: the lock was released during the hash insertion */
903 cfs_spin_lock(&obd->obd_dev_lock);
904 if (obd->obd_stopping) {
905 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
906 GOTO(exit_unlock, rc = -ENODEV);
909 class_incref(obd, "export", export);
910 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
911 cfs_list_add_tail(&export->exp_obd_chain_timed,
912 &export->exp_obd->obd_exports_timed);
913 export->exp_obd->obd_num_exports++;
914 cfs_spin_unlock(&obd->obd_dev_lock);
915 cfs_hash_putref(hash);
/* tail of the function: unlock, drop the hash reference, undo the handle
 * registration and free the export */
919 cfs_spin_unlock(&obd->obd_dev_lock);
922 cfs_hash_putref(hash);
923 class_handle_unhash(&export->exp_handle);
924 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
925 obd_destroy_export(export);
926 OBD_FREE_PTR(export);
929 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its device.
 * The handle is unhashed first. Then, under the device's obd_dev_lock, the
 * export is removed from the uuid hash (if hashed), moved to
 * obd_unlinked_exports, taken off the timed list, and obd_num_exports is
 * decremented. Finally one export reference is dropped.
 */
931 void class_unlink_export(struct obd_export *exp)
933 class_handle_unhash(&exp->exp_handle);
935 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
936 /* delete an uuid-export hashitem from hashtables */
937 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
938 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
939 &exp->exp_client_uuid,
940 &exp->exp_uuid_hash);
942 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
943 cfs_list_del_init(&exp->exp_obd_chain_timed);
944 exp->exp_obd->obd_num_exports--;
945 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
946 class_export_put(exp);
948 EXPORT_SYMBOL(class_unlink_export);
950 /* Import management functions */
/*
 * Final teardown of an import whose reference count has reached zero.
 * Releases the import's connection and every connection on imp_conn_list
 * (freeing each list entry), asserts that imp_sec is already NULL, drops
 * the device reference tagged "import" and frees the import with
 * OBD_FREE_RCU.
 */
951 void class_import_destroy(struct obd_import *imp)
955 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
956 imp->imp_obd->obd_name);
958 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
960 ptlrpc_put_connection_superhack(imp->imp_connection);
/* drain the connection list from its head */
962 while (!cfs_list_empty(&imp->imp_conn_list)) {
963 struct obd_import_conn *imp_conn;
965 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
966 struct obd_import_conn, oic_item);
967 cfs_list_del_init(&imp_conn->oic_item);
968 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
969 OBD_FREE(imp_conn, sizeof(*imp_conn));
972 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", imp) in class_new_import() */
973 class_decref(imp->imp_obd, "import", imp);
974 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle callback for imports: takes an import reference through
 * class_import_get(). Installed as import_handle_ops.hop_addref. */
978 static void import_handle_addref(void *import)
980 class_import_get(import);
/* Handle operations passed to class_handle_hash() when an import is created
 * in class_new_import().
 * NOTE(review): further initializers, if any, are not visible here. */
983 static struct portals_handle_ops import_handle_ops = {
984 .hop_addref = import_handle_addref,
/*
 * Take a reference on \a import by incrementing imp_refcount, and log the
 * new count together with the device name.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably \a import is returned -- confirm.
 */
988 struct obd_import *class_import_get(struct obd_import *import)
990 cfs_atomic_inc(&import->imp_refcount);
991 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
992 cfs_atomic_read(&import->imp_refcount),
993 import->imp_obd->obd_name);
996 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp.
 * The import must not already be on a zombie list. When the count reaches
 * zero the import is handed to obd_zombie_import_add().
 */
998 void class_import_put(struct obd_import *imp)
1002 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1003 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the count is logged before the decrement, hence the "- 1" */
1005 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1006 cfs_atomic_read(&imp->imp_refcount) - 1,
1007 imp->imp_obd->obd_name);
1009 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1010 CDEBUG(D_INFO, "final put import %p\n", imp);
1011 obd_zombie_import_add(imp);
1016 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate, and one service estimate per portal (IMP_AT_MAX_PORTALS of
 * them) seeded with INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is on a
 * listing line that is not visible here.
 */
1018 static void init_imp_at(struct imp_at *at) {
1020 at_init(&at->iat_net_latency, 0, 0);
1021 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1022 /* max service estimates are tracked on the server side, so
1023 don't use the AT history here, just use the last reported
1024 val. (But keep hist for proc histogram, worst_ever) */
1025 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise an import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with a reference count of 2,
 * holds a device reference tagged "import", is registered in the handle
 * hash and uses LUSTRE_MSG_MAGIC_V2 as its default message magic.
 * NOTE(review): the allocation-failure check and the return statements are
 * not visible in this listing.
 */
1030 struct obd_import *class_new_import(struct obd_device *obd)
1032 struct obd_import *imp;
1034 OBD_ALLOC(imp, sizeof(*imp));
1038 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1039 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1040 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1041 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1042 cfs_spin_lock_init(&imp->imp_lock);
1043 imp->imp_last_success_conn = 0;
1044 imp->imp_state = LUSTRE_IMP_NEW;
/* released again by class_decref() in class_import_destroy() */
1045 imp->imp_obd = class_incref(obd, "import", imp);
1046 cfs_mutex_init(&imp->imp_sec_mutex);
1047 cfs_waitq_init(&imp->imp_recovery_waitq);
1049 cfs_atomic_set(&imp->imp_refcount, 2);
1050 cfs_atomic_set(&imp->imp_unregistering, 0);
1051 cfs_atomic_set(&imp->imp_inflight, 0);
1052 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1053 cfs_atomic_set(&imp->imp_inval_count, 0);
1054 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1055 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1056 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1057 init_imp_at(&imp->imp_at);
1059 /* the default magic is V2, will be used in connect RPC, and
1060 * then adjusted according to the flags in request/reply. */
1061 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1065 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down \a import: remove it from the handle hash, bump
 * imp_generation under imp_lock, and drop one import reference.
 */
1067 void class_destroy_import(struct obd_import *import)
1069 LASSERT(import != NULL);
1070 LASSERT(import != LP_POISON);
1072 class_handle_unhash(&import->imp_handle);
1074 cfs_spin_lock(&import->imp_lock);
1075 import->imp_generation++;
1076 cfs_spin_unlock(&import->imp_lock);
1077 class_import_put(import);
1079 EXPORT_SYMBOL(class_destroy_import);
1081 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Record that \a lock holds one more reference on \a exp (only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set).
 * Runs under exp_locks_list_guard. A lock already tracked against a
 * different export triggers a console warning. On the first reference the
 * lock is linked into exp_locks_list and its target is set to \a exp.
 */
1083 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1085 cfs_spin_lock(&exp->exp_locks_list_guard);
1087 LASSERT(lock->l_exp_refs_nr >= 0);
1089 if (lock->l_exp_refs_target != NULL &&
1090 lock->l_exp_refs_target != exp) {
1091 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1092 exp, lock, lock->l_exp_refs_target);
/* post-increment: the branch is taken when the count was zero before */
1094 if ((lock->l_exp_refs_nr ++) == 0) {
1095 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1096 lock->l_exp_refs_target = exp;
1098 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1099 lock, exp, lock->l_exp_refs_nr);
1100 cfs_spin_unlock(&exp->exp_locks_list_guard);
1102 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Drop one tracked reference of \a lock on \a exp; counterpart of
 * __class_export_add_lock_ref().
 * Runs under exp_locks_list_guard. A target that differs from \a exp
 * triggers a console warning. When the count reaches zero the lock is
 * unlinked and its target is cleared.
 */
1104 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1106 cfs_spin_lock(&exp->exp_locks_list_guard);
1107 LASSERT(lock->l_exp_refs_nr > 0);
1108 if (lock->l_exp_refs_target != exp) {
1109 LCONSOLE_WARN("lock %p, "
1110 "mismatching export pointers: %p, %p\n",
1111 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the branch is taken when the last reference goes away */
1113 if (-- lock->l_exp_refs_nr == 0) {
1114 cfs_list_del_init(&lock->l_exp_refs_link);
1115 lock->l_exp_refs_target = NULL;
1117 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1118 lock, exp, lock->l_exp_refs_nr);
1119 cfs_spin_unlock(&exp->exp_locks_list_guard);
1121 EXPORT_SYMBOL(__class_export_del_lock_ref);
1124 /* A connection defines an export context in which preallocation can
1125 be managed. This releases the export pointer reference, and returns
1126 the export handle, so the export refcount is 1 when this function
/*
 * Create an export of \a obd for client \a cluuid and return its handle
 * cookie in \a conn.
 * The export comes from class_new_export(); a failure there is returned as
 * PTR_ERR(export). Once the cookie has been copied out, the pointer
 * reference obtained from class_new_export() is dropped.
 * NOTE(review): the failure test and the success return are on listing
 * lines that are not visible here.
 */
1128 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1129 struct obd_uuid *cluuid)
1131 struct obd_export *export;
1132 LASSERT(conn != NULL);
1133 LASSERT(obd != NULL);
1134 LASSERT(cluuid != NULL);
1137 export = class_new_export(obd, cluuid);
1139 RETURN(PTR_ERR(export));
1141 conn->cookie = export->exp_handle.h_cookie;
1142 class_export_put(export);
1144 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1145 cluuid->uuid, conn->cookie);
1148 EXPORT_SYMBOL(class_connect);
1151 /* This function removes 1-3 references from the export:
1152 * 1 - for the export pointer passed in,
1153 * and, if a disconnect is really needed,
1154 * 2 - for removing it from the hash
1155 * 3 - in class_unlink_export
1156 * The export pointer passed to this function can be destroyed */
/*
 * Disconnect \a export from its device.
 * exp_disconnected is tested and set under exp_lock, so only the first
 * caller performs the teardown: removal from the device's nid hash (if
 * hashed) followed by class_unlink_export(). The visible
 * class_export_put() releases the reference passed in by the caller.
 * NOTE(review): the no_disconn label and the return statements are not
 * visible in this listing.
 */
1157 int class_disconnect(struct obd_export *export)
1159 int already_disconnected;
1162 if (export == NULL) {
1163 CWARN("attempting to free NULL export %p\n", export);
1167 cfs_spin_lock(&export->exp_lock);
1168 already_disconnected = export->exp_disconnected;
1169 export->exp_disconnected = 1;
1170 cfs_spin_unlock(&export->exp_lock);
1172 /* class_cleanup(), abort_recovery(), and class_fail_export()
1173 * all end up in here, and if any of them race we shouldn't
1174 * call extra class_export_puts(). */
1175 if (already_disconnected) {
1176 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1177 GOTO(no_disconn, already_disconnected);
1180 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1181 export->exp_handle.h_cookie);
/* exp_connection is dereferenced here whenever the export is nid-hashed */
1183 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1184 cfs_hash_del(export->exp_obd->obd_nid_hash,
1185 &export->exp_connection->c_peer.nid,
1186 &export->exp_nid_hash);
1188 class_unlink_export(export);
1190 class_export_put(export);
1193 EXPORT_SYMBOL(class_disconnect);
1195 /* Return non-zero for a fully connected export */
/*
 * Report whether \a exp has a positive exp_conn_cnt, sampled under
 * exp_lock.
 * NOTE(review): the declaration of connected, any guard on \a exp and the
 * return statement are not visible in this listing.
 */
1196 int class_connected_export(struct obd_export *exp)
1200 cfs_spin_lock(&exp->exp_lock);
1201 connected = (exp->exp_conn_cnt > 0);
1202 cfs_spin_unlock(&exp->exp_lock);
1207 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, stamping each with \a flags first.
 * The list is consumed from its head until it is empty. An export whose
 * client uuid equals its device's uuid is not disconnected: it is only
 * unlinked from the list. Every other export is passed to
 * obd_disconnect().
 */
1209 static void class_disconnect_export_list(cfs_list_t *list,
1210 enum obd_option flags)
1213 struct obd_export *exp;
1216 /* It's possible that an export may disconnect itself, but
1217 * nothing else will be added to this list. */
1218 while (!cfs_list_empty(list)) {
1219 exp = cfs_list_entry(list->next, struct obd_export,
1221 /* need for safe call CDEBUG after obd_disconnect */
1222 class_export_get(exp);
1224 cfs_spin_lock(&exp->exp_lock);
1225 exp->exp_flags = flags;
1226 cfs_spin_unlock(&exp->exp_lock);
1228 if (obd_uuid_equals(&exp->exp_client_uuid,
1229 &exp->exp_obd->obd_uuid)) {
1231 "exp %p export uuid == obd uuid, don't discon\n",
1233 /* Need to delete this now so we don't end up pointing
1234 * to work_list later when this export is cleaned up. */
1235 cfs_list_del_init(&exp->exp_obd_chain);
1236 class_export_put(exp);
/* second reference: taken before handing the export to obd_disconnect() */
1240 class_export_get(exp);
1241 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1242 "last request at "CFS_TIME_T"\n",
1243 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1244 exp, exp->exp_last_request_time);
1245 /* release one export reference anyway */
1246 rc = obd_disconnect(exp);
1248 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1249 obd_export_nid2str(exp), exp, rc);
/* drops the reference taken at the top of the loop iteration */
1250 class_export_put(exp);
/*
 * Disconnect all exports of \a obd.
 * Under obd_dev_lock, obd_exports and obd_delayed_exports are spliced onto
 * a private work list, which is then processed by
 * class_disconnect_export_list() with the flags from exp_flags_from_obd().
 */
1255 void class_disconnect_exports(struct obd_device *obd)
1257 cfs_list_t work_list;
1260 /* Move all of the exports from obd_exports to a work list, en masse. */
1261 CFS_INIT_LIST_HEAD(&work_list);
1262 cfs_spin_lock(&obd->obd_dev_lock);
1263 cfs_list_splice_init(&obd->obd_exports, &work_list);
1264 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1265 cfs_spin_unlock(&obd->obd_dev_lock);
1267 if (!cfs_list_empty(&work_list)) {
1268 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1269 "disconnecting them\n", obd->obd_minor, obd);
1270 class_disconnect_export_list(&work_list,
1271 exp_flags_from_obd(obd));
1273 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1274 obd->obd_minor, obd);
1277 EXPORT_SYMBOL(class_disconnect_exports);
1279 /* Remove exports that have not completed recovery.
 *
 * With obd->obd_dev_lock held, walks obd->obd_exports and consults the
 * test_export() callback for every export whose client uuid differs from
 * the device's own uuid.  Exports that get selected are flagged
 * exp_failed under exp_lock (the previous value is remembered in
 * "failed"), moved to a private work list and logged as "EVICTING".
 * Once the device lock is dropped, the number of evicted clients is
 * reported on the console, added to obd->obd_stale_clients, and the work
 * list is disconnected with OBD_OPT_ABORT_RECOV or-ed into the flags from
 * exp_flags_from_obd(obd).
 *
 * NOTE(review): the statements that follow each test (presumably
 * "continue"), the use of "failed" and the update of "evicted" are not
 * visible in this excerpt, so the exact selection rule has to be
 * confirmed in the full file.
1281 void class_disconnect_stale_exports(struct obd_device *obd,
1282 int (*test_export)(struct obd_export *))
1284 cfs_list_t work_list;
1285 cfs_list_t *pos, *n;
1286 struct obd_export *exp;
1290 CFS_INIT_LIST_HEAD(&work_list);
1291 cfs_spin_lock(&obd->obd_dev_lock);
1292 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1295 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1297 /* don't count self-export as client */
1298 if (obd_uuid_equals(&exp->exp_client_uuid,
1299 &exp->exp_obd->obd_uuid))
1302 if (test_export(exp))
1305 cfs_spin_lock(&exp->exp_lock);
1306 failed = exp->exp_failed;
1307 exp->exp_failed = 1;
1308 cfs_spin_unlock(&exp->exp_lock);
1312 cfs_list_move(&exp->exp_obd_chain, &work_list);
1314 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1315 obd->obd_name, exp->exp_client_uuid.uuid,
1316 exp->exp_connection == NULL ? "<unknown>" :
1317 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1318 print_export_data(exp, "EVICTING", 0);
1320 cfs_spin_unlock(&obd->obd_dev_lock);
1323 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1324 obd->obd_name, evicted);
1325 obd->obd_stale_clients += evicted;
1327 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1328 OBD_OPT_ABORT_RECOV);
1331 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp->exp_failed is set to 1 under exp->exp_lock and its previous value is
 * kept in "already_failed"; an export that had already failed is only
 * logged ("skipping").  Otherwise the debug log is dumped when
 * obd_dump_on_timeout is set, two references are taken (one to keep exp
 * valid for the logging after the call, one for obd_disconnect() itself)
 * and obd_disconnect() is invoked.
 *
 * NOTE(review): the early return after the "skipping" message and the
 * if/else around the CERROR and CDEBUG result messages are not visible in
 * this excerpt.
 */
1333 void class_fail_export(struct obd_export *exp)
1335 int rc, already_failed;
1337 cfs_spin_lock(&exp->exp_lock);
1338 already_failed = exp->exp_failed;
1339 exp->exp_failed = 1;
1340 cfs_spin_unlock(&exp->exp_lock);
1342 if (already_failed) {
1343 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1344 exp, exp->exp_client_uuid.uuid);
1348 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1349 exp, exp->exp_client_uuid.uuid);
1351 if (obd_dump_on_timeout)
1352 libcfs_debug_dumplog();
1354 /* extra reference so exp is still valid for the CDEBUG after obd_disconnect */
1355 class_export_get(exp);
1357 /* Most callers into obd_disconnect are removing their own reference
1358 * (request, for example) in addition to the one from the hash table.
1359 * We don't have such a reference here, so make one. */
1360 class_export_get(exp);
1361 rc = obd_disconnect(exp);
1363 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1365 CDEBUG(D_HA, "disconnected export %p/%s\n",
1366 exp, exp->exp_client_uuid.uuid);
/* drop the logging reference taken before obd_disconnect() */
1367 class_export_put(exp);
1369 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable form of the peer NID of @exp.
 *
 * When exp->exp_connection is set, the string comes from libcfs_nid2str()
 * applied to the connection's c_peer.nid.
 *
 * NOTE(review): the value returned for an export without a connection is
 * not visible in this excerpt.
 */
1371 char *obd_export_nid2str(struct obd_export *exp)
1373 if (exp->exp_connection != NULL)
1374 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1378 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict the export(s) of @obd that are hashed under the NID given as text.
 *
 * @obd: device whose obd_nid_hash is searched.
 * @nid: NID in string form; converted with libcfs_str2nid().
 *
 * A matching export is checked with two assertions (its connection NID
 * must equal the key, and it must not be the device's self export), then
 * failed through class_fail_export() and released with class_export_put()
 * (presumably dropping the reference returned by the hash lookup - TODO
 * confirm).  The function returns exports_evicted and logs a debug message
 * when that count is zero.
 *
 * NOTE(review): the loop construct around the lookup, the action taken when
 * the lookup returns NULL and the increment of exports_evicted are not
 * visible in this excerpt.
 */
1380 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1382 struct obd_export *doomed_exp = NULL;
1383 int exports_evicted = 0;
1385 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1388 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1389 if (doomed_exp == NULL)
1392 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1393 "nid %s found, wanted nid %s, requested nid %s\n",
1394 obd_export_nid2str(doomed_exp),
1395 libcfs_nid2str(nid_key), nid);
1396 LASSERTF(doomed_exp != obd->obd_self_export,
1397 "self-export is hashed by NID?\n");
1399 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1400 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1402 class_fail_export(doomed_exp);
1403 class_export_put(doomed_exp);
1406 if (!exports_evicted)
1407 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1408 obd->obd_name, nid);
1409 return exports_evicted;
1411 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client uuid matches @uuid.
 *
 * @obd:  device whose obd_uuid_hash is searched.
 * @uuid: client uuid in string form; converted with obd_str2uuid().
 *
 * A request naming the device's own uuid is rejected with an error message
 * and the function returns exports_evicted unchanged (0).  Otherwise the
 * export found in the hash is failed through class_fail_export() and
 * released with class_export_put(); a failed lookup is reported with
 * CERROR.  The function returns exports_evicted.
 *
 * NOTE(review): the "else" between the not-found and found cases and the
 * increment of exports_evicted are not visible in this excerpt.
 */
1413 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1415 struct obd_export *doomed_exp = NULL;
1416 struct obd_uuid doomed_uuid;
1417 int exports_evicted = 0;
1419 obd_str2uuid(&doomed_uuid, uuid);
1420 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1421 CERROR("%s: can't evict myself\n", obd->obd_name);
1422 return exports_evicted;
1425 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1427 if (doomed_exp == NULL) {
1428 CERROR("%s: can't disconnect %s: no exports found\n",
1429 obd->obd_name, uuid);
1431 CWARN("%s: evicting %s at adminstrative request\n",
1432 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1433 class_fail_export(doomed_exp);
1434 class_export_put(doomed_exp);
1438 return exports_evicted;
1440 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Optional per-export dump callback, NULL by default.  It is exported so
 * that other code can install a handler; print_export_data() calls it
 * when its caller asks for lock information and the pointer is set.
 * Only built when LUSTRE_TRACKS_LOCK_EXP_REFS is enabled.
 */
1442 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1443 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1444 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one D_HA debug line describing @exp.
 *
 * The line carries the device name, @status, the export pointer, the
 * client uuid, the NID string, the reference count, the rpc/cb/locks
 * counters, the exp_disconnected/exp_delayed/exp_failed flags, the reply
 * count with the first reply pointer ("..." is appended when there are
 * more than three replies) and exp_last_committed.
 *
 * exp->exp_outstanding_replies is walked under exp->exp_lock.
 * With LUSTRE_TRACKS_LOCK_EXP_REFS, class_export_dump_hook is additionally
 * invoked when "locks" is non-zero and a hook is installed.
 *
 * NOTE(review): the remaining parameter ("locks"), the declaration of
 * nreplies and the body of the reply loop are not visible in this excerpt.
 */
1447 static void print_export_data(struct obd_export *exp, const char *status,
1450 struct ptlrpc_reply_state *rs;
1451 struct ptlrpc_reply_state *first_reply = NULL;
1454 cfs_spin_lock(&exp->exp_lock);
1455 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1461 cfs_spin_unlock(&exp->exp_lock);
1463 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1464 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1465 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1466 cfs_atomic_read(&exp->exp_rpc_count),
1467 cfs_atomic_read(&exp->exp_cb_count),
1468 cfs_atomic_read(&exp->exp_locks_count),
1469 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1470 nreplies, first_reply, nreplies > 3 ? "..." : "",
1471 exp->exp_last_committed);
1472 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1473 if (locks && class_export_dump_hook != NULL)
1474 class_export_dump_hook(exp);
/*
 * Log every export known for @obd through print_export_data().
 *
 * @obd:   device whose export lists are dumped.
 * @locks: forwarded unchanged to print_export_data().
 *
 * The device's obd_exports, obd_unlinked_exports and obd_delayed_exports
 * lists are reported as "ACTIVE", "UNLINKED" and "DELAYED" while
 * obd->obd_dev_lock is held.  Afterwards the global obd_zombie_exports list
 * is reported as "ZOMBIE" under obd_zombie_impexp_lock; that walk applies
 * no filter on @obd, so zombies of other devices are printed as well.
 */
1478 void dump_exports(struct obd_device *obd, int locks)
1480 struct obd_export *exp;
1482 cfs_spin_lock(&obd->obd_dev_lock);
1483 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1484 print_export_data(exp, "ACTIVE", locks);
1485 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1486 print_export_data(exp, "UNLINKED", locks);
1487 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1488 print_export_data(exp, "DELAYED", locks);
1489 cfs_spin_unlock(&obd->obd_dev_lock);
1490 cfs_spin_lock(&obd_zombie_impexp_lock);
1491 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1492 print_export_data(exp, "ZOMBIE", locks);
1493 cfs_spin_unlock(&obd_zombie_impexp_lock);
1495 EXPORT_SYMBOL(dump_exports);
/*
 * Block until obd->obd_unlinked_exports is empty.
 *
 * The caller must already have emptied obd->obd_exports (asserted).  The
 * unlinked list is tested under obd->obd_dev_lock; while it is non-empty
 * the lock is dropped and the task sleeps uninterruptibly for "waited"
 * seconds.  Whenever "waited" is above 5 and a power of two, a console
 * warning with the device refcount is printed and all exports are dumped.
 *
 * NOTE(review): the declaration and the update of "waited" are not visible
 * in this excerpt.
 */
1497 void obd_exports_barrier(struct obd_device *obd)
1500 LASSERT(cfs_list_empty(&obd->obd_exports));
1501 cfs_spin_lock(&obd->obd_dev_lock);
1502 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1503 cfs_spin_unlock(&obd->obd_dev_lock);
1504 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1505 cfs_time_seconds(waited));
1506 if (waited > 5 && IS_PO2(waited)) {
1507 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1508 "more than %d seconds. "
1509 "The obd refcount = %d. Is it stuck?\n",
1510 obd->obd_name, waited,
1511 cfs_atomic_read(&obd->obd_refcount));
1512 dump_exports(obd, 1);
/* retake the device lock before the list is tested again */
1515 cfs_spin_lock(&obd->obd_dev_lock);
1517 cfs_spin_unlock(&obd->obd_dev_lock);
1519 EXPORT_SYMBOL(obd_exports_barrier);
1521 /* Total amount of zombies to be destroyed */
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(); the places that modify it are not visible in this
 * excerpt. */
1522 static int zombies_count = 0;
/*
 * Destroy queued zombie imports and exports.
 *
 * Each pass takes obd_zombie_impexp_lock, unlinks the first entry of
 * obd_zombie_imports and of obd_zombie_exports (when present), drops the
 * lock and then destroys the unlinked objects with class_import_destroy()
 * and class_export_destroy().  The lock is briefly retaken after each
 * destroy.  Passes repeat while either an import or an export was found.
 *
 * NOTE(review): the statements that reset import/export at the start of a
 * pass and whatever is done while the lock is retaken after a destroy
 * (presumably the zombies_count update) are not visible in this excerpt.
 */
1525 * kill zombie imports and exports
1527 void obd_zombie_impexp_cull(void)
1529 struct obd_import *import;
1530 struct obd_export *export;
1534 cfs_spin_lock(&obd_zombie_impexp_lock);
1537 if (!cfs_list_empty(&obd_zombie_imports)) {
1538 import = cfs_list_entry(obd_zombie_imports.next,
1541 cfs_list_del_init(&import->imp_zombie_chain);
1545 if (!cfs_list_empty(&obd_zombie_exports)) {
1546 export = cfs_list_entry(obd_zombie_exports.next,
1549 cfs_list_del_init(&export->exp_obd_chain);
1552 cfs_spin_unlock(&obd_zombie_impexp_lock);
1554 if (import != NULL) {
1555 class_import_destroy(import);
1556 cfs_spin_lock(&obd_zombie_impexp_lock);
1558 cfs_spin_unlock(&obd_zombie_impexp_lock);
1561 if (export != NULL) {
1562 class_export_destroy(export);
1563 cfs_spin_lock(&obd_zombie_impexp_lock);
1565 cfs_spin_unlock(&obd_zombie_impexp_lock);
1569 } while (import != NULL || export != NULL);
/* Completed by obd_zombie_impexp_thread() once it has started (or failed
 * to daemonize); obd_zombie_impexp_init() waits on it. */
1573 static cfs_completion_t obd_zombie_start;
/* Completed by obd_zombie_impexp_thread() when its main loop has ended;
 * obd_zombie_impexp_stop() waits on it. */
1574 static cfs_completion_t obd_zombie_stop;
/* Flag word tested and set with OBD_ZOMBIE_STOP. */
1575 static unsigned long obd_zombie_flags;
/* Wait queue shared by the zombie thread and obd_zombie_barrier() callers. */
1576 static cfs_waitq_t obd_zombie_waitq;
/* Pid recorded by the zombie thread so obd_zombie_barrier() can avoid
 * waiting on itself. */
1577 static pid_t obd_zombie_pid;
/* NOTE(review): this constant has the value 2 and is passed as the first
 * argument of cfs_set_bit()/cfs_test_bit(); if those take a bit number, as
 * the Linux bitops do, the stop flag lives in bit 2 rather than bit 1.
 * Set and test agree, so behaviour is consistent - confirm intent. */
1580 OBD_ZOMBIE_STOP = 1 << 1
/*
 * Wait condition helper for the zombie thread.
 *
 * Under obd_zombie_impexp_lock, rc is computed as non-zero only when there
 * are no zombies pending and the OBD_ZOMBIE_STOP flag is clear, i.e. when
 * the thread has nothing to do.  obd_zombie_impexp_thread() sleeps until
 * the negation of this value becomes true.  @arg is not used by the visible
 * statements.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this excerpt.
 */
1584 * check for work for kill zombie import/export thread.
1586 static int obd_zombie_impexp_check(void *arg)
1590 cfs_spin_lock(&obd_zombie_impexp_lock);
1591 rc = (zombies_count == 0) &&
1592 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1593 cfs_spin_unlock(&obd_zombie_impexp_lock);
1599 * Add export to the obd_zombie thread and notify it.
/*
 * Hand @exp over for deferred destruction.
 *
 * The export must still be linked through exp_obd_chain (asserted); it is
 * unlinked from its device list under the device's obd_dev_lock, then
 * queued on the global obd_zombie_exports list under
 * obd_zombie_impexp_lock, and finally the waiters are woken through
 * obd_zombie_impexp_notify().
 *
 * NOTE(review): one statement inside the zombie-lock section (presumably
 * the zombies_count update) is not visible in this excerpt.
 */
1601 static void obd_zombie_export_add(struct obd_export *exp) {
1602 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1603 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1604 cfs_list_del_init(&exp->exp_obd_chain);
1605 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1606 cfs_spin_lock(&obd_zombie_impexp_lock);
1608 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1609 cfs_spin_unlock(&obd_zombie_impexp_lock);
1611 obd_zombie_impexp_notify();
1615 * Add import to the obd_zombie thread and notify it.
/*
 * Hand @imp over for deferred destruction.
 *
 * The import must no longer own a security context (imp_sec) or a request
 * pool (imp_rq_pool), and must not already be on a zombie list; all three
 * conditions are asserted.  It is queued on obd_zombie_imports under
 * obd_zombie_impexp_lock and the waiters are woken through
 * obd_zombie_impexp_notify().
 *
 * NOTE(review): one statement inside the locked section (presumably the
 * zombies_count update) is not visible in this excerpt.
 */
1617 static void obd_zombie_import_add(struct obd_import *imp) {
1618 LASSERT(imp->imp_sec == NULL);
1619 LASSERT(imp->imp_rq_pool == NULL);
1620 cfs_spin_lock(&obd_zombie_impexp_lock);
1621 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1623 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1624 cfs_spin_unlock(&obd_zombie_impexp_lock);
1626 obd_zombie_impexp_notify();
/*
 * Wake everything sleeping on obd_zombie_waitq.
 *
 * A broadcast is used rather than a single wake-up: the queue is shared by
 * the zombie thread and by obd_zombie_barrier() callers, and a wake-up
 * consumed by a barrier waiter alone would leave the thread asleep.
 */
1630 * notify import/export destroy thread about new zombie.
1632 static void obd_zombie_impexp_notify(void)
1635 * Make sure obd_zomebie_impexp_thread get this notification.
1636 * It is possible this signal only get by obd_zombie_barrier, and
1637 * barrier gulps this notification and sleeps away and hangs ensues
1639 cfs_waitq_broadcast(&obd_zombie_waitq);
/*
 * Wait condition helper for obd_zombie_barrier().
 *
 * Asserts that the OBD_ZOMBIE_STOP flag is not set, then samples under
 * obd_zombie_impexp_lock whether zombies_count is zero.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this excerpt.
 */
1643 * check whether obd_zombie is idle
1645 static int obd_zombie_is_idle(void)
1649 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1650 cfs_spin_lock(&obd_zombie_impexp_lock);
1651 rc = (zombies_count == 0);
1652 cfs_spin_unlock(&obd_zombie_impexp_lock);
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies are pending.  The wait uses a zero-initialised l_wait_info.
 *
 * The caller's pid is compared with obd_zombie_pid so that the zombie
 * thread does not wait for itself.
 *
 * NOTE(review): the statement executed when the pids match (presumably a
 * return) is not visible in this excerpt.
 */
1657 * wait when obd_zombie import/export queues become empty
1659 void obd_zombie_barrier(void)
1661 struct l_wait_info lwi = { 0 };
1663 if (obd_zombie_pid == cfs_curproc_pid())
1664 /* don't wait for myself */
1666 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1668 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Body of the "obd_zombid" thread that destroys zombie imports/exports.
 *
 * obd_zombie_start is completed both when cfs_daemonize_ctxt() fails and
 * when it succeeds, so the starter is released in either case.  On success
 * the thread records its pid in obd_zombie_pid and loops until the
 * OBD_ZOMBIE_STOP flag is set: it sleeps until obd_zombie_impexp_check()
 * returns zero, culls the zombie lists, and signals obd_zombie_waitq for
 * the benefit of obd_zombie_barrier() callers.  obd_zombie_stop is
 * completed after the loop.
 *
 * @unused is not referenced by the visible statements.
 *
 * NOTE(review): the declaration of rc and the return statements are not
 * visible in this excerpt.
 */
1673 * destroy zombie export/import thread.
1675 static int obd_zombie_impexp_thread(void *unused)
1679 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1680 cfs_complete(&obd_zombie_start);
1684 cfs_complete(&obd_zombie_start);
1686 obd_zombie_pid = cfs_curproc_pid();
1688 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1689 struct l_wait_info lwi = { 0 };
1691 l_wait_event(obd_zombie_waitq,
1692 !obd_zombie_impexp_check(NULL), &lwi);
1693 obd_zombie_impexp_cull();
1696 * Notify obd_zombie_barrier callers that queues
1699 cfs_waitq_signal(&obd_zombie_waitq);
1702 cfs_complete(&obd_zombie_stop);
1707 #else /* ! KERNEL */
/* Entry counter used by obd_zombie_impexp_kill() to detect nested calls. */
1709 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
/* Handles returned by the liblustre callback registration in
 * obd_zombie_impexp_init(); passed back on deregistration in
 * obd_zombie_impexp_stop(). */
1710 static void *obd_zombie_impexp_work_cb;
1711 static void *obd_zombie_impexp_idle_cb;
/*
 * Callback that culls the zombie lists; it is registered in
 * obd_zombie_impexp_init() as the "obd_zombi_impexp_kill" wait callback.
 *
 * zombie_recur is incremented on entry and decremented afterwards; only the
 * caller that raises it to 1 runs obd_zombie_impexp_cull(), so a nested
 * invocation does not cull again.  @arg is not used by the visible
 * statements.
 *
 * NOTE(review): the declaration of the result variable and the return
 * statement are not visible in this excerpt.
 */
1713 int obd_zombie_impexp_kill(void *arg)
1717 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1718 obd_zombie_impexp_cull();
1721 cfs_atomic_dec(&zombie_recur);
/*
 * Set up zombie import/export handling.
 *
 * Initialises the two zombie lists, obd_zombie_impexp_lock, the start/stop
 * completions and the wait queue.  Two start-up paths are visible: one
 * creates obd_zombie_impexp_thread() and waits for obd_zombie_start, the
 * other registers obd_zombie_impexp_kill() and obd_zombie_impexp_check()
 * as liblustre wait/idle callbacks.
 *
 * NOTE(review): the preprocessor conditionals that select between the two
 * paths, the handling of the cfs_create_thread() result and the return
 * statement are not visible in this excerpt.
 */
1728 * start destroy zombie import/export thread
1730 int obd_zombie_impexp_init(void)
1734 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1735 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1736 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1737 cfs_init_completion(&obd_zombie_start);
1738 cfs_init_completion(&obd_zombie_stop);
1739 cfs_waitq_init(&obd_zombie_waitq);
1743 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1747 cfs_wait_for_completion(&obd_zombie_start);
1750 obd_zombie_impexp_work_cb =
1751 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1752 &obd_zombie_impexp_kill, NULL);
1754 obd_zombie_impexp_idle_cb =
1755 liblustre_register_idle_callback("obd_zombi_impexp_check",
1756 &obd_zombie_impexp_check, NULL);
/*
 * Shut down zombie import/export handling.
 *
 * Sets the OBD_ZOMBIE_STOP flag and wakes the waiters so the zombie thread
 * notices it.  Two shutdown paths are visible: one waits for
 * obd_zombie_stop to be completed by the thread, the other deregisters the
 * liblustre wait/idle callbacks.
 *
 * NOTE(review): the preprocessor conditionals that select between the two
 * paths are not visible in this excerpt.
 */
1762 * stop destroy zombie import/export thread
1764 void obd_zombie_impexp_stop(void)
1766 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1767 obd_zombie_impexp_notify();
1769 cfs_wait_for_completion(&obd_zombie_stop);
1771 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1772 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1776 /***** Kernel-userspace comm helpers *******/
1778 /* Get length of entire message, including header */
/* @param payload_len  Size of the payload area in bytes
 * @returns sizeof(struct kuc_hdr) + payload_len, converted to int.
 * NOTE(review): payload_len is not range-checked here; a negative or very
 * large value is passed through the addition unchanged. */
1779 int kuc_len(int payload_len)
1781 return sizeof(struct kuc_hdr) + payload_len;
1783 EXPORT_SYMBOL(kuc_len);
1785 /* Get a pointer to kuc header, given a ptr to the payload
1786 * @param p Pointer to payload area
1787 * @returns Pointer to kuc header
 *
 * The header is located by stepping back one struct kuc_hdr from p, so p
 * must point directly behind a header (as the pointer handed out by
 * kuc_alloc() does).  The header's kuc_magic field is asserted to equal
 * KUC_MAGIC.
 * NOTE(review): the return statement is not visible in this excerpt.
1789 struct kuc_hdr * kuc_ptr(void *p)
1791 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1792 LASSERT(lh->kuc_magic == KUC_MAGIC);
1795 EXPORT_SYMBOL(kuc_ptr);
1797 /* Test if payload is part of kuc message
1798 * @param p Pointer to payload area
 *
 * The test reads the kuc_magic field of the struct kuc_hdr sized region
 * immediately in front of p and compares it with KUC_MAGIC, so the memory
 * in front of p must be readable even when p is not a kuc payload.
 * NOTE(review): the values returned for a match and a mismatch are not
 * visible in this excerpt.
1801 int kuc_ispayload(void *p)
1803 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1805 if (kh->kuc_magic == KUC_MAGIC)
1810 EXPORT_SYMBOL(kuc_ispayload);
1812 /* Alloc space for a message, and fill in header
1813 * @return Pointer to payload area
 *
 * @param payload_len  Size of the payload area in bytes
 * @param transport    Stored in the header's kuc_transport field
 * @param type         Stored in the header's kuc_msgtype field
 *
 * The header additionally receives KUC_MAGIC in kuc_magic and the total
 * length from kuc_len(payload_len) in kuc_msglen.  The pointer handed back
 * is the address just behind the header.  ERR_PTR(-ENOMEM) is returned on
 * one path, so callers have to test the result for an error pointer and
 * not only for NULL.
 * NOTE(review): the allocation call and the condition guarding the
 * ERR_PTR(-ENOMEM) return are not visible in this excerpt.
1815 void *kuc_alloc(int payload_len, int transport, int type)
1818 int len = kuc_len(payload_len);
1822 return ERR_PTR(-ENOMEM);
1824 lh->kuc_magic = KUC_MAGIC;
1825 lh->kuc_transport = transport;
1826 lh->kuc_msgtype = type;
1827 lh->kuc_msglen = len;
1829 return (void *)(lh + 1);
1831 EXPORT_SYMBOL(kuc_alloc);
1833 /* Takes pointer to payload area */
/* @param p            Payload pointer; kuc_ptr() asserts that a header
 *                     with a valid magic precedes it
 * @param payload_len  Payload size; the size handed to OBD_FREE is
 *                     kuc_len(payload_len), so it presumably has to match
 *                     the value used at allocation time - TODO confirm */
1834 inline void kuc_free(void *p, int payload_len)
1836 struct kuc_hdr *lh = kuc_ptr(p);
1837 OBD_FREE(lh, kuc_len(payload_len));
1839 EXPORT_SYMBOL(kuc_free);