1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011 Whamcloud, Inc.
36 * This file is part of Lustre, http://www.lustre.org/
37 * Lustre is a trademark of Sun Microsystems, Inc.
39 * lustre/obdclass/genops.c
41 * These are the only exported functions, they provide some generic
42 * infrastructure for managing object devices
 */
45 #define DEBUG_SUBSYSTEM S_CLASS
47 #include <liblustre.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
/* Registered obd types live on obd_types (defined elsewhere); the code shown
 * in this file takes obd_types_lock around every walk or update of it. */
53 extern cfs_list_t obd_types;
54 cfs_spinlock_t obd_types_lock;
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
56 cfs_mem_cache_t *obd_device_cachep;
57 cfs_mem_cache_t *obdo_cachep;
58 EXPORT_SYMBOL(obdo_cachep);
59 cfs_mem_cache_t *import_cachep;
/* Lists for "zombie" imports/exports and the lock declared with them; their
 * users are outside this part of the file. */
61 cfs_list_t obd_zombie_imports;
62 cfs_list_t obd_zombie_exports;
63 cfs_spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of static helpers used before their definition. */
64 static void obd_zombie_impexp_notify(void);
65 static void obd_zombie_export_add(struct obd_export *exp);
66 static void obd_zombie_import_add(struct obd_import *imp);
67 static void print_export_data(struct obd_export *exp,
68 const char *status, int locks);
/* Function pointer through which the export/import destroy paths drop a
 * ptlrpc_connection. */
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
/*
73 * support functions: we could use inter-module communication, but this
74 * is more portable to other OS's
 */
/* Allocate an obd_device from obd_device_cachep (CFS_ALLOC_IO) and stamp it
 * with OBD_DEVICE_MAGIC. */
76 static struct obd_device *obd_device_alloc(void)
78 struct obd_device *obd;
80 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
82 obd->obd_magic = OBD_DEVICE_MAGIC;
/* Return an obd_device to obd_device_cachep after sanity checks. */
87 static void obd_device_free(struct obd_device *obd)
/* Only objects carrying the device magic may be freed. */
90 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A namespace that is still attached here is reported as an error. */
92 if (obd->obd_namespace != NULL) {
93 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94 obd, obd->obd_namespace, obd->obd_force);
97 lu_ref_fini(&obd->obd_reference);
98 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* Walk obd_types under obd_types_lock looking for a type whose typ_name is
 * exactly equal (strcmp) to name. */
101 struct obd_type *class_search_type(const char *name)
104 struct obd_type *type;
106 cfs_spin_lock(&obd_types_lock);
107 cfs_list_for_each(tmp, &obd_types) {
108 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109 if (strcmp(type->typ_name, name) == 0) {
/* Match: the list lock is dropped on this path as well. */
110 cfs_spin_unlock(&obd_types_lock);
114 cfs_spin_unlock(&obd_types_lock);
/* Look up the obd type called name and pin the module that owns its
 * typ_dt_ops. With HAVE_MODULE_LOADING_SUPPORT, a module named after the type
 * is requested through cfs_request_module() and the lookup is repeated. */
118 struct obd_type *class_get_type(const char *name)
120 struct obd_type *type = class_search_type(name);
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
124 const char *modname = name;
/* A zero return from cfs_request_module() is treated as success. */
125 if (!cfs_request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127 type = class_search_type(name);
129 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135 cfs_spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of cfs_try_module_get() is not checked here. */
137 cfs_try_module_get(type->typ_dt_ops->o_owner);
138 cfs_spin_unlock(&type->obd_type_lock);
142 EXPORT_SYMBOL(class_get_type);
/* Counterpart of class_get_type(): under the type's obd_type_lock, drop the
 * module reference held on the owner of typ_dt_ops. */
144 void class_put_type(struct obd_type *type)
147 cfs_spin_lock(&type->obd_type_lock);
149 cfs_module_put(type->typ_dt_ops->o_owner);
150 cfs_spin_unlock(&type->obd_type_lock);
152 EXPORT_SYMBOL(class_put_type);
/* Type names must be shorter than this many bytes (asserted below). */
154 #define CLASS_MAX_NAME 1024
/* Register a new obd type called name.
 *
 * The type gets private copies of dt_ops, md_ops and the name, a proc
 * directory registered under proc_lustre_root, and ldt is initialised through
 * lu_device_type_init(); finally the type is linked onto obd_types under
 * obd_types_lock. The tail of the function frees whatever pieces had been
 * allocated. */
156 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
157 struct lprocfs_vars *vars, const char *name,
158 struct lu_device_type *ldt)
160 struct obd_type *type;
165 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* A second registration under the same name is only logged at D_IOCTL. */
167 if (class_search_type(name)) {
168 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
173 OBD_ALLOC(type, sizeof(*type));
/* Allocate all three members first, then test them together. */
177 OBD_ALLOC_PTR(type->typ_dt_ops);
178 OBD_ALLOC_PTR(type->typ_md_ops);
179 OBD_ALLOC(type->typ_name, strlen(name) + 1);
181 if (type->typ_dt_ops == NULL ||
182 type->typ_md_ops == NULL ||
183 type->typ_name == NULL)
/* The ops tables are copied by value, not referenced. */
186 *(type->typ_dt_ops) = *dt_ops;
187 /* md_ops is optional */
189 *(type->typ_md_ops) = *md_ops;
190 strcpy(type->typ_name, name);
191 cfs_spin_lock_init(&type->obd_type_lock);
194 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
/* On failure keep the error code and clear the pointer so that it is not
 * mistaken for a valid proc entry later. */
196 if (IS_ERR(type->typ_procroot)) {
197 rc = PTR_ERR(type->typ_procroot);
198 type->typ_procroot = NULL;
204 rc = lu_device_type_init(ldt);
/* Publish the fully initialised type on the global list. */
209 cfs_spin_lock(&obd_types_lock);
210 cfs_list_add(&type->typ_chain, &obd_types);
211 cfs_spin_unlock(&obd_types_lock);
/* Cleanup: the name was allocated with strlen(name) + 1 bytes, so the same
 * size is used to free it. */
216 if (type->typ_name != NULL)
217 OBD_FREE(type->typ_name, strlen(name) + 1);
218 if (type->typ_md_ops != NULL)
219 OBD_FREE_PTR(type->typ_md_ops);
220 if (type->typ_dt_ops != NULL)
221 OBD_FREE_PTR(type->typ_dt_ops);
222 OBD_FREE(type, sizeof(*type));
225 EXPORT_SYMBOL(class_register_type);
/* Unregister the obd type called name: remove its proc entry, finalise its
 * lu device type, unlink it from obd_types and free the name, the ops tables
 * and the type itself. */
227 int class_unregister_type(const char *name)
229 struct obd_type *type = class_search_type(name);
233 CERROR("unknown obd type\n");
/* Still referenced: report it and free only the ops tables. */
237 if (type->typ_refcnt) {
238 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239 /* This is a bad situation, let's make the best of it */
240 /* Remove ops, but leave the name for debugging */
241 OBD_FREE_PTR(type->typ_dt_ops);
242 OBD_FREE_PTR(type->typ_md_ops);
246 if (type->typ_procroot) {
247 lprocfs_remove(&type->typ_procroot);
251 lu_device_type_fini(type->typ_lu);
/* Unlink from the global list before freeing the pieces. */
253 cfs_spin_lock(&obd_types_lock);
254 cfs_list_del(&type->typ_chain);
255 cfs_spin_unlock(&obd_types_lock);
256 OBD_FREE(type->typ_name, strlen(name) + 1);
257 if (type->typ_dt_ops != NULL)
258 OBD_FREE_PTR(type->typ_dt_ops);
259 if (type->typ_md_ops != NULL)
260 OBD_FREE_PTR(type->typ_md_ops);
261 OBD_FREE(type, sizeof(*type));
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
267 * Create a new obd device.
269 * Find an empty slot in ::obd_devs[], create a new obd device in it.
271 * \param[in] type_name obd device type string.
272 * \param[in] name obd device name.
274 * \retval NULL if create fails, otherwise return the obd device
277 struct obd_device *class_newdev(const char *type_name, const char *name)
279 struct obd_device *result = NULL;
280 struct obd_device *newdev;
281 struct obd_type *type = NULL;
283 int new_obd_minor = 0;
286 if (strlen(name) >= MAX_OBD_NAME) {
287 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
288 RETURN(ERR_PTR(-EINVAL));
291 type = class_get_type(type_name);
293 CERROR("OBD: unknown type: %s\n", type_name);
294 RETURN(ERR_PTR(-ENODEV));
297 newdev = obd_device_alloc();
298 if (newdev == NULL) {
299 class_put_type(type);
300 RETURN(ERR_PTR(-ENOMEM));
302 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
304 cfs_write_lock(&obd_dev_lock);
305 for (i = 0; i < class_devno_max(); i++) {
306 struct obd_device *obd = class_num2obd(i);
308 if (obd && obd->obd_name &&
309 (strcmp(name, obd->obd_name) == 0)) {
310 CERROR("Device %s already exists at %d, won't add\n",
313 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
314 "%p obd_magic %08x != %08x\n", result,
315 result->obd_magic, OBD_DEVICE_MAGIC);
316 LASSERTF(result->obd_minor == new_obd_minor,
317 "%p obd_minor %d != %d\n", result,
318 result->obd_minor, new_obd_minor);
320 obd_devs[result->obd_minor] = NULL;
321 result->obd_name[0]='\0';
323 result = ERR_PTR(-EEXIST);
326 if (!result && !obd) {
328 result->obd_minor = i;
330 result->obd_type = type;
331 strncpy(result->obd_name, name,
332 sizeof(result->obd_name) - 1);
333 obd_devs[i] = result;
336 cfs_write_unlock(&obd_dev_lock);
338 if (result == NULL && i >= class_devno_max()) {
339 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
341 RETURN(ERR_PTR(-EOVERFLOW));
344 if (IS_ERR(result)) {
345 obd_device_free(newdev);
346 class_put_type(type);
348 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
349 result->obd_name, result);
/* Remove obd from obd_devs[] under obd_dev_lock, free it, and then drop the
 * reference on its obd type. */
354 void class_release_dev(struct obd_device *obd)
/* Saved up front because obd itself is freed before the type is put. */
356 struct obd_type *obd_type = obd->obd_type;
358 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
359 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* The device must be the one registered in its own minor slot. */
360 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
361 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
362 LASSERT(obd_type != NULL);
364 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
365 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
367 cfs_write_lock(&obd_dev_lock);
368 obd_devs[obd->obd_minor] = NULL;
369 cfs_write_unlock(&obd_dev_lock);
370 obd_device_free(obd);
372 class_put_type(obd_type);
/* Scan all class_devno_max() slots under the obd_dev_lock read lock for a
 * device whose obd_name equals name. The read lock is dropped both inside the
 * obd_attached branch and after the loop. */
375 int class_name2dev(const char *name)
382 cfs_read_lock(&obd_dev_lock);
383 for (i = 0; i < class_devno_max(); i++) {
384 struct obd_device *obd = class_num2obd(i);
386 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
387 /* Make sure we finished attaching before we give
388 out any references */
389 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
390 if (obd->obd_attached) {
391 cfs_read_unlock(&obd_dev_lock);
397 cfs_read_unlock(&obd_dev_lock);
401 EXPORT_SYMBOL(class_name2dev);
403 struct obd_device *class_name2obd(const char *name)
405 int dev = class_name2dev(name);
407 if (dev < 0 || dev > class_devno_max())
409 return class_num2obd(dev);
411 EXPORT_SYMBOL(class_name2obd);
/* Scan all class_devno_max() slots under the obd_dev_lock read lock for a
 * device whose obd_uuid equals uuid (obd_uuid_equals). */
413 int class_uuid2dev(struct obd_uuid *uuid)
417 cfs_read_lock(&obd_dev_lock);
418 for (i = 0; i < class_devno_max(); i++) {
419 struct obd_device *obd = class_num2obd(i);
421 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
422 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* Match: the lock is dropped on this path as well as after the loop. */
423 cfs_read_unlock(&obd_dev_lock);
427 cfs_read_unlock(&obd_dev_lock);
431 EXPORT_SYMBOL(class_uuid2dev);
/* Resolve uuid to a device: class_uuid2dev() yields the index, which is then
 * handed to class_num2obd(). */
433 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
435 int dev = class_uuid2dev(uuid);
438 return class_num2obd(dev);
440 EXPORT_SYMBOL(class_uuid2obd);
443 * Get obd device from ::obd_devs[]
445 * \param num [in] array index
447 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
448 * otherwise return the obd device there.
450 struct obd_device *class_num2obd(int num)
452 struct obd_device *obd = NULL;
454 if (num < class_devno_max()) {
459 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
460 "%p obd_magic %08x != %08x\n",
461 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
462 LASSERTF(obd->obd_minor == num,
463 "%p obd_minor %0d != %0d\n",
464 obd, obd->obd_minor, num);
469 EXPORT_SYMBOL(class_num2obd);
/* Print one console line per device slot, under the obd_dev_lock read lock:
 * index, status string, type name, device name, uuid and reference count. */
471 void class_obd_list(void)
476 cfs_read_lock(&obd_dev_lock);
477 for (i = 0; i < class_devno_max(); i++) {
478 struct obd_device *obd = class_num2obd(i);
/* The status is picked from these flags, tested in this order. */
482 if (obd->obd_stopping)
484 else if (obd->obd_set_up)
486 else if (obd->obd_attached)
490 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
491 i, status, obd->obd_type->typ_name,
492 obd->obd_name, obd->obd_uuid.uuid,
493 cfs_atomic_read(&obd->obd_refcount));
495 cfs_read_unlock(&obd_dev_lock);
499 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
500 specified, then only the client with that uuid is returned,
501 otherwise any client connected to the tgt is returned. */
/* Scans obd_devs[] under the obd_dev_lock read lock. Note that the type check
 * compares only strlen(typ_name) bytes, i.e. typ_name is matched as a prefix
 * of the device's type name. */
502 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
503 const char * typ_name,
504 struct obd_uuid *grp_uuid)
508 cfs_read_lock(&obd_dev_lock);
509 for (i = 0; i < class_devno_max(); i++) {
510 struct obd_device *obd = class_num2obd(i);
514 if ((strncmp(obd->obd_type->typ_name, typ_name,
515 strlen(typ_name)) == 0)) {
/* The target uuid must match; grp_uuid is compared only when non-NULL. */
516 if (obd_uuid_equals(tgt_uuid,
517 &obd->u.cli.cl_target_uuid) &&
518 ((grp_uuid)? obd_uuid_equals(grp_uuid,
519 &obd->obd_uuid) : 1)) {
520 cfs_read_unlock(&obd_dev_lock);
525 cfs_read_unlock(&obd_dev_lock);
529 EXPORT_SYMBOL(class_find_client_obd);
531 /* Iterate the obd_device list looking devices have grp_uuid. Start
532 searching at *next, and if a device is found, the next index to look
533 at is saved in *next. If next is NULL, then the first matching device
534 will always be returned. */
/* Scans obd_devs[] from a start index under the obd_dev_lock read lock and
 * compares each device's obd_uuid with grp_uuid. */
535 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* A caller-supplied *next is range-checked against [0, class_devno_max()). */
541 else if (*next >= 0 && *next < class_devno_max())
546 cfs_read_lock(&obd_dev_lock);
547 for (; i < class_devno_max(); i++) {
548 struct obd_device *obd = class_num2obd(i);
552 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
555 cfs_read_unlock(&obd_dev_lock);
559 cfs_read_unlock(&obd_dev_lock);
563 EXPORT_SYMBOL(class_devices_in_group);
/**
566 * Notify that the sptlrpc log for \a fsname has changed, so that every
567 * relevant OBD adjusts its sptlrpc settings accordingly.
 */
/* Send KEY_SPTLRPC_CONF through obd_set_info_async() to every device that is
 * set up, not stopping, of type mdc/osc/mdt/ost, and whose obd_name starts
 * with the first namelen bytes of fsname. */
569 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
571 struct obd_device *obd;
575 LASSERT(namelen > 0);
577 cfs_read_lock(&obd_dev_lock);
578 for (i = 0; i < class_devno_max(); i++) {
579 obd = class_num2obd(i);
581 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
584 /* only notify mdc, osc, mdt, ost */
585 type = obd->obd_type->typ_name;
586 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
587 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
588 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
589 strcmp(type, LUSTRE_OST_NAME) != 0)
592 if (strncmp(obd->obd_name, fsname, namelen))
/* Hold a device reference and drop obd_dev_lock around the call; the lock is
 * re-taken afterwards so that the scan can continue. */
595 class_incref(obd, __FUNCTION__, obd);
596 cfs_read_unlock(&obd_dev_lock);
597 rc2 = obd_set_info_async(obd->obd_self_export,
598 sizeof(KEY_SPTLRPC_CONF),
599 KEY_SPTLRPC_CONF, 0, NULL, NULL);
601 class_decref(obd, __FUNCTION__, obd);
602 cfs_read_lock(&obd_dev_lock);
604 cfs_read_unlock(&obd_dev_lock);
607 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
609 void obd_cleanup_caches(void)
614 if (obd_device_cachep) {
615 rc = cfs_mem_cache_destroy(obd_device_cachep);
616 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
617 obd_device_cachep = NULL;
620 rc = cfs_mem_cache_destroy(obdo_cachep);
621 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
625 rc = cfs_mem_cache_destroy(import_cachep);
626 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
627 import_cachep = NULL;
630 rc = cfs_mem_cache_destroy(capa_cachep);
631 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/* Create the slab caches for obd_device, obdo, obd_import and obd_capa
 * objects. Every cache pointer is asserted to be NULL before it is created. */
637 int obd_init_caches(void)
641 LASSERT(obd_device_cachep == NULL);
642 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
643 sizeof(struct obd_device),
645 if (!obd_device_cachep)
648 LASSERT(obdo_cachep == NULL);
649 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
654 LASSERT(import_cachep == NULL);
655 import_cachep = cfs_mem_cache_create("ll_import_cache",
656 sizeof(struct obd_import),
661 LASSERT(capa_cachep == NULL);
662 capa_cachep = cfs_mem_cache_create("capa_cache",
663 sizeof(struct obd_capa), 0, 0);
/* Presumably the failure path: tear down whatever was created -- confirm. */
669 obd_cleanup_caches();
674 /* map connection to client */
/* Translate a connection handle into its export by looking conn->cookie up
 * with class_handle2object(). A null handle and the special cookie -1 are
 * handled (and logged) separately before the lookup. */
675 struct obd_export *class_conn2export(struct lustre_handle *conn)
677 struct obd_export *export;
681 CDEBUG(D_CACHE, "looking for null handle\n");
685 if (conn->cookie == -1) { /* this means assign a new connection */
686 CDEBUG(D_CACHE, "want a new connection\n");
690 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
691 export = class_handle2object(conn->cookie);
694 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd device. NOTE(review): the body is not visible
 * here; presumably it returns exp->exp_obd -- confirm. */
696 struct obd_device *class_exp2obd(struct obd_export *exp)
702 EXPORT_SYMBOL(class_exp2obd);
/* Map a connection handle to its device: the export found by
 * class_conn2export() is released with class_export_put() once its exp_obd
 * has been read. */
704 struct obd_device *class_conn2obd(struct lustre_handle *conn)
706 struct obd_export *export;
707 export = class_conn2export(conn);
709 struct obd_device *obd = export->exp_obd;
710 class_export_put(export);
715 EXPORT_SYMBOL(class_conn2obd);
/* Return the client import (u.cli.cl_import) of the device behind exp. */
717 struct obd_import *class_exp2cliimp(struct obd_export *exp)
719 struct obd_device *obd = exp->exp_obd;
722 return obd->u.cli.cl_import;
724 EXPORT_SYMBOL(class_exp2cliimp);
/* Return the client import (u.cli.cl_import) of the device that
 * class_conn2obd() finds for conn. */
726 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
728 struct obd_device *obd = class_conn2obd(conn);
731 return obd->u.cli.cl_import;
733 EXPORT_SYMBOL(class_conn2cliimp);
735 /* Export management functions */
736 static void class_export_destroy(struct obd_export *exp)
738 struct obd_device *obd = exp->exp_obd;
741 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
743 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
744 exp->exp_client_uuid.uuid, obd->obd_name);
746 LASSERT(obd != NULL);
748 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
749 if (exp->exp_connection)
750 ptlrpc_put_connection_superhack(exp->exp_connection);
752 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
753 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
754 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
755 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
756 obd_destroy_export(exp);
757 class_decref(obd, "export", exp);
759 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Addref callback handed to class_handle_hash() in class_new_export(); it
 * simply forwards to class_export_get(). */
763 static void export_handle_addref(void *export)
765 class_export_get(export);
/* Take a reference on exp: increment exp_refcount and trace the new value. */
768 struct obd_export *class_export_get(struct obd_export *exp)
770 cfs_atomic_inc(&exp->exp_refcount);
771 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
772 cfs_atomic_read(&exp->exp_refcount));
775 EXPORT_SYMBOL(class_export_get);
/* Drop one reference on exp. On the final put the export's proc state is
 * cleaned up and the export is handed to obd_zombie_export_add(). */
777 void class_export_put(struct obd_export *exp)
779 LASSERT(exp != NULL);
780 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* The count is printed minus one because the decrement happens below. */
781 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
782 cfs_atomic_read(&exp->exp_refcount) - 1);
784 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
/* exp_obd_chain is asserted to be non-empty at the final put. */
785 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
786 CDEBUG(D_IOCTL, "final put %p/%s\n",
787 exp, exp->exp_client_uuid.uuid);
789 /* release nid stat reference */
790 lprocfs_exp_cleanup(exp);
792 obd_zombie_export_add(exp);
795 EXPORT_SYMBOL(class_export_put);
797 /* Creates a new export, adds it to the hash table, and returns a
798 * pointer to it. The refcount is 2: one for the hash reference, and
799 * one for the pointer returned by this function. */
/* Allocate, initialise and link a new export for cluuid on obd; returns
 * ERR_PTR(-ENOMEM) when the allocation fails. */
800 struct obd_export *class_new_export(struct obd_device *obd,
801 struct obd_uuid *cluuid)
803 struct obd_export *export;
804 cfs_hash_t *hash = NULL;
808 OBD_ALLOC_PTR(export);
810 return ERR_PTR(-ENOMEM);
812 export->exp_conn_cnt = 0;
813 export->exp_lock_hash = NULL;
/* Start with two references, as described in the function's header comment. */
814 cfs_atomic_set(&export->exp_refcount, 2);
815 cfs_atomic_set(&export->exp_rpc_count, 0);
816 cfs_atomic_set(&export->exp_cb_count, 0);
817 cfs_atomic_set(&export->exp_locks_count, 0);
818 #if LUSTRE_TRACKS_LOCK_EXP_REFS
819 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
820 cfs_spin_lock_init(&export->exp_locks_list_guard);
822 cfs_atomic_set(&export->exp_replay_count, 0);
823 export->exp_obd = obd;
824 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
825 cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
826 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
827 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
828 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
829 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
/* Register the handle; export_handle_addref is its addref callback. */
830 class_handle_hash(&export->exp_handle, export_handle_addref);
831 export->exp_last_request_time = cfs_time_current_sec();
832 cfs_spin_lock_init(&export->exp_lock);
833 cfs_spin_lock_init(&export->exp_rpc_lock);
834 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
835 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
836 cfs_spin_lock_init(&export->exp_bl_list_lock);
837 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
839 export->exp_sp_peer = LUSTRE_SP_ANY;
840 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
841 export->exp_client_uuid = *cluuid;
842 obd_init_export(export);
/* First pass under obd_dev_lock: refuse a stopping device and take a
 * reference on its uuid hash. */
844 cfs_spin_lock(&obd->obd_dev_lock);
845 /* shouldn't happen, but might race */
846 if (obd->obd_stopping)
847 GOTO(exit_unlock, rc = -ENODEV);
849 hash = cfs_hash_getref(obd->obd_uuid_hash);
851 GOTO(exit_unlock, rc = -ENODEV);
852 cfs_spin_unlock(&obd->obd_dev_lock);
/* An export whose client uuid equals the obd's own uuid is not inserted into
 * the uuid hash. */
854 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
855 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
857 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
858 obd->obd_name, cluuid->uuid, rc);
859 GOTO(exit_err, rc = -EALREADY);
/* Re-check obd_stopping now that the lock is held again; undo the hash
 * insertion if the device started stopping in the meantime. */
863 cfs_spin_lock(&obd->obd_dev_lock);
864 if (obd->obd_stopping) {
865 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
866 GOTO(exit_unlock, rc = -ENODEV);
/* Success: pin the device and link the export on its export lists. */
869 class_incref(obd, "export", export);
870 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
871 cfs_list_add_tail(&export->exp_obd_chain_timed,
872 &export->exp_obd->obd_exports_timed);
873 export->exp_obd->obd_num_exports++;
874 cfs_spin_unlock(&obd->obd_dev_lock);
875 cfs_hash_putref(hash);
/* Error exits: drop the lock and the hash reference, unhash the handle, then
 * destroy and free the half-built export. */
879 cfs_spin_unlock(&obd->obd_dev_lock);
882 cfs_hash_putref(hash);
883 class_handle_unhash(&export->exp_handle);
884 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
885 obd_destroy_export(export);
886 OBD_FREE_PTR(export);
889 EXPORT_SYMBOL(class_new_export);
/* Detach exp from its device: unhash its handle, remove it from the uuid hash
 * if present, move it to obd_unlinked_exports, take it off the timed list,
 * decrement obd_num_exports, and finally drop one export reference. */
891 void class_unlink_export(struct obd_export *exp)
893 class_handle_unhash(&exp->exp_handle);
895 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
896 /* delete an uuid-export hashitem from hashtables */
897 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
898 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
899 &exp->exp_client_uuid,
900 &exp->exp_uuid_hash);
902 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
903 cfs_list_del_init(&exp->exp_obd_chain_timed);
904 exp->exp_obd->obd_num_exports--;
905 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
906 class_export_put(exp);
908 EXPORT_SYMBOL(class_unlink_export);
910 /* Import management functions */
/* Free an import whose reference count is zero: drop its connection and every
 * connection on imp_conn_list, release the device reference taken for the
 * import, and free the import via RCU. */
911 void class_import_destroy(struct obd_import *imp)
915 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
916 imp->imp_obd->obd_name);
918 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
920 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Pop entries off the head of the list until it is empty. */
922 while (!cfs_list_empty(&imp->imp_conn_list)) {
923 struct obd_import_conn *imp_conn;
925 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
926 struct obd_import_conn, oic_item);
927 cfs_list_del_init(&imp_conn->oic_item);
928 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
929 OBD_FREE(imp_conn, sizeof(*imp_conn));
/* imp_sec must already have been cleared by this point. */
932 LASSERT(imp->imp_sec == NULL);
933 class_decref(imp->imp_obd, "import", imp);
934 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Addref callback handed to class_handle_hash() in class_new_import(); it
 * simply forwards to class_import_get(). */
938 static void import_handle_addref(void *import)
940 class_import_get(import);
/* Take a reference on import: increment imp_refcount and trace the new value
 * together with the owning device's name. */
943 struct obd_import *class_import_get(struct obd_import *import)
945 cfs_atomic_inc(&import->imp_refcount);
946 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
947 cfs_atomic_read(&import->imp_refcount),
948 import->imp_obd->obd_name);
951 EXPORT_SYMBOL(class_import_get);
/* Drop one reference on imp; on the final put the import is handed to
 * obd_zombie_import_add(). */
953 void class_import_put(struct obd_import *imp)
/* imp_zombie_chain is asserted to be empty on entry. */
957 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
958 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* The count is printed minus one because the decrement happens below. */
960 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
961 cfs_atomic_read(&imp->imp_refcount) - 1,
962 imp->imp_obd->obd_name);
964 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
965 CDEBUG(D_INFO, "final put import %p\n", imp);
966 obd_zombie_import_add(imp);
971 EXPORT_SYMBOL(class_import_put);
/* Initialise an imp_at: the net latency estimate, plus one service estimate
 * per portal (IMP_AT_MAX_PORTALS entries) seeded with INITIAL_CONNECT_TIMEOUT. */
973 static void init_imp_at(struct imp_at *at) {
975 at_init(&at->iat_net_latency, 0, 0);
976 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
977 /* max service estimates are tracked on the server side, so
978 don't use the AT history here, just use the last reported
979 val. (But keep hist for proc histogram, worst_ever) */
980 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Allocate and initialise an import for obd. The import takes a device
 * reference, starts in state LUSTRE_IMP_NEW with a reference count of 2, has
 * its handle registered in the handle hash, and defaults to the V2 message
 * magic. */
985 struct obd_import *class_new_import(struct obd_device *obd)
987 struct obd_import *imp;
989 OBD_ALLOC(imp, sizeof(*imp));
993 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
994 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
995 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
996 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
997 cfs_spin_lock_init(&imp->imp_lock);
998 imp->imp_last_success_conn = 0;
999 imp->imp_state = LUSTRE_IMP_NEW;
/* Released again by class_decref() in class_import_destroy(). */
1000 imp->imp_obd = class_incref(obd, "import", imp);
1001 cfs_sema_init(&imp->imp_sec_mutex, 1);
1002 cfs_waitq_init(&imp->imp_recovery_waitq);
1004 cfs_atomic_set(&imp->imp_refcount, 2);
1005 cfs_atomic_set(&imp->imp_unregistering, 0);
1006 cfs_atomic_set(&imp->imp_inflight, 0);
1007 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1008 cfs_atomic_set(&imp->imp_inval_count, 0);
1009 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1010 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1011 class_handle_hash(&imp->imp_handle, import_handle_addref);
1012 init_imp_at(&imp->imp_at);
1014 /* the default magic is V2, will be used in connect RPC, and
1015 * then adjusted according to the flags in request/reply. */
1016 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1020 EXPORT_SYMBOL(class_new_import);
/* Retire an import: unhash its handle, bump imp_generation under imp_lock,
 * and drop one reference with class_import_put(). */
1022 void class_destroy_import(struct obd_import *import)
1024 LASSERT(import != NULL);
1025 LASSERT(import != LP_POISON);
1027 class_handle_unhash(&import->imp_handle);
1029 cfs_spin_lock(&import->imp_lock);
1030 import->imp_generation++;
1031 cfs_spin_unlock(&import->imp_lock);
1032 class_import_put(import);
1034 EXPORT_SYMBOL(class_destroy_import);
1036 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Record one more export reference held by lock, under exp_locks_list_guard.
 * The first reference also links the lock onto exp_locks_list and records exp
 * as the lock's l_exp_refs_target. */
1038 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1040 cfs_spin_lock(&exp->exp_locks_list_guard);
1042 LASSERT(lock->l_exp_refs_nr >= 0);
/* A lock already tracked against a different export only causes a warning. */
1044 if (lock->l_exp_refs_target != NULL &&
1045 lock->l_exp_refs_target != exp) {
1046 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1047 exp, lock, lock->l_exp_refs_target);
/* Post-increment: the branch is taken when the count was zero before. */
1049 if ((lock->l_exp_refs_nr ++) == 0) {
1050 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1051 lock->l_exp_refs_target = exp;
1053 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1054 lock, exp, lock->l_exp_refs_nr);
1055 cfs_spin_unlock(&exp->exp_locks_list_guard);
1057 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Drop one export reference held by lock, under exp_locks_list_guard. When
 * the last reference goes, the lock is unlinked from the export's list and
 * its l_exp_refs_target is cleared. */
1059 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1061 cfs_spin_lock(&exp->exp_locks_list_guard);
1062 LASSERT(lock->l_exp_refs_nr > 0);
/* A mismatching target export only causes a warning. */
1063 if (lock->l_exp_refs_target != exp) {
1064 LCONSOLE_WARN("lock %p, "
1065 "mismatching export pointers: %p, %p\n",
1066 lock, lock->l_exp_refs_target, exp);
/* Pre-decrement: the branch is taken when the count reaches zero. */
1068 if (-- lock->l_exp_refs_nr == 0) {
1069 cfs_list_del_init(&lock->l_exp_refs_link);
1070 lock->l_exp_refs_target = NULL;
1072 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1073 lock, exp, lock->l_exp_refs_nr);
1074 cfs_spin_unlock(&exp->exp_locks_list_guard);
1076 EXPORT_SYMBOL(__class_export_del_lock_ref);
1079 /* A connection defines an export context in which preallocation can
1080 be managed. This releases the export pointer reference, and returns
1081 the export handle, so the export refcount is 1 when this function
returns. */
/* Create an export for cluuid on obd with class_new_export(), store its
 * handle cookie in conn, and drop the pointer reference that
 * class_new_export() returned. An ERR_PTR from class_new_export() is
 * propagated as its error code. */
1083 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1084 struct obd_uuid *cluuid)
1086 struct obd_export *export;
1087 LASSERT(conn != NULL);
1088 LASSERT(obd != NULL);
1089 LASSERT(cluuid != NULL);
1092 export = class_new_export(obd, cluuid);
1094 RETURN(PTR_ERR(export));
1096 conn->cookie = export->exp_handle.h_cookie;
1097 class_export_put(export);
1099 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1100 cluuid->uuid, conn->cookie);
1103 EXPORT_SYMBOL(class_connect);
1105 /* if export is involved in recovery then clean up related things */
/* Undo the recovery accounting kept on the device for exp. The per-export
 * flags are cleared under exp_lock; the device counters are atomics, apart
 * from obd_delayed_clients, which is adjusted under obd_recovery_task_lock. */
1106 void class_export_recovery_cleanup(struct obd_export *exp)
1108 struct obd_device *obd = exp->exp_obd;
1110 cfs_spin_lock(&obd->obd_recovery_task_lock);
1111 if (exp->exp_delayed)
1112 obd->obd_delayed_clients--;
/* Only an export that is in recovery while the device is recovering counts
 * as a connected client. */
1113 if (obd->obd_recovering && exp->exp_in_recovery) {
1114 cfs_spin_lock(&exp->exp_lock);
1115 exp->exp_in_recovery = 0;
1116 cfs_spin_unlock(&exp->exp_lock);
1117 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1118 cfs_atomic_dec(&obd->obd_connected_clients);
1120 cfs_spin_unlock(&obd->obd_recovery_task_lock);
1121 /** Cleanup req replay fields */
1122 if (exp->exp_req_replay_needed) {
1123 cfs_spin_lock(&exp->exp_lock);
1124 exp->exp_req_replay_needed = 0;
1125 cfs_spin_unlock(&exp->exp_lock);
1126 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1127 cfs_atomic_dec(&obd->obd_req_replay_clients);
1129 /** Cleanup lock replay data */
1130 if (exp->exp_lock_replay_needed) {
1131 cfs_spin_lock(&exp->exp_lock);
1132 exp->exp_lock_replay_needed = 0;
1133 cfs_spin_unlock(&exp->exp_lock);
1134 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1135 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1139 /* This function removes 1-3 references from the export:
1140 * 1 - for export pointer passed
1141 * and if disconnect really need
1142 * 2 - removing from hash
1143 * 3 - in client_unlink_export
1144 * The export pointer passed to this function can destroyed */
/* Disconnect export. exp_disconnected is tested and set under exp_lock, so
 * of several racing callers only the first performs the real disconnect work
 * (nid-hash removal, recovery cleanup, unlink). */
1145 int class_disconnect(struct obd_export *export)
1147 int already_disconnected;
1150 if (export == NULL) {
1151 CWARN("attempting to free NULL export %p\n", export);
/* Test-and-set of the disconnected flag. */
1155 cfs_spin_lock(&export->exp_lock);
1156 already_disconnected = export->exp_disconnected;
1157 export->exp_disconnected = 1;
1158 cfs_spin_unlock(&export->exp_lock);
1160 /* class_cleanup(), abort_recovery(), and class_fail_export()
1161 * all end up in here, and if any of them race we shouldn't
1162 * call extra class_export_puts(). */
1163 if (already_disconnected) {
1164 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1165 GOTO(no_disconn, already_disconnected);
1168 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1169 export->exp_handle.h_cookie);
/* Remove the export from the device's nid hash if it is hashed there. */
1171 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1172 cfs_hash_del(export->exp_obd->obd_nid_hash,
1173 &export->exp_connection->c_peer.nid,
1174 &export->exp_nid_hash);
1176 class_export_recovery_cleanup(export);
1177 class_unlink_export(export);
1179 class_export_put(export);
1182 EXPORT_SYMBOL(class_disconnect);
1184 /* Return non-zero for a fully connected export */
/* The export counts as connected when exp_conn_cnt is positive; the counter
 * is sampled under exp_lock. */
1185 int class_connected_export(struct obd_export *exp)
1189 cfs_spin_lock(&exp->exp_lock);
1190 connected = (exp->exp_conn_cnt > 0);
1191 cfs_spin_unlock(&exp->exp_lock);
1196 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export on list. Each export gets exp_flags set to flags.
 * An export whose client uuid equals its obd's uuid is only taken off the
 * list; all others are passed to obd_disconnect(). */
1198 static void class_disconnect_export_list(cfs_list_t *list,
1199 enum obd_option flags)
1202 struct obd_export *exp;
1205 /* It's possible that an export may disconnect itself, but
1206 * nothing else will be added to this list. */
1207 while (!cfs_list_empty(list)) {
1208 exp = cfs_list_entry(list->next, struct obd_export,
1210 /* need for safe call CDEBUG after obd_disconnect */
1211 class_export_get(exp);
1213 cfs_spin_lock(&exp->exp_lock);
1214 exp->exp_flags = flags;
1215 cfs_spin_unlock(&exp->exp_lock);
1217 if (obd_uuid_equals(&exp->exp_client_uuid,
1218 &exp->exp_obd->obd_uuid)) {
1220 "exp %p export uuid == obd uuid, don't discon\n",
1222 /* Need to delete this now so we don't end up pointing
1223 * to work_list later when this export is cleaned up. */
1224 cfs_list_del_init(&exp->exp_obd_chain);
1225 class_export_put(exp);
/* Second reference: per the comment below, obd_disconnect() releases one. */
1229 class_export_get(exp);
1230 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1231 "last request at "CFS_TIME_T"\n",
1232 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1233 exp, exp->exp_last_request_time);
1234 /* release one export reference anyway */
1235 rc = obd_disconnect(exp);
1237 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1238 obd_export_nid2str(exp), exp, rc);
1239 class_export_put(exp);
/* Disconnect all exports of obd: obd_exports and obd_delayed_exports are
 * spliced onto a private work list under obd_dev_lock, and that list is then
 * processed by class_disconnect_export_list() with the lock released. */
1244 void class_disconnect_exports(struct obd_device *obd)
1246 cfs_list_t work_list;
1249 /* Move all of the exports from obd_exports to a work list, en masse. */
1250 CFS_INIT_LIST_HEAD(&work_list);
1251 cfs_spin_lock(&obd->obd_dev_lock);
1252 cfs_list_splice_init(&obd->obd_exports, &work_list);
1253 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1254 cfs_spin_unlock(&obd->obd_dev_lock);
1256 if (!cfs_list_empty(&work_list)) {
1257 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1258 "disconnecting them\n", obd->obd_minor, obd);
1259 class_disconnect_export_list(&work_list,
1260 exp_flags_from_obd(obd));
1262 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1263 obd->obd_minor, obd);
1266 EXPORT_SYMBOL(class_disconnect_exports);
1268 /* Remove exports that have not completed recovery. */
/*
 * Evict stale client exports of @obd.
 *
 * Under obd_dev_lock, walks obd->obd_exports, consults @test_export for each
 * entry and moves the selected exports onto a private work list, logging
 * each one.  After the lock is dropped, the evicted count is added to
 * obd->obd_stale_clients and the work list is disconnected with
 * OBD_OPT_ABORT_RECOV OR-ed into the flags from exp_flags_from_obd().
 *
 * @param obd          device whose exports are examined
 * @param test_export  per-export predicate
 *                     NOTE(review): the statement guarded by test_export()
 *                     is not shown here; presumably a non-zero result means
 *                     "keep this export" -- confirm.
 */
1270 void class_disconnect_stale_exports(struct obd_device *obd,
1271 int (*test_export)(struct obd_export *))
1273 cfs_list_t work_list;
1274 cfs_list_t *pos, *n;
1275 struct obd_export *exp;
1279 CFS_INIT_LIST_HEAD(&work_list);
1280 cfs_spin_lock(&obd->obd_dev_lock);
/* the _safe iterator is needed because entries are moved off the list below */
1281 cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1282 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1283 if (test_export(exp))
1286 /* don't count self-export as client */
1287 if (obd_uuid_equals(&exp->exp_client_uuid,
1288 &exp->exp_obd->obd_uuid))
1291 cfs_list_move(&exp->exp_obd_chain, &work_list);
/* logged at D_ERROR level; still runs with obd_dev_lock held */
1293 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1294 obd->obd_name, exp->exp_client_uuid.uuid,
1295 exp->exp_connection == NULL ? "<unknown>" :
1296 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1297 print_export_data(exp, "EVICTING", 0);
1299 cfs_spin_unlock(&obd->obd_dev_lock);
1302 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1303 obd->obd_name, evicted);
/* NOTE(review): obd_stale_clients is updated after obd_dev_lock was released */
1304 obd->obd_stale_clients += evicted;
1306 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1307 OBD_OPT_ABORT_RECOV);
1310 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is read and set to 1 under exp_lock.  If it was already set,
 * the export is treated as dead and, per the log message, skipped.
 * Otherwise a reference is taken, obd_disconnect() is called and its result
 * is logged.
 *
 * @param exp  export to fail
 */
1312 void class_fail_export(struct obd_export *exp)
1314 int rc, already_failed;
/* test-and-set of exp_failed under exp_lock */
1316 cfs_spin_lock(&exp->exp_lock);
1317 already_failed = exp->exp_failed;
1318 exp->exp_failed = 1;
1319 cfs_spin_unlock(&exp->exp_lock);
1321 if (already_failed) {
1322 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1323 exp, exp->exp_client_uuid.uuid);
1327 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1328 exp, exp->exp_client_uuid.uuid);
/* optional debug log dump, controlled by the obd_dump_on_timeout tunable */
1330 if (obd_dump_on_timeout)
1331 libcfs_debug_dumplog();
1333 /* Most callers into obd_disconnect are removing their own reference
1334 * (request, for example) in addition to the one from the hash table.
1335 * We don't have such a reference here, so make one. */
1336 class_export_get(exp);
1337 rc = obd_disconnect(exp);
/* the outcome is only logged; this function returns void */
1339 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1341 CDEBUG(D_HA, "disconnected export %p/%s\n",
1342 exp, exp->exp_client_uuid.uuid);
1344 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable form of the peer NID of @exp.
 *
 * When exp->exp_connection is set, the result of libcfs_nid2str() on the
 * connection's c_peer.nid is returned.
 * NOTE(review): the value returned when exp_connection is NULL is not shown
 * here -- confirm before relying on it.
 *
 * @param exp  export to describe; dereferenced unconditionally
 */
1346 char *obd_export_nid2str(struct obd_export *exp)
1348 if (exp->exp_connection != NULL)
1349 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1353 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict the export(s) of @obd that are hashed under NID @nid.
 *
 * @nid is parsed with libcfs_str2nid() and looked up in obd->obd_nid_hash.
 * A match is sanity-checked (its connection NID must equal the key and it
 * must not be obd->obd_self_export), failed through class_fail_export() and
 * then released with class_export_put().
 *
 * @param obd  device whose NID hash is searched
 * @param nid  textual NID to evict
 * @returns    exports_evicted; 0 means no export was found (see final CDEBUG)
 */
1355 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1357 struct obd_export *doomed_exp = NULL;
1358 int exports_evicted = 0;
/* the cast only drops const, presumably to match libcfs_str2nid()'s prototype */
1360 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1363 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1364 if (doomed_exp == NULL)
/* NOTE(review): exp_connection is dereferenced here without a NULL check */
1367 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1368 "nid %s found, wanted nid %s, requested nid %s\n",
1369 obd_export_nid2str(doomed_exp),
1370 libcfs_nid2str(nid_key), nid);
1371 LASSERTF(doomed_exp != obd->obd_self_export,
1372 "self-export is hashed by NID?\n");
/* NOTE(review): "adminstrative" below is misspelled; it is runtime text */
1374 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1375 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1377 class_fail_export(doomed_exp);
/* presumably drops the reference obtained from cfs_hash_lookup() -- confirm */
1378 class_export_put(doomed_exp);
1381 if (!exports_evicted)
1382 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1383 obd->obd_name, nid);
1384 return exports_evicted;
1386 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client UUID is @uuid.
 *
 * Refuses to act when @uuid equals the UUID of @obd itself.  Otherwise the
 * UUID is looked up in obd->obd_uuid_hash; a match is failed through
 * class_fail_export() and released with class_export_put().
 *
 * @param obd   device whose UUID hash is searched
 * @param uuid  textual client UUID to evict
 * @returns     exports_evicted (0 on the "can't evict myself" path)
 */
1388 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1390 struct obd_export *doomed_exp = NULL;
1391 struct obd_uuid doomed_uuid;
1392 int exports_evicted = 0;
1394 obd_str2uuid(&doomed_uuid, uuid);
/* guard: never evict the device's own UUID */
1395 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1396 CERROR("%s: can't evict myself\n", obd->obd_name);
1397 return exports_evicted;
1400 doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
/* a miss is reported with CERROR here, unlike the CDEBUG used by the NID variant */
1402 if (doomed_exp == NULL) {
1403 CERROR("%s: can't disconnect %s: no exports found\n",
1404 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" below is misspelled; it is runtime text */
1406 CWARN("%s: evicting %s at adminstrative request\n",
1407 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1408 class_fail_export(doomed_exp);
/* presumably drops the reference obtained from cfs_hash_lookup() -- confirm */
1409 class_export_put(doomed_exp);
1413 return exports_evicted;
1415 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Optional per-export dump callback, compiled in only when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set.  It starts out NULL and is exported so
 * another module can install a handler; print_export_data() calls it when
 * it is non-NULL and lock dumping was requested.
 */
1417 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1418 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1419 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Log one D_HA line describing @exp, tagged with @status.
 *
 * The line carries the obd name, the export pointer, client UUID and NID,
 * the refcount, the rpc/cb/locks counters, the disconnected/delayed/failed
 * bits, the outstanding reply count with the first reply pointer, and
 * exp_last_committed.  When LUSTRE_TRACKS_LOCK_EXP_REFS is set and locks is
 * non-zero, class_export_dump_hook is invoked as well if it is installed.
 *
 * @param exp     export to describe
 * @param status  free-form tag printed after the obd name
 */
1422 static void print_export_data(struct obd_export *exp, const char *status,
1425 struct ptlrpc_reply_state *rs;
1426 struct ptlrpc_reply_state *first_reply = NULL;
/* the outstanding replies list is walked under exp_lock */
1429 cfs_spin_lock(&exp->exp_lock);
1430 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1436 cfs_spin_unlock(&exp->exp_lock);
/* "..." is printed after first_reply when there are more than 3 replies */
1438 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1439 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1440 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1441 cfs_atomic_read(&exp->exp_rpc_count),
1442 cfs_atomic_read(&exp->exp_cb_count),
1443 cfs_atomic_read(&exp->exp_locks_count),
1444 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1445 nreplies, first_reply, nreplies > 3 ? "..." : "",
1446 exp->exp_last_committed);
1447 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1448 if (locks && class_export_dump_hook != NULL)
1449 class_export_dump_hook(exp);
/*
 * Log every export related to @obd through print_export_data().
 *
 * Under obd_dev_lock the obd_exports, obd_unlinked_exports and
 * obd_delayed_exports lists are printed as "ACTIVE", "UNLINKED" and
 * "DELAYED".  Then, under obd_zombie_impexp_lock, the global
 * obd_zombie_exports list is printed as "ZOMBIE"; that list is not filtered
 * by @obd, so zombies of other devices are printed too.
 *
 * @param obd    device whose export lists are dumped
 * @param locks  forwarded to print_export_data()
 */
1453 void dump_exports(struct obd_device *obd, int locks)
1455 struct obd_export *exp;
1457 cfs_spin_lock(&obd->obd_dev_lock);
1458 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1459 print_export_data(exp, "ACTIVE", locks);
1460 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1461 print_export_data(exp, "UNLINKED", locks);
1462 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1463 print_export_data(exp, "DELAYED", locks);
1464 cfs_spin_unlock(&obd->obd_dev_lock);
/* zombie exports are chained through the same exp_obd_chain member */
1465 cfs_spin_lock(&obd_zombie_impexp_lock);
1466 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1467 print_export_data(exp, "ZOMBIE", locks);
1468 cfs_spin_unlock(&obd_zombie_impexp_lock);
1470 EXPORT_SYMBOL(dump_exports);
/*
 * Block until obd->obd_unlinked_exports is empty.
 *
 * Asserts that obd_exports is already empty.  The list is checked under
 * obd_dev_lock; while it is non-empty the lock is dropped, the caller
 * sleeps in CFS_TASK_UNINT state for cfs_time_seconds(waited), and the lock
 * is retaken.  A console warning plus dump_exports() is issued whenever
 * waited is above 5 and IS_PO2(waited) holds.
 * NOTE(review): the declaration and update of waited are not shown here.
 *
 * @param obd  device to wait on
 */
1472 void obd_exports_barrier(struct obd_device *obd)
1475 LASSERT(cfs_list_empty(&obd->obd_exports));
1476 cfs_spin_lock(&obd->obd_dev_lock);
1477 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* never sleep with the spinlock held */
1478 cfs_spin_unlock(&obd->obd_dev_lock);
1479 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1480 cfs_time_seconds(waited));
1481 if (waited > 5 && IS_PO2(waited)) {
1482 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1483 "more than %d seconds. "
1484 "The obd refcount = %d. Is it stuck?\n",
1485 obd->obd_name, waited,
1486 cfs_atomic_read(&obd->obd_refcount));
1487 dump_exports(obd, 1);
1490 cfs_spin_lock(&obd->obd_dev_lock);
1492 cfs_spin_unlock(&obd->obd_dev_lock);
1494 EXPORT_SYMBOL(obd_exports_barrier);
1496 /* Number of zombies still to be destroyed; read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and obd_zombie_is_idle() */
1497 static int zombies_count = 0;
1500 /* Kill zombie imports and exports. */
/*
 * Destroy the queued zombie imports and exports.
 *
 * Each pass takes at most one entry from the head of obd_zombie_imports and
 * one from the head of obd_zombie_exports while holding
 * obd_zombie_impexp_lock, then destroys them with the lock released.  The
 * loop ends when a pass finds neither an import nor an export.
 * NOTE(review): the per-pass reset of import/export and the statement run
 * under the retaken lock after each destroy are not shown here; the latter
 * presumably adjusts zombies_count -- confirm.
 */
1502 void obd_zombie_impexp_cull(void)
1504 struct obd_import *import;
1505 struct obd_export *export;
1509 cfs_spin_lock(&obd_zombie_impexp_lock);
/* detach the first zombie import, if any */
1512 if (!cfs_list_empty(&obd_zombie_imports)) {
1513 import = cfs_list_entry(obd_zombie_imports.next,
1516 cfs_list_del_init(&import->imp_zombie_chain);
/* detach the first zombie export, if any */
1520 if (!cfs_list_empty(&obd_zombie_exports)) {
1521 export = cfs_list_entry(obd_zombie_exports.next,
1524 cfs_list_del_init(&export->exp_obd_chain);
1527 cfs_spin_unlock(&obd_zombie_impexp_lock);
/* destruction happens outside the spinlock */
1529 if (import != NULL) {
1530 class_import_destroy(import);
1531 cfs_spin_lock(&obd_zombie_impexp_lock);
1533 cfs_spin_unlock(&obd_zombie_impexp_lock);
1536 if (export != NULL) {
1537 class_export_destroy(export);
1538 cfs_spin_lock(&obd_zombie_impexp_lock);
1540 cfs_spin_unlock(&obd_zombie_impexp_lock);
1544 } while (import != NULL || export != NULL);
/* completed by obd_zombie_impexp_thread() once it has started (or failed to) */
1548 static cfs_completion_t obd_zombie_start;
/* completed by obd_zombie_impexp_thread() when its main loop has ended */
1549 static cfs_completion_t obd_zombie_stop;
/* flag word tested and set with cfs_test_bit()/cfs_set_bit() */
1550 static unsigned long obd_zombie_flags;
/* wait queue shared by the zombie thread and obd_zombie_barrier() callers */
1551 static cfs_waitq_t obd_zombie_waitq;
/* pid of the zombie thread, so obd_zombie_barrier() can avoid waiting on itself */
1552 static pid_t obd_zombie_pid;
/*
 * Passed as the first argument of cfs_set_bit()/cfs_test_bit().
 * NOTE(review): if those take a bit number, as the Linux set_bit() and
 * test_bit() do, the value 1 << 1 selects bit 2 rather than acting as a
 * mask; this works only because every user passes the same constant.
 */
1555 OBD_ZOMBIE_STOP = 1 << 1
1559 /* Check whether the zombie import/export kill thread has work to do. */
/*
 * Compute, under obd_zombie_impexp_lock, whether the zombie thread may keep
 * sleeping: rc is non-zero only when zombies_count is 0 and the
 * OBD_ZOMBIE_STOP bit is clear.  The thread loop therefore waits on the
 * negation of this function.
 *
 * @param arg  not referenced in the statements shown here
 */
1561 static int obd_zombie_impexp_check(void *arg)
1565 cfs_spin_lock(&obd_zombie_impexp_lock);
1566 rc = (zombies_count == 0) &&
1567 !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1568 cfs_spin_unlock(&obd_zombie_impexp_lock);
1574 /* Add export to the obd_zombie thread and notify it. */
/*
 * Move @exp from its device list to the global zombie export list and wake
 * the zombie thread.
 *
 * The export must currently be linked through exp_obd_chain (asserted).  It
 * is unlinked under the device's obd_dev_lock, then added to
 * obd_zombie_exports under obd_zombie_impexp_lock.  The two locks are taken
 * one after the other and are never held together.
 *
 * @param exp  export to queue for destruction
 */
1576 static void obd_zombie_export_add(struct obd_export *exp) {
1577 cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1578 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1579 cfs_list_del_init(&exp->exp_obd_chain);
1580 cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1581 cfs_spin_lock(&obd_zombie_impexp_lock);
/* exp_obd_chain is reused as the link on the zombie list */
1583 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1584 cfs_spin_unlock(&obd_zombie_impexp_lock);
1586 obd_zombie_impexp_notify();
1590 /* Add import to the obd_zombie thread and notify it. */
/*
 * Queue @imp on the global zombie import list and wake the zombie thread.
 *
 * Asserts that imp_sec is already NULL and that the import is not yet on a
 * zombie list, then links it through imp_zombie_chain onto
 * obd_zombie_imports under obd_zombie_impexp_lock.
 *
 * @param imp  import to queue for destruction
 */
1592 static void obd_zombie_import_add(struct obd_import *imp) {
1593 LASSERT(imp->imp_sec == NULL);
1594 cfs_spin_lock(&obd_zombie_impexp_lock);
1595 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1597 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1598 cfs_spin_unlock(&obd_zombie_impexp_lock);
1600 obd_zombie_impexp_notify();
1604 /* Notify the import/export destroy thread about a new zombie. */
/*
 * Wake every waiter on obd_zombie_waitq.  A broadcast is used rather than a
 * single wake-up because, as the note in the body explains, barrier callers
 * sleep on the same queue and could otherwise consume the notification
 * meant for the zombie thread.
 */
1606 static void obd_zombie_impexp_notify(void)
1609 * Make sure obd_zomebie_impexp_thread get this notification.
1610 * It is possible this signal only get by obd_zombie_barrier, and
1611 * barrier gulps this notification and sleeps away and hangs ensues
1613 cfs_waitq_broadcast(&obd_zombie_waitq);
1617 /* Check whether obd_zombie is idle. */
/*
 * Compute, under obd_zombie_impexp_lock, whether no zombies are pending
 * (rc is non-zero when zombies_count is 0).  Asserts that OBD_ZOMBIE_STOP
 * is not set, so it must not be called once the zombie thread was told to
 * stop.  Used as the wake-up condition of obd_zombie_barrier().
 */
1619 static int obd_zombie_is_idle(void)
1623 LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1624 cfs_spin_lock(&obd_zombie_impexp_lock);
1625 rc = (zombies_count == 0);
1626 cfs_spin_unlock(&obd_zombie_impexp_lock);
1631 /* Wait until the obd_zombie import/export queues become empty. */
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies are pending.  The caller's pid is compared with obd_zombie_pid
 * first so that the zombie thread does not wait for itself.
 */
1633 void obd_zombie_barrier(void)
/* zero-initialised wait info: no timeout values are supplied */
1635 struct l_wait_info lwi = { 0 };
1637 if (obd_zombie_pid == cfs_curproc_pid())
1638 /* don't wait for myself */
1640 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1642 EXPORT_SYMBOL(obd_zombie_barrier);
1647 /* Thread that destroys zombie exports/imports. */
/*
 * Main function of the zombie destroy thread.
 *
 * Daemonizes as "obd_zombid" and completes obd_zombie_start whether or not
 * that succeeded, so the starter is never left waiting.  It then records
 * its pid and loops until OBD_ZOMBIE_STOP is set: sleep until
 * obd_zombie_impexp_check() reports work, cull the zombie lists, and signal
 * obd_zombie_waitq for barrier callers.  obd_zombie_stop is completed on
 * the way out.
 *
 * NOTE(review): obd_zombie_pid is assigned after obd_zombie_start is
 * completed, so the starter may resume before the pid is recorded.
 *
 * @param unused  not referenced in the statements shown here
 */
1649 static int obd_zombie_impexp_thread(void *unused)
1653 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1654 cfs_complete(&obd_zombie_start);
1658 cfs_complete(&obd_zombie_start);
1660 obd_zombie_pid = cfs_curproc_pid();
1662 while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1663 struct l_wait_info lwi = { 0 };
/* check() is non-zero when there is nothing to do, hence the negation */
1665 l_wait_event(obd_zombie_waitq,
1666 !obd_zombie_impexp_check(NULL), &lwi);
1667 obd_zombie_impexp_cull();
1670 * Notify obd_zombie_barrier callers that queues
1673 cfs_waitq_signal(&obd_zombie_waitq);
1676 cfs_complete(&obd_zombie_stop);
1681 #else /* ! KERNEL */
/* recursion guard for obd_zombie_impexp_kill(): only the outermost call culls */
1683 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
/* handle returned by liblustre_register_wait_callback(); needed to deregister */
1684 static void *obd_zombie_impexp_work_cb;
/* handle returned by liblustre_register_idle_callback(); needed to deregister */
1685 static void *obd_zombie_impexp_idle_cb;
/*
 * liblustre wait callback: cull the zombie lists unless a cull is already
 * running further up the call chain.  zombie_recur is incremented on entry
 * and decremented on exit; obd_zombie_impexp_cull() runs only when the
 * incremented value is exactly 1.
 *
 * @param arg  not referenced in the statements shown here
 */
1687 int obd_zombie_impexp_kill(void *arg)
1691 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1692 obd_zombie_impexp_cull();
1695 cfs_atomic_dec(&zombie_recur);
1702 /* Start the zombie import/export destroy thread. */
/*
 * Initialise the zombie import/export machinery.
 *
 * Sets up both zombie lists, their spinlock, the start/stop completions and
 * the wait queue.  The statements shown then create the zombie thread and
 * wait for obd_zombie_start, and also register the liblustre wait and idle
 * callbacks.
 * NOTE(review): the two variants are presumably selected by preprocessor
 * conditionals that are not shown here -- confirm.
 */
1704 int obd_zombie_impexp_init(void)
1708 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1709 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1710 cfs_spin_lock_init(&obd_zombie_impexp_lock);
1711 cfs_init_completion(&obd_zombie_start);
1712 cfs_init_completion(&obd_zombie_stop);
1713 cfs_waitq_init(&obd_zombie_waitq);
/* thread variant: spawn the thread, then wait until it reports it has started */
1717 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1721 cfs_wait_for_completion(&obd_zombie_start);
/* callback variant: the handles are kept for deregistration at stop time */
1724 obd_zombie_impexp_work_cb =
1725 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1726 &obd_zombie_impexp_kill, NULL);
1728 obd_zombie_impexp_idle_cb =
1729 liblustre_register_idle_callback("obd_zombi_impexp_check",
1730 &obd_zombie_impexp_check, NULL);
1736 /* Stop the zombie import/export destroy thread. */
/*
 * Shut the zombie import/export machinery down.
 *
 * Sets OBD_ZOMBIE_STOP, wakes the waiters so the thread notices the flag,
 * and waits for obd_zombie_stop.  The statements shown also deregister the
 * liblustre wait and idle callbacks.
 * NOTE(review): the thread and callback variants are presumably selected by
 * preprocessor conditionals that are not shown here -- confirm.
 */
1738 void obd_zombie_impexp_stop(void)
1740 cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1741 obd_zombie_impexp_notify();
1743 cfs_wait_for_completion(&obd_zombie_stop);
1745 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1746 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1750 /***** Kernel-userspace comm helpers *******/
1752 /* Get length of entire message, including header: sizeof(struct kuc_hdr) plus payload_len */
/* NOTE(review): payload_len is not range-checked and the sum is narrowed to int */
1753 int kuc_len(int payload_len)
1755 return sizeof(struct kuc_hdr) + payload_len;
1757 EXPORT_SYMBOL(kuc_len);
1759 /* Get a pointer to kuc header, given a ptr to the payload
1760 * @param p Pointer to payload area
1761 * @returns Pointer to kuc header */
/*
 * Step back one struct kuc_hdr from payload pointer @p to reach the header
 * that precedes it, and assert that the header carries KUC_MAGIC.  @p must
 * therefore point just past a valid kuc header, as returned by kuc_alloc().
 */
1763 struct kuc_hdr * kuc_ptr(void *p)
1765 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1766 LASSERT(lh->kuc_magic == KUC_MAGIC);
1769 EXPORT_SYMBOL(kuc_ptr);
1771 /* Test if payload is part of kuc message
1772 * @param p Pointer to payload area */
/*
 * Non-asserting variant of the kuc_ptr() check: looks at the struct kuc_hdr
 * sized area immediately before @p and compares its kuc_magic with
 * KUC_MAGIC.
 * NOTE(review): the memory before @p is read unconditionally, so @p must
 * have at least sizeof(struct kuc_hdr) readable bytes in front of it.
 */
1775 int kuc_ispayload(void *p)
1777 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1779 if (kh->kuc_magic == KUC_MAGIC)
1784 EXPORT_SYMBOL(kuc_ispayload);
1786 /* Alloc space for a message, and fill in header
1787 * @return Pointer to payload area */
/*
 * Allocate a kuc message with room for payload_len payload bytes and fill
 * in its header (magic, transport, message type, total length).
 *
 * The returned pointer is the payload area, i.e. the address just past the
 * header.  On allocation failure ERR_PTR(-ENOMEM) is returned rather than
 * NULL, so callers must test for an error pointer.
 *
 * @param payload_len  size of the payload in bytes
 * @param transport    stored in kuc_transport
 * @param type         stored in kuc_msgtype
 */
1789 void *kuc_alloc(int payload_len, int transport, int type)
/* total length, header included; also stored in kuc_msglen below */
1792 int len = kuc_len(payload_len);
1796 return ERR_PTR(-ENOMEM);
1798 lh->kuc_magic = KUC_MAGIC;
1799 lh->kuc_transport = transport;
1800 lh->kuc_msgtype = type;
1801 lh->kuc_msglen = len;
/* hand out the address just past the header */
1803 return (void *)(lh + 1);
1805 EXPORT_SYMBOL(kuc_alloc);
1807 /* Free a kuc message; takes the pointer to the payload area, not to the header */
/*
 * The header is recovered with kuc_ptr(), which asserts the magic.  The
 * size passed to OBD_FREE is recomputed from payload_len, so the caller
 * must pass the same payload_len that was given to kuc_alloc().
 */
1808 inline void kuc_free(void *p, int payload_len)
1810 struct kuc_hdr *lh = kuc_ptr(p);
1811 OBD_FREE(lh, kuc_len(payload_len));
1813 EXPORT_SYMBOL(kuc_free);