4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
/* Registered obd types; walked under obd_types_lock in class_search_type(). */
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
/* Slab caches set up in obd_init_caches() and torn down in
 * obd_cleanup_caches(). */
53 struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 struct kmem_cache *import_cachep;
/* Zombie import/export lists (presumably guarded by obd_zombie_impexp_lock;
 * TODO confirm) and forward declarations of the helpers used further down. */
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
/* Function pointer called by the export and import destroy paths to drop a
 * ptlrpc connection. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate an obd_device from the obd_device_cachep slab (with __GFP_IO)
 * and stamp it with OBD_DEVICE_MAGIC.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Give an obd_device back to the obd_device_cachep slab.
 *
 * The magic is asserted first, an error is logged when obd_namespace is
 * still set, and the obd_reference lu_ref is finalized before the free.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd_type by name.
 *
 * The obd_types list is walked under obd_types_lock and each typ_name is
 * compared with \a name using strcmp(). The lock is released both when a
 * match is found and after an unsuccessful scan.
 */
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd_type called \a name and pin the module that owns it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the code can ask for the module through
 * request_module() and search again. Before doing so some names are mapped
 * to a different module name: LUSTRE_LWP_NAME becomes LUSTRE_OSP_NAME,
 * names starting with LUSTRE_MDS_NAME become LUSTRE_MDT_NAME, and
 * "obdfilter" gets its own special case.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
/* request_module() returning 0 is treated as a successful load */
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not used here. */
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_get_type);
/*
 * Release a type obtained from class_get_type(): module_put() is called on
 * the owner of the dt ops while the per-type obd_type_lock is held.
 */
153 void class_put_type(struct obd_type *type)
156 spin_lock(&type->obd_type_lock);
158 module_put(type->typ_dt_ops->o_owner);
159 spin_unlock(&type->obd_type_lock);
161 EXPORT_SYMBOL(class_put_type);
/* Upper bound on the type name length asserted by class_register_type(). */
163 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 *
 * The type structure, private copies of \a dt_ops and \a md_ops and a copy
 * of the name are allocated, a proc directory is registered under
 * proc_lustre_root, lu_device_type_init() is run on \a ldt and the type is
 * linked into obd_types under obd_types_lock. A name that is already
 * registered is reported through CDEBUG.
 */
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166 struct lprocfs_seq_vars *module_vars,
167 #ifndef HAVE_ONLY_PROCFS_SEQ
168 struct lprocfs_vars *vars,
170 const char *name, struct lu_device_type *ldt)
172 struct obd_type *type;
177 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
179 if (class_search_type(name)) {
180 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
185 OBD_ALLOC(type, sizeof(*type));
189 OBD_ALLOC_PTR(type->typ_dt_ops);
190 OBD_ALLOC_PTR(type->typ_md_ops);
191 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
193 if (type->typ_dt_ops == NULL ||
194 type->typ_md_ops == NULL ||
195 type->typ_name == NULL)
198 *(type->typ_dt_ops) = *dt_ops;
199 /* md_ops is optional */
201 *(type->typ_md_ops) = *md_ops;
202 strcpy(type->typ_name, name);
203 spin_lock_init(&type->obd_type_lock);
206 #ifndef HAVE_ONLY_PROCFS_SEQ
208 type->typ_procroot = lprocfs_register(type->typ_name,
214 type->typ_procroot = lprocfs_seq_register(type->typ_name,
/* an ERR_PTR from the proc registration is turned into rc and the pointer
 * is reset to NULL */
218 if (IS_ERR(type->typ_procroot)) {
219 rc = PTR_ERR(type->typ_procroot);
220 type->typ_procroot = NULL;
226 rc = lu_device_type_init(ldt);
231 spin_lock(&obd_types_lock);
232 cfs_list_add(&type->typ_chain, &obd_types);
233 spin_unlock(&obd_types_lock);
/* Cleanup code (presumably the failure path; TODO confirm): releases the
 * name, both ops copies, the proc entry and the type itself.
 * NOTE(review): typ_name is freed here and then passed to the proc removal
 * calls a few lines below; confirm this ordering is safe. */
238 if (type->typ_name != NULL)
239 OBD_FREE(type->typ_name, strlen(name) + 1);
240 if (type->typ_md_ops != NULL)
241 OBD_FREE_PTR(type->typ_md_ops);
242 if (type->typ_dt_ops != NULL)
243 OBD_FREE_PTR(type->typ_dt_ops);
245 #ifndef HAVE_ONLY_PROCFS_SEQ
246 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
248 remove_proc_subtree(type->typ_name, proc_lustre_root);
251 OBD_FREE(type, sizeof(*type));
254 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * A type with a non-zero typ_refcnt has its dt/md ops freed but keeps its
 * name (see the comments in that branch). The remaining code removes the
 * proc entry by name, calls lu_device_type_fini() on typ_lu, unlinks the
 * type from obd_types under obd_types_lock and frees all of its memory.
 */
256 int class_unregister_type(const char *name)
258 struct obd_type *type = class_search_type(name);
262 CERROR("unknown obd type\n");
266 if (type->typ_refcnt) {
267 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
268 /* This is a bad situation, let's make the best of it */
269 /* Remove ops, but leave the name for debugging */
/* NOTE(review): class_put_type() dereferences typ_dt_ops; confirm nothing
 * can reach the freed ops once this branch has run. */
270 OBD_FREE_PTR(type->typ_dt_ops);
271 OBD_FREE_PTR(type->typ_md_ops);
275 /* we do not use type->typ_procroot as for compatibility purposes
276 * other modules can share names (i.e. lod can use lov entry). so
277 * we can't reference pointer as it can get invalidated when another
278 * module removes the entry */
280 #ifndef HAVE_ONLY_PROCFS_SEQ
281 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
283 remove_proc_subtree(type->typ_name, proc_lustre_root);
287 lu_device_type_fini(type->typ_lu);
289 spin_lock(&obd_types_lock);
290 cfs_list_del(&type->typ_chain);
291 spin_unlock(&obd_types_lock);
292 OBD_FREE(type->typ_name, strlen(name) + 1);
293 if (type->typ_dt_ops != NULL)
294 OBD_FREE_PTR(type->typ_dt_ops);
295 if (type->typ_md_ops != NULL)
296 OBD_FREE_PTR(type->typ_md_ops);
297 OBD_FREE(type, sizeof(*type));
299 } /* class_unregister_type */
300 EXPORT_SYMBOL(class_unregister_type);
303 * Create a new obd device.
305 * Find an empty slot in ::obd_devs[], create a new obd device in it.
307 * \param[in] type_name obd device type string.
308 * \param[in] name obd device name.
310 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
/*
 * Allocate a device named \a name of type \a type_name and place it in a
 * free obd_devs[] slot.
 *
 * Failures visible in this function are reported as ERR_PTR values:
 * -EINVAL for a name of MAX_OBD_NAME bytes or more, -ENODEV for an unknown
 * type, -ENOMEM when the device cannot be allocated, -EEXIST for a
 * duplicate name and -EOVERFLOW when every slot is in use. The slot scan
 * runs under the obd_dev_lock write lock.
 */
313 struct obd_device *class_newdev(const char *type_name, const char *name)
315 struct obd_device *result = NULL;
316 struct obd_device *newdev;
317 struct obd_type *type = NULL;
319 int new_obd_minor = 0;
322 if (strlen(name) >= MAX_OBD_NAME) {
323 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
324 RETURN(ERR_PTR(-EINVAL));
327 type = class_get_type(type_name);
329 CERROR("OBD: unknown type: %s\n", type_name);
330 RETURN(ERR_PTR(-ENODEV));
333 newdev = obd_device_alloc();
335 GOTO(out_type, result = ERR_PTR(-ENOMEM));
337 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
339 write_lock(&obd_dev_lock);
340 for (i = 0; i < class_devno_max(); i++) {
341 struct obd_device *obd = class_num2obd(i);
/* duplicate name: a slot claimed earlier in this scan is handed back
 * before the error is recorded */
343 if (obd && (strcmp(name, obd->obd_name) == 0)) {
344 CERROR("Device %s already exists at %d, won't add\n",
347 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
348 "%p obd_magic %08x != %08x\n", result,
349 result->obd_magic, OBD_DEVICE_MAGIC);
350 LASSERTF(result->obd_minor == new_obd_minor,
351 "%p obd_minor %d != %d\n", result,
352 result->obd_minor, new_obd_minor);
354 obd_devs[result->obd_minor] = NULL;
355 result->obd_name[0]='\0';
357 result = ERR_PTR(-EEXIST);
/* first empty slot seen while nothing has been chosen yet */
360 if (!result && !obd) {
362 result->obd_minor = i;
364 result->obd_type = type;
/* the name length was already checked against MAX_OBD_NAME above */
365 strncpy(result->obd_name, name,
366 sizeof(result->obd_name) - 1);
367 obd_devs[i] = result;
370 write_unlock(&obd_dev_lock);
372 if (result == NULL && i >= class_devno_max()) {
373 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
375 GOTO(out, result = ERR_PTR(-EOVERFLOW));
381 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
382 result->obd_name, result);
/* cleanup: free the unused device and drop the type reference */
386 obd_device_free(newdev);
388 class_put_type(type);
/*
 * Remove \a obd from obd_devs[] and free it.
 *
 * The magic and the slot contents are asserted, the slot is cleared under
 * the obd_dev_lock write lock, the device is freed and the type reference
 * is dropped last.
 */
392 void class_release_dev(struct obd_device *obd)
394 struct obd_type *obd_type = obd->obd_type;
396 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
397 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
398 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
399 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
400 LASSERT(obd_type != NULL);
402 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
403 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
405 write_lock(&obd_dev_lock);
406 obd_devs[obd->obd_minor] = NULL;
407 write_unlock(&obd_dev_lock);
408 obd_device_free(obd);
/* uses the pointer saved at the top because obd itself is already freed */
410 class_put_type(obd_type);
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_name equals \a name. A device only counts as found once its
 * obd_attached flag is set. class_name2obd() treats a negative result as
 * not found.
 */
413 int class_name2dev(const char *name)
420 read_lock(&obd_dev_lock);
421 for (i = 0; i < class_devno_max(); i++) {
422 struct obd_device *obd = class_num2obd(i);
424 if (obd && strcmp(name, obd->obd_name) == 0) {
425 /* Make sure we finished attaching before we give
426 out any references */
427 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
428 if (obd->obd_attached) {
429 read_unlock(&obd_dev_lock);
435 read_unlock(&obd_dev_lock);
439 EXPORT_SYMBOL(class_name2dev);
441 struct obd_device *class_name2obd(const char *name)
443 int dev = class_name2dev(name);
445 if (dev < 0 || dev > class_devno_max())
447 return class_num2obd(dev);
449 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_uuid equals \a uuid according to obd_uuid_equals(). The lock is
 * dropped on a match and again after an unsuccessful scan.
 */
451 int class_uuid2dev(struct obd_uuid *uuid)
455 read_lock(&obd_dev_lock);
456 for (i = 0; i < class_devno_max(); i++) {
457 struct obd_device *obd = class_num2obd(i);
459 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
460 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
461 read_unlock(&obd_dev_lock);
465 read_unlock(&obd_dev_lock);
469 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Resolve \a uuid to a device by feeding the index from class_uuid2dev()
 * into class_num2obd().
 */
471 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
473 int dev = class_uuid2dev(uuid);
476 return class_num2obd(dev);
478 EXPORT_SYMBOL(class_uuid2obd);
481 * Get obd device from ::obd_devs[]
483 * \param num [in] array index
485 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
486 * otherwise return the obd device there.
488 struct obd_device *class_num2obd(int num)
490 struct obd_device *obd = NULL;
492 if (num < class_devno_max()) {
497 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
498 "%p obd_magic %08x != %08x\n",
499 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
500 LASSERTF(obd->obd_minor == num,
501 "%p obd_minor %0d != %0d\n",
502 obd, obd->obd_minor, num);
507 EXPORT_SYMBOL(class_num2obd);
510 * Get obd devices count. Device in any
512 * \retval obd device count
514 int get_devices_count(void)
516 int index, max_index = class_devno_max(), dev_count = 0;
518 read_lock(&obd_dev_lock);
519 for (index = 0; index <= max_index; index++) {
520 struct obd_device *obd = class_num2obd(index);
524 read_unlock(&obd_dev_lock);
528 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[]: slot index, a status
 * string chosen from obd_stopping / obd_set_up / obd_attached, type name,
 * device name, uuid and the current obd_refcount. The scan runs under the
 * obd_dev_lock read lock.
 */
530 void class_obd_list(void)
535 read_lock(&obd_dev_lock);
536 for (i = 0; i < class_devno_max(); i++) {
537 struct obd_device *obd = class_num2obd(i);
/* the flags are tested in this order, so stopping wins over set_up and
 * set_up wins over attached */
541 if (obd->obd_stopping)
543 else if (obd->obd_set_up)
545 else if (obd->obd_attached)
549 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
550 i, status, obd->obd_type->typ_name,
551 obd->obd_name, obd->obd_uuid.uuid,
552 cfs_atomic_read(&obd->obd_refcount));
554 read_unlock(&obd_dev_lock);
558 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
559 specified, then only the client with that uuid is returned,
560 otherwise any client connected to the tgt is returned. */
/*
 * \param tgt_uuid  target uuid compared with u.cli.cl_target_uuid
 * \param typ_name  device type name; only strlen(typ_name) bytes are compared
 * \param grp_uuid  optional uuid compared with the device obd_uuid
 */
561 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
562 const char * typ_name,
563 struct obd_uuid *grp_uuid)
567 read_lock(&obd_dev_lock);
568 for (i = 0; i < class_devno_max(); i++) {
569 struct obd_device *obd = class_num2obd(i);
/* prefix match on the type name */
573 if ((strncmp(obd->obd_type->typ_name, typ_name,
574 strlen(typ_name)) == 0)) {
/* a NULL grp_uuid accepts any device uuid */
575 if (obd_uuid_equals(tgt_uuid,
576 &obd->u.cli.cl_target_uuid) &&
577 ((grp_uuid)? obd_uuid_equals(grp_uuid,
578 &obd->obd_uuid) : 1)) {
579 read_unlock(&obd_dev_lock);
584 read_unlock(&obd_dev_lock);
588 EXPORT_SYMBOL(class_find_client_obd);
590 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
591 searching at *next, and if a device is found, the next index to look
592 at is saved in *next. If next is NULL, then the first matching device
593 will always be returned. */
/*
 * \param grp_uuid  uuid compared with the obd_uuid of each device
 * \param next      optional in/out scan position
 */
594 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* *next is only accepted when it lies inside [0, class_devno_max()) */
600 else if (*next >= 0 && *next < class_devno_max())
605 read_lock(&obd_dev_lock);
606 for (; i < class_devno_max(); i++) {
607 struct obd_device *obd = class_num2obd(i);
611 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
614 read_unlock(&obd_dev_lock);
618 read_unlock(&obd_dev_lock);
622 EXPORT_SYMBOL(class_devices_in_group);
625 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
626 * adjust sptlrpc settings accordingly.
/*
 * Walk obd_devs[] and send KEY_SPTLRPC_CONF through obd_set_info_async()
 * to every set-up, non-stopping mdc/osc/mdt/ost device whose obd_name
 * starts with the first \a namelen bytes of \a fsname.
 *
 * \param fsname   filesystem name used as device name prefix
 * \param namelen  number of bytes of \a fsname to compare, asserted > 0
 */
628 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
630 struct obd_device *obd;
634 LASSERT(namelen > 0);
636 read_lock(&obd_dev_lock);
637 for (i = 0; i < class_devno_max(); i++) {
638 obd = class_num2obd(i);
640 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
643 /* only notify mdc, osc, mdt, ost */
644 type = obd->obd_type->typ_name;
645 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
646 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
647 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
648 strcmp(type, LUSTRE_OST_NAME) != 0)
651 if (strncmp(obd->obd_name, fsname, namelen))
/* pin the device, then drop obd_dev_lock around the set_info call */
654 class_incref(obd, __FUNCTION__, obd);
655 read_unlock(&obd_dev_lock);
656 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
657 sizeof(KEY_SPTLRPC_CONF),
658 KEY_SPTLRPC_CONF, 0, NULL, NULL);
/* unpin and take the lock again before the scan continues */
660 class_decref(obd, __FUNCTION__, obd);
661 read_lock(&obd_dev_lock);
663 read_unlock(&obd_dev_lock);
666 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the obd_device, obdo, import and capa slab caches with
 * kmem_cache_destroy().
 */
668 void obd_cleanup_caches(void)
671 if (obd_device_cachep) {
672 kmem_cache_destroy(obd_device_cachep);
673 obd_device_cachep = NULL;
676 kmem_cache_destroy(obdo_cachep);
680 kmem_cache_destroy(import_cachep);
681 import_cachep = NULL;
684 kmem_cache_destroy(capa_cachep);
/*
 * Create the slab caches for obd_device, obdo, obd_import and obd_capa
 * objects. Each cache pointer is asserted to be NULL before its cache is
 * created. obd_cleanup_caches() is called at the tail of the function
 * (presumably the error path; TODO confirm).
 */
690 int obd_init_caches(void)
694 LASSERT(obd_device_cachep == NULL);
695 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
696 sizeof(struct obd_device),
698 if (!obd_device_cachep)
701 LASSERT(obdo_cachep == NULL);
702 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
707 LASSERT(import_cachep == NULL);
708 import_cachep = kmem_cache_create("ll_import_cache",
709 sizeof(struct obd_import),
714 LASSERT(capa_cachep == NULL);
715 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
722 obd_cleanup_caches();
727 /* map connection to client */
/*
 * Translate a lustre_handle into its obd_export by looking conn->cookie
 * up with class_handle2object(). A cookie of -1 is special-cased as a
 * request for a new connection.
 */
728 struct obd_export *class_conn2export(struct lustre_handle *conn)
730 struct obd_export *export;
734 CDEBUG(D_CACHE, "looking for null handle\n");
738 if (conn->cookie == -1) { /* this means assign a new connection */
739 CDEBUG(D_CACHE, "want a new connection\n");
743 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
744 export = class_handle2object(conn->cookie, NULL);
747 EXPORT_SYMBOL(class_conn2export);
/* NOTE(review): presumably yields the obd_device behind \a exp; confirm. */
749 struct obd_device *class_exp2obd(struct obd_export *exp)
755 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve \a conn to an export with class_conn2export(), read exp_obd
 * from it and release the export again with class_export_put().
 */
757 struct obd_device *class_conn2obd(struct lustre_handle *conn)
759 struct obd_export *export;
760 export = class_conn2export(conn);
762 struct obd_device *obd = export->exp_obd;
/* the export is released here; exp_obd was read before the put */
763 class_export_put(export);
768 EXPORT_SYMBOL(class_conn2obd);
/* Return the client import (u.cli.cl_import) of the device behind \a exp. */
770 struct obd_import *class_exp2cliimp(struct obd_export *exp)
772 struct obd_device *obd = exp->exp_obd;
775 return obd->u.cli.cl_import;
777 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that
 * class_conn2obd() finds for \a conn.
 */
779 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
781 struct obd_device *obd = class_conn2obd(conn);
784 return obd->u.cli.cl_import;
786 EXPORT_SYMBOL(class_conn2cliimp);
788 /* Export management functions */
/*
 * Tear down an export whose reference count is zero (asserted).
 *
 * The connection, if any, is dropped, the reply, replay and high-priority
 * RPC lists are asserted to be empty, obd_destroy_export() is called, the
 * "export" device reference is released and the export is freed with
 * OBD_FREE_RCU.
 */
789 static void class_export_destroy(struct obd_export *exp)
791 struct obd_device *obd = exp->exp_obd;
794 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
795 LASSERT(obd != NULL);
797 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
798 exp->exp_client_uuid.uuid, obd->obd_name);
800 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
801 if (exp->exp_connection)
802 ptlrpc_put_connection_superhack(exp->exp_connection);
804 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
805 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
806 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
807 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
808 obd_destroy_export(exp);
/* pairs with the class_incref(obd, "export", ...) in class_new_export() */
809 class_decref(obd, "export", exp);
811 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes one export reference. */
815 static void export_handle_addref(void *export)
817 class_export_get(export);
/* Handle operations passed to class_handle_hash() for exports. */
820 static struct portals_handle_ops export_handle_ops = {
821 .hop_addref = export_handle_addref,
/*
 * Take a reference on \a exp: exp_refcount is incremented atomically and
 * the new value is logged.
 */
825 struct obd_export *class_export_get(struct obd_export *exp)
827 cfs_atomic_inc(&exp->exp_refcount);
828 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
829 cfs_atomic_read(&exp->exp_refcount));
832 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp. When the count reaches zero the nid stats
 * are cleaned up and the export is handed to obd_zombie_export_add().
 */
834 void class_export_put(struct obd_export *exp)
836 LASSERT(exp != NULL);
837 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the log line shows the value the counter will have after this put */
838 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
839 cfs_atomic_read(&exp->exp_refcount) - 1);
841 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
842 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
843 CDEBUG(D_IOCTL, "final put %p/%s\n",
844 exp, exp->exp_client_uuid.uuid);
846 /* release nid stat reference */
847 lprocfs_exp_cleanup(exp);
849 obd_zombie_export_add(exp);
852 EXPORT_SYMBOL(class_export_put);
854 /* Creates a new export, adds it to the hash table, and returns a
855 * pointer to it. The refcount is 2: one for the hash reference, and
856 * one for the pointer returned by this function. */
/*
 * \param obd     device the export belongs to
 * \param cluuid  client uuid copied into exp_client_uuid
 *
 * Returns ERR_PTR(-ENOMEM) when the export cannot be allocated. The other
 * failure codes set below are -ENODEV (device stopping or no uuid hash)
 * and -EALREADY (duplicate client uuid).
 */
857 struct obd_export *class_new_export(struct obd_device *obd,
858 struct obd_uuid *cluuid)
860 struct obd_export *export;
861 cfs_hash_t *hash = NULL;
865 OBD_ALLOC_PTR(export);
867 return ERR_PTR(-ENOMEM);
869 export->exp_conn_cnt = 0;
870 export->exp_lock_hash = NULL;
871 export->exp_flock_hash = NULL;
/* starts with two references: one for the hash, one for the caller */
872 cfs_atomic_set(&export->exp_refcount, 2);
873 cfs_atomic_set(&export->exp_rpc_count, 0);
874 cfs_atomic_set(&export->exp_cb_count, 0);
875 cfs_atomic_set(&export->exp_locks_count, 0);
876 #if LUSTRE_TRACKS_LOCK_EXP_REFS
877 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
878 spin_lock_init(&export->exp_locks_list_guard);
880 cfs_atomic_set(&export->exp_replay_count, 0);
881 export->exp_obd = obd;
882 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
883 spin_lock_init(&export->exp_uncommitted_replies_lock);
884 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
885 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
886 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
887 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
888 CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
889 class_handle_hash(&export->exp_handle, &export_handle_ops);
890 export->exp_last_request_time = cfs_time_current_sec();
891 spin_lock_init(&export->exp_lock);
892 spin_lock_init(&export->exp_rpc_lock);
893 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
894 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
895 spin_lock_init(&export->exp_bl_list_lock);
896 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
898 export->exp_sp_peer = LUSTRE_SP_ANY;
899 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
900 export->exp_client_uuid = *cluuid;
901 obd_init_export(export);
/* first obd_stopping check; the uuid hash is pinned while obd_dev_lock
 * is held */
903 spin_lock(&obd->obd_dev_lock);
904 /* shouldn't happen, but might race */
905 if (obd->obd_stopping)
906 GOTO(exit_unlock, rc = -ENODEV);
908 hash = cfs_hash_getref(obd->obd_uuid_hash);
910 GOTO(exit_unlock, rc = -ENODEV);
911 spin_unlock(&obd->obd_dev_lock);
/* an export whose client uuid equals the device uuid is not put into the
 * uuid hash */
913 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
914 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
916 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
917 obd->obd_name, cluuid->uuid, rc);
918 GOTO(exit_err, rc = -EALREADY);
/* obd_stopping is checked again under the lock; the hash entry is removed
 * if the device started stopping in the meantime */
922 spin_lock(&obd->obd_dev_lock);
923 if (obd->obd_stopping) {
924 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
925 GOTO(exit_unlock, rc = -ENODEV);
/* link the export into the device lists and count it */
928 class_incref(obd, "export", export);
929 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
930 cfs_list_add_tail(&export->exp_obd_chain_timed,
931 &export->exp_obd->obd_exports_timed);
932 export->exp_obd->obd_num_exports++;
933 spin_unlock(&obd->obd_dev_lock);
934 cfs_hash_putref(hash);
/* error exits: unlock, drop the hash reference, unhash the handle and free
 * the export directly */
938 spin_unlock(&obd->obd_dev_lock);
941 cfs_hash_putref(hash);
942 class_handle_unhash(&export->exp_handle);
943 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
944 obd_destroy_export(export);
945 OBD_FREE_PTR(export);
948 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its device.
 *
 * The handle is unhashed first. Under obd_dev_lock the export is removed
 * from the uuid hash (if hashed), moved to obd_unlinked_exports, taken off
 * the timed list and obd_num_exports is decremented. One export reference
 * is dropped at the end.
 */
950 void class_unlink_export(struct obd_export *exp)
952 class_handle_unhash(&exp->exp_handle);
954 spin_lock(&exp->exp_obd->obd_dev_lock);
955 /* delete an uuid-export hashitem from hashtables */
956 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
957 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
958 &exp->exp_client_uuid,
959 &exp->exp_uuid_hash);
961 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
962 cfs_list_del_init(&exp->exp_obd_chain_timed);
963 exp->exp_obd->obd_num_exports--;
964 spin_unlock(&exp->exp_obd->obd_dev_lock);
965 class_export_put(exp);
967 EXPORT_SYMBOL(class_unlink_export);
969 /* Import management functions */
/*
 * Tear down an import whose reference count is zero (asserted).
 *
 * The main connection and every entry on imp_conn_list are released, the
 * "import" device reference is dropped and the import is freed with
 * OBD_FREE_RCU. imp_sec must already be NULL.
 */
970 void class_import_destroy(struct obd_import *imp)
974 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
975 imp->imp_obd->obd_name);
977 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
979 ptlrpc_put_connection_superhack(imp->imp_connection);
/* drain the connection list, always taking the first entry */
981 while (!cfs_list_empty(&imp->imp_conn_list)) {
982 struct obd_import_conn *imp_conn;
984 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
985 struct obd_import_conn, oic_item);
986 cfs_list_del_init(&imp_conn->oic_item);
987 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
988 OBD_FREE(imp_conn, sizeof(*imp_conn));
991 LASSERT(imp->imp_sec == NULL);
/* pairs with the class_incref(obd, "import", ...) in class_new_import() */
992 class_decref(imp->imp_obd, "import", imp);
993 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes one import reference. */
997 static void import_handle_addref(void *import)
999 class_import_get(import);
/* Handle operations passed to class_handle_hash() for imports. */
1002 static struct portals_handle_ops import_handle_ops = {
1003 .hop_addref = import_handle_addref,
/*
 * Take a reference on \a import: imp_refcount is incremented atomically
 * and the new value is logged together with the device name.
 */
1007 struct obd_import *class_import_get(struct obd_import *import)
1009 cfs_atomic_inc(&import->imp_refcount);
1010 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1011 cfs_atomic_read(&import->imp_refcount),
1012 import->imp_obd->obd_name);
1015 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp. The import must not be on the zombie list
 * yet (asserted). When the count reaches zero it is handed to
 * obd_zombie_import_add().
 */
1017 void class_import_put(struct obd_import *imp)
1021 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1022 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the log line shows the value the counter will have after this put */
1024 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1025 cfs_atomic_read(&imp->imp_refcount) - 1,
1026 imp->imp_obd->obd_name);
1028 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1029 CDEBUG(D_INFO, "final put import %p\n", imp);
1030 obd_zombie_import_add(imp);
1033 /* catch possible import put race */
1034 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1037 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the net latency
 * estimate and one service estimate per portal (IMP_AT_MAX_PORTALS of
 * them), the latter seeded with INITIAL_CONNECT_TIMEOUT.
 */
1039 static void init_imp_at(struct imp_at *at) {
1041 at_init(&at->iat_net_latency, 0, 0);
1042 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1043 /* max service estimates are tracked on the server side, so
1044 don't use the AT history here, just use the last reported
1045 val. (But keep hist for proc histogram, worst_ever) */
1046 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize an import for \a obd.
 *
 * All lists, the lock, the mutex and the wait queue are initialized, the
 * state is set to LUSTRE_IMP_NEW, a device reference tagged "import" is
 * taken, the handle is hashed and the adaptive-timeout data is set up.
 */
1051 struct obd_import *class_new_import(struct obd_device *obd)
1053 struct obd_import *imp;
1055 OBD_ALLOC(imp, sizeof(*imp));
1059 CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1060 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1061 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1062 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1063 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1064 CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
/* the replay cursor starts at the head of the (empty) committed list */
1065 imp->imp_replay_cursor = &imp->imp_committed_list;
1066 spin_lock_init(&imp->imp_lock);
1067 imp->imp_last_success_conn = 0;
1068 imp->imp_state = LUSTRE_IMP_NEW;
1069 imp->imp_obd = class_incref(obd, "import", imp);
1070 mutex_init(&imp->imp_sec_mutex);
1071 init_waitqueue_head(&imp->imp_recovery_waitq);
/* the import starts out with a reference count of two */
1073 cfs_atomic_set(&imp->imp_refcount, 2);
1074 cfs_atomic_set(&imp->imp_unregistering, 0);
1075 cfs_atomic_set(&imp->imp_inflight, 0);
1076 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1077 cfs_atomic_set(&imp->imp_inval_count, 0);
1078 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1079 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1080 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1081 init_imp_at(&imp->imp_at);
1083 /* the default magic is V2, will be used in connect RPC, and
1084 * then adjusted according to the flags in request/reply. */
1085 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1089 EXPORT_SYMBOL(class_new_import);
/*
 * Start the destruction of \a import: the handle is unhashed,
 * imp_generation is bumped under imp_lock and one reference is dropped
 * with class_import_put().
 */
1091 void class_destroy_import(struct obd_import *import)
1093 LASSERT(import != NULL);
1094 LASSERT(import != LP_POISON);
1096 class_handle_unhash(&import->imp_handle);
1098 spin_lock(&import->imp_lock);
1099 import->imp_generation++;
1100 spin_unlock(&import->imp_lock);
1101 class_import_put(import);
1103 EXPORT_SYMBOL(class_destroy_import);
1105 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Record that \a lock references \a exp (only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set).
 *
 * Runs under exp_locks_list_guard. A warning is printed when the lock is
 * already tied to a different export. On the first reference the lock is
 * added to exp_locks_list and l_exp_refs_target is set.
 */
1107 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1109 spin_lock(&exp->exp_locks_list_guard);
1111 LASSERT(lock->l_exp_refs_nr >= 0);
1113 if (lock->l_exp_refs_target != NULL &&
1114 lock->l_exp_refs_target != exp) {
1115 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1116 exp, lock, lock->l_exp_refs_target);
/* post-increment: the branch is taken when the old count was zero */
1118 if ((lock->l_exp_refs_nr ++) == 0) {
1119 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1120 lock->l_exp_refs_target = exp;
1122 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1123 lock, exp, lock->l_exp_refs_nr);
1124 spin_unlock(&exp->exp_locks_list_guard);
1126 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Drop one reference from \a lock to \a exp.
 *
 * Runs under exp_locks_list_guard. A warning is printed when the lock is
 * tied to a different export. When the count reaches zero the lock is
 * unlinked from the export list and l_exp_refs_target is cleared.
 */
1128 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1130 spin_lock(&exp->exp_locks_list_guard);
1131 LASSERT(lock->l_exp_refs_nr > 0);
1132 if (lock->l_exp_refs_target != exp) {
1133 LCONSOLE_WARN("lock %p, "
1134 "mismatching export pointers: %p, %p\n",
1135 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the branch is taken when the new count is zero */
1137 if (-- lock->l_exp_refs_nr == 0) {
1138 cfs_list_del_init(&lock->l_exp_refs_link);
1139 lock->l_exp_refs_target = NULL;
1141 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1142 lock, exp, lock->l_exp_refs_nr);
1143 spin_unlock(&exp->exp_locks_list_guard);
1145 EXPORT_SYMBOL(__class_export_del_lock_ref);
1148 /* A connection defines an export context in which preallocation can
1149 be managed. This releases the export pointer reference, and returns
1150 the export handle, so the export refcount is 1 when this function returns. */
/*
 * \param conn    handle that receives the cookie of the new export
 * \param obd     device to connect to
 * \param cluuid  client uuid for the new export
 *
 * All three arguments are asserted to be non-NULL. A failure of
 * class_new_export() is returned as PTR_ERR(export).
 */
1152 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1153 struct obd_uuid *cluuid)
1155 struct obd_export *export;
1156 LASSERT(conn != NULL);
1157 LASSERT(obd != NULL);
1158 LASSERT(cluuid != NULL);
1161 export = class_new_export(obd, cluuid);
1163 RETURN(PTR_ERR(export));
/* hand out the cookie, then give up the pointer reference obtained from
 * class_new_export() */
1165 conn->cookie = export->exp_handle.h_cookie;
1166 class_export_put(export);
1168 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1169 cluuid->uuid, conn->cookie);
1172 EXPORT_SYMBOL(class_connect);
1174 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery bookkeeping held for \a exp.
 *
 * Under obd_recovery_task_lock the delayed-client and connected-client
 * counters are adjusted and stale clients are counted. The request-replay
 * and lock-replay flags are then cleared under exp_lock, each with a
 * decrement of its device-wide counter.
 */
1175 void class_export_recovery_cleanup(struct obd_export *exp)
1177 struct obd_device *obd = exp->exp_obd;
1179 spin_lock(&obd->obd_recovery_task_lock);
1180 if (exp->exp_delayed)
1181 obd->obd_delayed_clients--;
1182 if (obd->obd_recovering) {
1183 if (exp->exp_in_recovery) {
1184 spin_lock(&exp->exp_lock);
1185 exp->exp_in_recovery = 0;
1186 spin_unlock(&exp->exp_lock);
1187 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1188 cfs_atomic_dec(&obd->obd_connected_clients);
1191 /* if called during recovery then should update
1192 * obd_stale_clients counter,
1193 * lightweight exports are not counted */
1194 if (exp->exp_failed &&
1195 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1196 exp->exp_obd->obd_stale_clients++;
1198 spin_unlock(&obd->obd_recovery_task_lock);
1199 /** Cleanup req replay fields */
1200 if (exp->exp_req_replay_needed) {
1201 spin_lock(&exp->exp_lock);
1202 exp->exp_req_replay_needed = 0;
1203 spin_unlock(&exp->exp_lock);
1204 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1205 cfs_atomic_dec(&obd->obd_req_replay_clients);
1207 /** Cleanup lock replay data */
1208 if (exp->exp_lock_replay_needed) {
1209 spin_lock(&exp->exp_lock);
1210 exp->exp_lock_replay_needed = 0;
1211 spin_unlock(&exp->exp_lock);
1212 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1213 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1217 /* This function removes 1-3 references from the export:
1218 * 1 - for the export pointer passed in,
1219 * and, if a disconnect is really needed,
1220 * 2 - for removal from the hash,
1221 * 3 - in class_unlink_export().
1222 * The export pointer passed to this function may be destroyed. */
/*
 * \param export  export to disconnect; a NULL pointer only triggers a
 *                warning
 *
 * exp_disconnected is set under exp_lock, and the previous value decides
 * whether the teardown is still needed. The teardown removes the export
 * from the nid hash, runs the recovery cleanup and unlinks the export.
 */
1223 int class_disconnect(struct obd_export *export)
1225 int already_disconnected;
1228 if (export == NULL) {
1229 CWARN("attempting to free NULL export %p\n", export);
1233 spin_lock(&export->exp_lock);
1234 already_disconnected = export->exp_disconnected;
1235 export->exp_disconnected = 1;
1236 spin_unlock(&export->exp_lock);
1238 /* class_cleanup(), abort_recovery(), and class_fail_export()
1239 * all end up in here, and if any of them race we shouldn't
1240 * call extra class_export_puts(). */
1241 if (already_disconnected) {
1242 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1243 GOTO(no_disconn, already_disconnected);
1246 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1247 export->exp_handle.h_cookie);
1249 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1250 cfs_hash_del(export->exp_obd->obd_nid_hash,
1251 &export->exp_connection->c_peer.nid,
1252 &export->exp_nid_hash);
1254 class_export_recovery_cleanup(export);
1255 class_unlink_export(export);
/* drops the reference that came with the pointer passed in */
1257 class_export_put(export);
1260 EXPORT_SYMBOL(class_disconnect);
1262 /* Return non-zero for a fully connected export */
/*
 * An export counts as connected when exp_conn_cnt is positive and
 * exp_failed is clear. Both fields are sampled under exp_lock.
 */
1263 int class_connected_export(struct obd_export *exp)
1268 spin_lock(&exp->exp_lock);
1269 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1270 spin_unlock(&exp->exp_lock);
1274 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list.
 *
 * \param list   list of exports linked through exp_obd_chain
 * \param flags  value stored into exp_flags of each export
 *
 * The loop always takes the first entry and runs until the list is empty.
 * An export whose client uuid equals the device uuid is only unlinked from
 * the list. Every other export is passed to obd_disconnect().
 */
1276 static void class_disconnect_export_list(cfs_list_t *list,
1277 enum obd_option flags)
1280 struct obd_export *exp;
1283 /* It's possible that an export may disconnect itself, but
1284 * nothing else will be added to this list. */
1285 while (!cfs_list_empty(list)) {
1286 exp = cfs_list_entry(list->next, struct obd_export,
1288 /* needed so that CDEBUG can safely be called after obd_disconnect */
1289 class_export_get(exp);
1291 spin_lock(&exp->exp_lock);
1292 exp->exp_flags = flags;
1293 spin_unlock(&exp->exp_lock);
1295 if (obd_uuid_equals(&exp->exp_client_uuid,
1296 &exp->exp_obd->obd_uuid)) {
1298 "exp %p export uuid == obd uuid, don't discon\n",
1300 /* Need to delete this now so we don't end up pointing
1301 * to work_list later when this export is cleaned up. */
1302 cfs_list_del_init(&exp->exp_obd_chain);
1303 class_export_put(exp);
/* second reference, taken for the obd_disconnect() call below */
1307 class_export_get(exp);
1308 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1309 "last request at "CFS_TIME_T"\n",
1310 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1311 exp, exp->exp_last_request_time);
1312 /* release one export reference anyway */
1313 rc = obd_disconnect(exp);
1315 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1316 obd_export_nid2str(exp), exp, rc);
1317 class_export_put(exp);
/* Disconnect all exports of @obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private work
 * list while obd_dev_lock is held, leaving the device lists empty. The work
 * list is then processed by class_disconnect_export_list() without the lock,
 * using the flags returned by exp_flags_from_obd(). */
1322 void class_disconnect_exports(struct obd_device *obd)
1324 cfs_list_t work_list;
1327 /* Move all of the exports from obd_exports to a work list, en masse. */
1328 CFS_INIT_LIST_HEAD(&work_list);
1329 spin_lock(&obd->obd_dev_lock);
1330 cfs_list_splice_init(&obd->obd_exports, &work_list);
1331 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1332 spin_unlock(&obd->obd_dev_lock);
1334 if (!cfs_list_empty(&work_list)) {
1335 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1336 "disconnecting them\n", obd->obd_minor, obd);
1337 class_disconnect_export_list(&work_list,
1338 exp_flags_from_obd(obd));
1340 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1341 obd->obd_minor, obd);
1344 EXPORT_SYMBOL(class_disconnect_exports);
1346 /* Remove exports that have not completed recovery.
 *
 * Walks obd->obd_exports with obd_dev_lock held. The self-export (client
 * UUID equal to the device UUID) and exports with no last_rcvd slot
 * (ted_lr_idx == -1) are checked for first. For the others exp_failed and
 * the @test_export callback are evaluated under exp_lock; an export that
 * passes neither test is marked exp_failed and moved to a private work list.
 * After the walk the work list is handed to class_disconnect_export_list()
 * with OBD_OPT_ABORT_RECOV added to the device flags.
 *
 * NOTE(review): @test_export is called with both obd_dev_lock and exp_lock
 * held, so it presumably must not sleep - confirm against the callers.
1348 void class_disconnect_stale_exports(struct obd_device *obd,
1349 int (*test_export)(struct obd_export *))
1351 cfs_list_t work_list;
1352 struct obd_export *exp, *n;
1356 CFS_INIT_LIST_HEAD(&work_list);
1357 spin_lock(&obd->obd_dev_lock);
1358 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1360 /* don't count self-export as client */
1361 if (obd_uuid_equals(&exp->exp_client_uuid,
1362 &exp->exp_obd->obd_uuid))
1365 /* don't evict clients which have no slot in last_rcvd
1366 * (e.g. lightweight connection) */
1367 if (exp->exp_target_data.ted_lr_idx == -1)
/* Test and set exp_failed under the export lock. */
1370 spin_lock(&exp->exp_lock);
1371 if (exp->exp_failed || test_export(exp)) {
1372 spin_unlock(&exp->exp_lock);
1375 exp->exp_failed = 1;
1376 spin_unlock(&exp->exp_lock);
/* The safe list iterator is needed because of this move off obd_exports. */
1378 cfs_list_move(&exp->exp_obd_chain, &work_list);
1380 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1381 obd->obd_name, exp->exp_client_uuid.uuid,
1382 exp->exp_connection == NULL ? "<unknown>" :
1383 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1384 print_export_data(exp, "EVICTING", 0);
1386 spin_unlock(&obd->obd_dev_lock);
1389 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1390 obd->obd_name, evicted);
1392 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1393 OBD_OPT_ABORT_RECOV);
1396 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark @exp as failed and disconnect it.
 *
 * exp_failed is tested and set under exp_lock, so only the first caller goes
 * on to disconnect; later callers just log the skipping message. The first
 * caller optionally dumps the debug log (obd_dump_on_timeout), takes two
 * references and calls obd_disconnect(). One reference keeps exp valid for
 * the logging that follows and is dropped at the end; the other is the one
 * obd_disconnect() is expected to consume. */
1398 void class_fail_export(struct obd_export *exp)
1400 int rc, already_failed;
1402 spin_lock(&exp->exp_lock);
1403 already_failed = exp->exp_failed;
1404 exp->exp_failed = 1;
1405 spin_unlock(&exp->exp_lock);
1407 if (already_failed) {
1408 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1409 exp, exp->exp_client_uuid.uuid);
1413 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1414 exp, exp->exp_client_uuid.uuid);
1416 if (obd_dump_on_timeout)
1417 libcfs_debug_dumplog();
1419 /* Extra reference so exp stays usable for the logging after obd_disconnect */
1420 class_export_get(exp);
1422 /* Most callers into obd_disconnect are removing their own reference
1423 * (request, for example) in addition to the one from the hash table.
1424 * We don't have such a reference here, so make one. */
1425 class_export_get(exp);
1426 rc = obd_disconnect(exp);
1428 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1430 CDEBUG(D_HA, "disconnected export %p/%s\n",
1431 exp, exp->exp_client_uuid.uuid);
/* Drop the logging reference taken above. */
1432 class_export_put(exp);
1434 EXPORT_SYMBOL(class_fail_export);
/* Return a printable form of the peer NID of @exp.
 * When exp_connection is set the string comes from libcfs_nid2str() applied
 * to the connection peer NID. */
1436 char *obd_export_nid2str(struct obd_export *exp)
1438 if (exp->exp_connection != NULL)
1439 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1443 EXPORT_SYMBOL(obd_export_nid2str);
/* Evict exports of @obd that are hashed under the NID given as text in @nid.
 *
 * @nid is converted with libcfs_str2nid() and used as the lookup key in the
 * device NID hash. A reference on the hash is taken under obd_dev_lock and
 * released before returning. If the device is already stopping nothing is
 * looked up. Each export found is failed through class_fail_export().
 *
 * Returns the value of exports_evicted. */
1445 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1447 cfs_hash_t *nid_hash;
1448 struct obd_export *doomed_exp = NULL;
1449 int exports_evicted = 0;
1451 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1453 spin_lock(&obd->obd_dev_lock);
1454 /* umount has run already, so evict thread should leave
1455 * its task to umount thread now */
1456 if (obd->obd_stopping) {
1457 spin_unlock(&obd->obd_dev_lock);
1458 return exports_evicted;
/* Pin the hash while the lock is held so it can be used after unlocking. */
1460 nid_hash = obd->obd_nid_hash;
1461 cfs_hash_getref(nid_hash);
1462 spin_unlock(&obd->obd_dev_lock);
1465 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1466 if (doomed_exp == NULL)
/* Sanity checks: the export found must carry the requested NID and must not
 * be the self-export of the device. */
1469 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1470 "nid %s found, wanted nid %s, requested nid %s\n",
1471 obd_export_nid2str(doomed_exp),
1472 libcfs_nid2str(nid_key), nid);
1473 LASSERTF(doomed_exp != obd->obd_self_export,
1474 "self-export is hashed by NID?\n");
1476 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1477 "request\n", obd->obd_name,
1478 obd_uuid2str(&doomed_exp->exp_client_uuid),
1479 obd_export_nid2str(doomed_exp));
1480 class_fail_export(doomed_exp);
/* Presumably drops the reference returned by cfs_hash_lookup() - TODO confirm. */
1481 class_export_put(doomed_exp);
1484 cfs_hash_putref(nid_hash);
1486 if (!exports_evicted)
1487 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1488 obd->obd_name, nid);
1489 return exports_evicted;
1491 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of @obd whose client UUID matches the text in @uuid.
 *
 * A reference on the device UUID hash is taken under obd_dev_lock and is
 * released on every path that follows. Nothing is done when the device is
 * stopping, and a request naming the UUID of the device itself is rejected
 * with an error message. A single lookup is performed; the export found, if
 * any, is failed through class_fail_export().
 *
 * Returns the value of exports_evicted. */
1493 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1495 cfs_hash_t *uuid_hash;
1496 struct obd_export *doomed_exp = NULL;
1497 struct obd_uuid doomed_uuid;
1498 int exports_evicted = 0;
1500 spin_lock(&obd->obd_dev_lock);
1501 if (obd->obd_stopping) {
1502 spin_unlock(&obd->obd_dev_lock);
1503 return exports_evicted;
/* Pin the hash while the lock is held so it can be used after unlocking. */
1505 uuid_hash = obd->obd_uuid_hash;
1506 cfs_hash_getref(uuid_hash);
1507 spin_unlock(&obd->obd_dev_lock);
1509 obd_str2uuid(&doomed_uuid, uuid);
1510 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1511 CERROR("%s: can't evict myself\n", obd->obd_name);
1512 cfs_hash_putref(uuid_hash);
1513 return exports_evicted;
1516 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1518 if (doomed_exp == NULL) {
1519 CERROR("%s: can't disconnect %s: no exports found\n",
1520 obd->obd_name, uuid);
/* NOTE(review): the warning text below misspells the word administrative;
 * it is runtime output and is left unchanged here. */
1522 CWARN("%s: evicting %s at adminstrative request\n",
1523 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1524 class_fail_export(doomed_exp);
/* Presumably drops the reference returned by cfs_hash_lookup() - TODO confirm. */
1525 class_export_put(doomed_exp);
1528 cfs_hash_putref(uuid_hash);
1530 return exports_evicted;
1532 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1534 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback, NULL by default. print_export_data()
 * calls it, when set, for exports printed with a non-zero locks argument. */
1535 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1536 EXPORT_SYMBOL(class_export_dump_hook);
/* Emit one D_HA debug line describing @exp, labelled with @status.
 *
 * The line carries the device name, the export pointer, client UUID and NID,
 * the reference, RPC, callback and lock counters, the exp_disconnected,
 * exp_delayed and exp_failed flags, information about the outstanding
 * replies (gathered under exp_lock) and exp_last_committed.
 *
 * When built with LUSTRE_TRACKS_LOCK_EXP_REFS and locks is non-zero, the
 * class_export_dump_hook callback is invoked as well if one is installed. */
1539 static void print_export_data(struct obd_export *exp, const char *status,
1542 struct ptlrpc_reply_state *rs;
1543 struct ptlrpc_reply_state *first_reply = NULL;
1546 spin_lock(&exp->exp_lock);
1547 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1553 spin_unlock(&exp->exp_lock);
1555 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1556 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1557 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1558 cfs_atomic_read(&exp->exp_rpc_count),
1559 cfs_atomic_read(&exp->exp_cb_count),
1560 cfs_atomic_read(&exp->exp_locks_count),
1561 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1562 nreplies, first_reply, nreplies > 3 ? "..." : "",
1563 exp->exp_last_committed);
1564 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1565 if (locks && class_export_dump_hook != NULL)
1566 class_export_dump_hook(exp);
/* Print every export associated with @obd through print_export_data().
 *
 * Exports on obd_exports, obd_unlinked_exports and obd_delayed_exports are
 * printed under obd_dev_lock with the labels ACTIVE, UNLINKED and DELAYED.
 * The global obd_zombie_exports list is then printed under
 * obd_zombie_impexp_lock with the label ZOMBIE; that list is not per device,
 * so it is not filtered by @obd here. @locks is passed through unchanged. */
1570 void dump_exports(struct obd_device *obd, int locks)
1572 struct obd_export *exp;
1574 spin_lock(&obd->obd_dev_lock);
1575 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1576 print_export_data(exp, "ACTIVE", locks);
1577 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1578 print_export_data(exp, "UNLINKED", locks);
1579 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1580 print_export_data(exp, "DELAYED", locks);
1581 spin_unlock(&obd->obd_dev_lock);
1582 spin_lock(&obd_zombie_impexp_lock);
1583 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1584 print_export_data(exp, "ZOMBIE", locks);
1585 spin_unlock(&obd_zombie_impexp_lock);
1587 EXPORT_SYMBOL(dump_exports);
/* Block until obd->obd_unlinked_exports is empty.
 *
 * The caller must already have emptied obd_exports (asserted). The list is
 * tested under obd_dev_lock; the lock is dropped around each uninterruptible
 * sleep of cfs_time_seconds(waited). Once waited exceeds 5, a console
 * warning and a full export dump are produced each time waited is a power
 * of two. */
1589 void obd_exports_barrier(struct obd_device *obd)
1592 LASSERT(cfs_list_empty(&obd->obd_exports));
1593 spin_lock(&obd->obd_dev_lock);
1594 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1595 spin_unlock(&obd->obd_dev_lock);
1596 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1597 cfs_time_seconds(waited));
1598 if (waited > 5 && IS_PO2(waited)) {
1599 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1600 "more than %d seconds. "
1601 "The obd refcount = %d. Is it stuck?\n",
1602 obd->obd_name, waited,
1603 cfs_atomic_read(&obd->obd_refcount));
1604 dump_exports(obd, 1);
/* Retake the lock before the loop condition is evaluated again. */
1607 spin_lock(&obd->obd_dev_lock);
1609 spin_unlock(&obd->obd_dev_lock);
1611 EXPORT_SYMBOL(obd_exports_barrier);
1613 /* Total amount of zombies to be destroyed.
 * Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1614 static int zombies_count = 0;
1617 * kill zombie imports and exports
/* Destroy the imports and exports queued on the global zombie lists.
 *
 * Each pass takes at most one import and one export off the head of
 * obd_zombie_imports and obd_zombie_exports under obd_zombie_impexp_lock,
 * then destroys them with class_import_destroy() and class_export_destroy()
 * outside that first critical section. The lock is retaken briefly after
 * each destroy. Passes repeat while either list yielded an entry. */
1619 void obd_zombie_impexp_cull(void)
1621 struct obd_import *import;
1622 struct obd_export *export;
1626 spin_lock(&obd_zombie_impexp_lock);
1629 if (!cfs_list_empty(&obd_zombie_imports)) {
1630 import = cfs_list_entry(obd_zombie_imports.next,
1633 cfs_list_del_init(&import->imp_zombie_chain);
1637 if (!cfs_list_empty(&obd_zombie_exports)) {
1638 export = cfs_list_entry(obd_zombie_exports.next,
1641 cfs_list_del_init(&export->exp_obd_chain);
1644 spin_unlock(&obd_zombie_impexp_lock);
1646 if (import != NULL) {
1647 class_import_destroy(import);
1648 spin_lock(&obd_zombie_impexp_lock);
1650 spin_unlock(&obd_zombie_impexp_lock);
1653 if (export != NULL) {
1654 class_export_destroy(export);
1655 spin_lock(&obd_zombie_impexp_lock);
1657 spin_unlock(&obd_zombie_impexp_lock);
1661 } while (import != NULL || export != NULL);
/* Completed by obd_zombie_impexp_thread() when it starts running; waited on
 * by obd_zombie_impexp_init(). */
1665 static struct completion obd_zombie_start;
/* Completed by obd_zombie_impexp_thread() after its main loop ends; waited
 * on by obd_zombie_impexp_stop(). */
1666 static struct completion obd_zombie_stop;
/* Flag word holding the OBD_ZOMBIE_STOP request bit. */
1667 static unsigned long obd_zombie_flags;
/* Wait queue shared by the zombie thread and obd_zombie_barrier() callers. */
1668 static wait_queue_head_t obd_zombie_waitq;
/* Pid recorded by the zombie thread; obd_zombie_barrier() compares it with
 * the caller so the thread never waits for itself. */
1669 static pid_t obd_zombie_pid;
/* NOTE(review): this constant is passed to set_bit() and test_bit() as a bit
 * number, so the flag lives in bit 1 of obd_zombie_flags and not in the mask
 * value 0x0001. Usage is self-consistent within this file. */
1672 OBD_ZOMBIE_STOP = 0x0001,
1676 * check for work for kill zombie import/export thread.
/* Compute, under obd_zombie_impexp_lock, whether the zombie thread has
 * nothing to do: rc is non-zero when zombies_count is 0 and OBD_ZOMBIE_STOP
 * is not set. The thread sleeps until the negation of this value is true.
 * @arg is not used in the lines shown here. */
1678 static int obd_zombie_impexp_check(void *arg)
1682 spin_lock(&obd_zombie_impexp_lock);
1683 rc = (zombies_count == 0) &&
1684 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1685 spin_unlock(&obd_zombie_impexp_lock);
1691 * Add export to the obd_zombie thread and notify it.
/* Move @exp from the list it is on in its device to the global
 * obd_zombie_exports list and wake the zombie thread.
 * The export must currently be linked through exp_obd_chain (asserted); it
 * is unlinked under the device obd_dev_lock and queued under
 * obd_zombie_impexp_lock, reusing exp_obd_chain as the zombie linkage. */
1693 static void obd_zombie_export_add(struct obd_export *exp) {
1694 spin_lock(&exp->exp_obd->obd_dev_lock);
1695 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1696 cfs_list_del_init(&exp->exp_obd_chain);
1697 spin_unlock(&exp->exp_obd->obd_dev_lock);
1698 spin_lock(&obd_zombie_impexp_lock);
1700 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1701 spin_unlock(&obd_zombie_impexp_lock);
1703 obd_zombie_impexp_notify();
1707 * Add import to the obd_zombie thread and notify it.
/* Queue @imp on the global obd_zombie_imports list and wake the zombie
 * thread. The import must already have imp_sec and imp_rq_pool cleared and
 * must not be on a zombie list yet (all asserted). */
1709 static void obd_zombie_import_add(struct obd_import *imp) {
1710 LASSERT(imp->imp_sec == NULL);
1711 LASSERT(imp->imp_rq_pool == NULL);
1712 spin_lock(&obd_zombie_impexp_lock);
1713 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1715 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1716 spin_unlock(&obd_zombie_impexp_lock);
1718 obd_zombie_impexp_notify();
1722 * notify import/export destroy thread about new zombie.
/* Wake every waiter on obd_zombie_waitq. wake_up_all() is used so that the
 * zombie thread is woken even when barrier callers wait on the same queue. */
1724 static void obd_zombie_impexp_notify(void)
1727 * Make sure obd_zomebie_impexp_thread get this notification.
1728 * It is possible this signal only get by obd_zombie_barrier, and
1729 * barrier gulps this notification and sleeps away and hangs ensues
1731 wake_up_all(&obd_zombie_waitq);
1735 * check whether obd_zombie is idle
/* Compute, under obd_zombie_impexp_lock, whether no zombies are pending
 * (zombies_count == 0). Must not be called once OBD_ZOMBIE_STOP has been
 * set (asserted). Used as the wait condition of obd_zombie_barrier(). */
1737 static int obd_zombie_is_idle(void)
1741 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1742 spin_lock(&obd_zombie_impexp_lock);
1743 rc = (zombies_count == 0);
1744 spin_unlock(&obd_zombie_impexp_lock);
1749 * wait until the obd_zombie import/export queues become empty
/* Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies are pending. The zombie thread itself is recognised by its pid
 * and is special-cased so that it does not wait for its own work. */
1751 void obd_zombie_barrier(void)
1753 struct l_wait_info lwi = { 0 };
1755 if (obd_zombie_pid == current_pid())
1756 /* don't wait for myself */
1758 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1760 EXPORT_SYMBOL(obd_zombie_barrier);
1765 * destroy zombie export/import thread.
/* Main function of the zombie destroy thread.
 *
 * Signals obd_zombie_start, records its pid and then loops until
 * OBD_ZOMBIE_STOP is set: sleep until obd_zombie_impexp_check() reports
 * work (or a stop request), cull the zombie lists, and wake waiters on
 * obd_zombie_waitq. obd_zombie_stop is completed once the loop has ended.
 *
 * NOTE(review): obd_zombie_pid is stored after obd_zombie_start has been
 * completed, so obd_zombie_impexp_init() may return before the pid is
 * visible - confirm that no early obd_zombie_barrier() caller depends on it. */
1767 static int obd_zombie_impexp_thread(void *unused)
1769 unshare_fs_struct();
1770 complete(&obd_zombie_start);
1772 obd_zombie_pid = current_pid();
1774 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1775 struct l_wait_info lwi = { 0 };
1777 l_wait_event(obd_zombie_waitq,
1778 !obd_zombie_impexp_check(NULL), &lwi);
1779 obd_zombie_impexp_cull();
1782 * Notify obd_zombie_barrier callers that queues
1785 wake_up(&obd_zombie_waitq);
1788 complete(&obd_zombie_stop);
1793 #else /* ! KERNEL */
/* Nesting counter used by obd_zombie_impexp_kill() so that only the
 * outermost invocation culls. */
1795 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
/* Handles returned by the liblustre wait and idle callback registration in
 * obd_zombie_impexp_init(); passed back on deregistration in
 * obd_zombie_impexp_stop(). */
1796 static void *obd_zombie_impexp_work_cb;
1797 static void *obd_zombie_impexp_idle_cb;
/* Wait callback registered with liblustre by obd_zombie_impexp_init().
 * Culls the zombie lists only when the increment of zombie_recur yields 1,
 * that is when no other invocation is in progress; the counter is
 * decremented again afterwards. @arg is not used in the lines shown here. */
1799 int obd_zombie_impexp_kill(void *arg)
1803 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1804 obd_zombie_impexp_cull();
1807 cfs_atomic_dec(&zombie_recur);
1814 * start destroy zombie import/export thread
/* Set up zombie import/export destruction.
 *
 * Initialises the two zombie lists, their lock, both completions and the
 * wait queue. It then either starts the obd_zombid kernel thread and waits
 * for it to signal obd_zombie_start, or registers obd_zombie_impexp_kill()
 * and obd_zombie_impexp_check() as liblustre wait and idle callbacks.
 * The conditionals selecting between the two are presumably the kernel
 * versus liblustre build split - TODO confirm.
 *
 * Returns PTR_ERR(task) on the thread creation failure path shown below. */
1816 int obd_zombie_impexp_init(void)
1819 struct task_struct *task;
1822 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1823 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1824 spin_lock_init(&obd_zombie_impexp_lock);
1825 init_completion(&obd_zombie_start);
1826 init_completion(&obd_zombie_stop);
1827 init_waitqueue_head(&obd_zombie_waitq);
1831 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1833 RETURN(PTR_ERR(task));
/* Do not return until the thread has actually started running. */
1835 wait_for_completion(&obd_zombie_start);
1838 obd_zombie_impexp_work_cb =
1839 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1840 &obd_zombie_impexp_kill, NULL);
1842 obd_zombie_impexp_idle_cb =
1843 liblustre_register_idle_callback("obd_zombi_impexp_check",
1844 &obd_zombie_impexp_check, NULL);
1849 * stop destroy zombie import/export thread
/* Shut down zombie import/export destruction.
 * Thread variant: set OBD_ZOMBIE_STOP, wake the thread and wait for it to
 * complete obd_zombie_stop. Callback variant: deregister the liblustre wait
 * and idle callbacks obtained in obd_zombie_impexp_init(). */
1851 void obd_zombie_impexp_stop(void)
1853 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1854 obd_zombie_impexp_notify();
1856 wait_for_completion(&obd_zombie_stop);
1858 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1859 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1863 /***** Kernel-userspace comm helpers *******/
1865 /* Get length of entire message, including header:
 * sizeof(struct kuc_hdr) plus @payload_len. The sum is returned as int and
 * @payload_len is not range checked here. */
1866 int kuc_len(int payload_len)
1868 return sizeof(struct kuc_hdr) + payload_len;
1870 EXPORT_SYMBOL(kuc_len);
1872 /* Get a pointer to kuc header, given a ptr to the payload
1873 * @param p Pointer to payload area
1874 * @returns Pointer to kuc header
 * The header is taken to be the struct kuc_hdr located immediately before
 * @p, and its kuc_magic field is asserted to equal KUC_MAGIC. Passing a
 * pointer that is not a kuc payload therefore trips the assertion.
1876 struct kuc_hdr * kuc_ptr(void *p)
1878 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1879 LASSERT(lh->kuc_magic == KUC_MAGIC);
1882 EXPORT_SYMBOL(kuc_ptr);
1884 /* Test if payload is part of kuc message
1885 * @param p Pointer to payload area
 * The test reads the struct kuc_hdr located immediately before @p and
 * compares its kuc_magic field with KUC_MAGIC.
 * NOTE(review): this dereferences memory in front of @p, so the caller must
 * guarantee that a header-sized region before @p is readable.
1888 int kuc_ispayload(void *p)
1890 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1892 if (kh->kuc_magic == KUC_MAGIC)
1897 EXPORT_SYMBOL(kuc_ispayload);
1899 /* Alloc space for a message, and fill in header
1900 * @return Pointer to payload area
 * @param payload_len Size of the payload; the message length used is
 *                    kuc_len(payload_len)
 * @param transport   Value stored in the kuc_transport header field
 * @param type        Value stored in the kuc_msgtype header field
 * The header also gets kuc_magic set to KUC_MAGIC and kuc_msglen set to the
 * total message length. The returned pointer is the first byte after the
 * header. ERR_PTR(-ENOMEM) is returned on the failure path shown below, so
 * callers must test the result with IS_ERR and not against NULL.
1902 void *kuc_alloc(int payload_len, int transport, int type)
1905 int len = kuc_len(payload_len);
1909 return ERR_PTR(-ENOMEM);
1911 lh->kuc_magic = KUC_MAGIC;
1912 lh->kuc_transport = transport;
1913 lh->kuc_msgtype = type;
1914 lh->kuc_msglen = len;
1916 return (void *)(lh + 1);
1918 EXPORT_SYMBOL(kuc_alloc);
1920 /* Takes pointer to payload area.
 * Frees the whole message: the header is located with kuc_ptr(), which
 * asserts the magic, and kuc_len(payload_len) bytes are released starting
 * at the header. @payload_len presumably has to equal the value given to
 * kuc_alloc() for this message - TODO confirm.
 * NOTE(review): the function is declared inline and also exported; under
 * C99 inline rules a plain inline definition provides no external
 * definition, so confirm the build relies on gnu89 inline semantics. */
1921 inline void kuc_free(void *p, int payload_len)
1923 struct kuc_hdr *lh = kuc_ptr(p);
1924 OBD_FREE(lh, kuc_len(payload_len));
1926 EXPORT_SYMBOL(kuc_free);