4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
/* Registered obd types; class_search_type() walks this list and
 * class_register_type() adds to it, both while holding obd_types_lock. */
49 extern cfs_list_t obd_types;
50 spinlock_t obd_types_lock;
/* Slab caches created in obd_init_caches() and destroyed in
 * obd_cleanup_caches(). */
52 struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 struct kmem_cache *import_cachep;
/* NOTE(review): presumably the lists that obd_zombie_export_add() and
 * obd_zombie_import_add() fill, guarded by obd_zombie_impexp_lock; the
 * helpers' bodies are not visible here - confirm. */
57 cfs_list_t obd_zombie_imports;
58 cfs_list_t obd_zombie_exports;
59 spinlock_t obd_zombie_impexp_lock;
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64 const char *status, int locks);
/* Function pointer called by class_export_destroy() and
 * class_import_destroy() to drop a ptlrpc connection.  It is not assigned
 * anywhere in this part of the file. */
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts that the magic value is intact, reports via CERROR when
 * obd_namespace is still set, and finalizes the obd_reference lu_ref before
 * the slab free.
 */
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace that is still attached at this point is reported as an error */
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 struct obd_type *class_search_type(const char *name)
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 cfs_list_for_each(tmp, &obd_types) {
105 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
114 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd type by name and pin it for the caller.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the function also tries request_module()
 * with a module name derived from \a name and then searches again.
 * Under the type's obd_type_lock it calls try_module_get() on the owner of
 * the type's dt ops.  class_put_type() is the counterpart.
 */
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
/* legacy/alias type names are mapped to the module that implements them */
124 if (strcmp(modname, "obdfilter") == 0)
127 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128 modname = LUSTRE_OSP_NAME;
/* prefix match: any name starting with LUSTRE_MDS_NAME maps to the MDT module */
130 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success; retry the lookup with the
 * original name, not the mapped module name */
133 if (!request_module("%s", modname)) {
134 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135 type = class_search_type(name);
137 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143 spin_lock(&type->obd_type_lock);
/* NOTE(review): the return value of try_module_get() is not checked */
145 try_module_get(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
150 EXPORT_SYMBOL(class_get_type);
/*
 * Counterpart of class_get_type(): under the type's obd_type_lock, drop the
 * module reference on the owner of the type's dt ops.
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
160 EXPORT_SYMBOL(class_put_type);
162 #define CLASS_MAX_NAME 1024
164 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
165 struct lprocfs_seq_vars *module_vars,
166 #ifndef HAVE_ONLY_PROCFS_SEQ
167 struct lprocfs_vars *vars,
169 const char *name, struct lu_device_type *ldt)
171 struct obd_type *type;
176 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
178 if (class_search_type(name)) {
179 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
184 OBD_ALLOC(type, sizeof(*type));
188 OBD_ALLOC_PTR(type->typ_dt_ops);
189 OBD_ALLOC_PTR(type->typ_md_ops);
190 OBD_ALLOC(type->typ_name, strlen(name) + 1);
192 if (type->typ_dt_ops == NULL ||
193 type->typ_md_ops == NULL ||
194 type->typ_name == NULL)
197 *(type->typ_dt_ops) = *dt_ops;
198 /* md_ops is optional */
200 *(type->typ_md_ops) = *md_ops;
201 strcpy(type->typ_name, name);
202 spin_lock_init(&type->obd_type_lock);
205 #ifndef HAVE_ONLY_PROCFS_SEQ
207 type->typ_procroot = lprocfs_register(type->typ_name,
213 type->typ_procroot = lprocfs_seq_register(type->typ_name,
217 if (IS_ERR(type->typ_procroot)) {
218 rc = PTR_ERR(type->typ_procroot);
219 type->typ_procroot = NULL;
225 rc = lu_device_type_init(ldt);
230 spin_lock(&obd_types_lock);
231 cfs_list_add(&type->typ_chain, &obd_types);
232 spin_unlock(&obd_types_lock);
237 if (type->typ_name != NULL)
238 OBD_FREE(type->typ_name, strlen(name) + 1);
239 if (type->typ_md_ops != NULL)
240 OBD_FREE_PTR(type->typ_md_ops);
241 if (type->typ_dt_ops != NULL)
242 OBD_FREE_PTR(type->typ_dt_ops);
244 #ifndef HAVE_ONLY_PROCFS_SEQ
245 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
247 remove_proc_subtree(type->typ_name, proc_lustre_root);
250 OBD_FREE(type, sizeof(*type));
253 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * Logs an error when the type is unknown.  When the type still has
 * references only its ops tables are freed and the name is kept for
 * debugging.  Otherwise the proc entry is removed by name, the lu device
 * type is finalized, the type is unlinked from obd_types under
 * obd_types_lock and all of its memory is released.
 */
255 int class_unregister_type(const char *name)
257 struct obd_type *type = class_search_type(name);
261 CERROR("unknown obd type\n");
265 if (type->typ_refcnt) {
266 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
267 /* This is a bad situation, let's make the best of it */
268 /* Remove ops, but leave the name for debugging */
/* NOTE(review): holders of the remaining references still reach typ_dt_ops
 * through class_put_type(); confirm nothing dereferences the freed tables. */
269 OBD_FREE_PTR(type->typ_dt_ops);
270 OBD_FREE_PTR(type->typ_md_ops);
274 /* we do not use type->typ_procroot as for compatibility purposes
275 * other modules can share names (i.e. lod can use lov entry). so
276 * we can't reference pointer as it can get invalided when another
277 * module removes the entry */
279 #ifndef HAVE_ONLY_PROCFS_SEQ
280 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
282 remove_proc_subtree(type->typ_name, proc_lustre_root);
286 lu_device_type_fini(type->typ_lu);
288 spin_lock(&obd_types_lock);
289 cfs_list_del(&type->typ_chain);
290 spin_unlock(&obd_types_lock);
/* name is equal to type->typ_name here (the type was found by strcmp), so
 * strlen(name) + 1 matches the size allocated at registration */
291 OBD_FREE(type->typ_name, strlen(name) + 1);
292 if (type->typ_dt_ops != NULL)
293 OBD_FREE_PTR(type->typ_dt_ops);
294 if (type->typ_md_ops != NULL)
295 OBD_FREE_PTR(type->typ_md_ops);
296 OBD_FREE(type, sizeof(*type));
298 } /* class_unregister_type */
299 EXPORT_SYMBOL(class_unregister_type);
302 * Create a new obd device.
304 * Find an empty slot in ::obd_devs[], create a new obd device in it.
306 * \param[in] type_name obd device type string.
307 * \param[in] name obd device name.
309 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
/*
 * Allocate an obd device of type \a type_name, name it \a name and place it
 * in the first free slot of obd_devs[].
 *
 * Failures are reported as ERR_PTR(): -EINVAL for an over-long name, -ENODEV
 * for an unknown type, -ENOMEM when allocation fails, -EEXIST when a device
 * with the same name exists and -EOVERFLOW when every slot is in use.  On the
 * failure paths the new device is freed and the type reference is dropped.
 */
312 struct obd_device *class_newdev(const char *type_name, const char *name)
314 struct obd_device *result = NULL;
315 struct obd_device *newdev;
316 struct obd_type *type = NULL;
318 int new_obd_minor = 0;
321 if (strlen(name) >= MAX_OBD_NAME) {
322 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
323 RETURN(ERR_PTR(-EINVAL));
326 type = class_get_type(type_name);
328 CERROR("OBD: unknown type: %s\n", type_name);
329 RETURN(ERR_PTR(-ENODEV));
332 newdev = obd_device_alloc();
334 GOTO(out_type, result = ERR_PTR(-ENOMEM));
336 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* The whole table is scanned under the writer lock: a free slot may be
 * claimed before a later slot turns out to hold a device with this name. */
338 write_lock(&obd_dev_lock);
339 for (i = 0; i < class_devno_max(); i++) {
340 struct obd_device *obd = class_num2obd(i);
342 if (obd && (strcmp(name, obd->obd_name) == 0)) {
343 CERROR("Device %s already exists at %d, won't add\n",
346 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
347 "%p obd_magic %08x != %08x\n", result,
348 result->obd_magic, OBD_DEVICE_MAGIC);
349 LASSERTF(result->obd_minor == new_obd_minor,
350 "%p obd_minor %d != %d\n", result,
351 result->obd_minor, new_obd_minor);
/* give back the slot that was claimed earlier in this scan */
353 obd_devs[result->obd_minor] = NULL;
354 result->obd_name[0]='\0';
356 result = ERR_PTR(-EEXIST);
/* first empty slot and nothing claimed yet: take it */
359 if (!result && !obd) {
361 result->obd_minor = i;
363 result->obd_type = type;
/* the length check at the top guarantees that name fits */
364 strncpy(result->obd_name, name,
365 sizeof(result->obd_name) - 1);
366 obd_devs[i] = result;
369 write_unlock(&obd_dev_lock);
371 if (result == NULL && i >= class_devno_max()) {
372 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
374 GOTO(out, result = ERR_PTR(-EOVERFLOW));
380 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
381 result->obd_name, result);
385 obd_device_free(newdev);
387 class_put_type(type);
391 void class_release_dev(struct obd_device *obd)
393 struct obd_type *obd_type = obd->obd_type;
395 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
396 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
397 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
398 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
399 LASSERT(obd_type != NULL);
401 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
402 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
404 write_lock(&obd_dev_lock);
405 obd_devs[obd->obd_minor] = NULL;
406 write_unlock(&obd_dev_lock);
407 obd_device_free(obd);
409 class_put_type(obd_type);
/*
 * Map a device name to its index in obd_devs[].
 *
 * The table is scanned under the obd_dev_lock read lock.  A device that
 * matches by name is only reported once obd_attached is set.
 * NOTE(review): class_name2obd() treats a negative result as "not found";
 * the return statements themselves are not visible here.
 */
412 int class_name2dev(const char *name)
419 read_lock(&obd_dev_lock);
420 for (i = 0; i < class_devno_max(); i++) {
421 struct obd_device *obd = class_num2obd(i);
423 if (obd && strcmp(name, obd->obd_name) == 0) {
424 /* Make sure we finished attaching before we give
425 out any references */
426 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
427 if (obd->obd_attached) {
428 read_unlock(&obd_dev_lock);
434 read_unlock(&obd_dev_lock);
438 EXPORT_SYMBOL(class_name2dev);
440 struct obd_device *class_name2obd(const char *name)
442 int dev = class_name2dev(name);
444 if (dev < 0 || dev > class_devno_max())
446 return class_num2obd(dev);
448 EXPORT_SYMBOL(class_name2obd);
450 int class_uuid2dev(struct obd_uuid *uuid)
454 read_lock(&obd_dev_lock);
455 for (i = 0; i < class_devno_max(); i++) {
456 struct obd_device *obd = class_num2obd(i);
458 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
459 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
460 read_unlock(&obd_dev_lock);
464 read_unlock(&obd_dev_lock);
468 EXPORT_SYMBOL(class_uuid2dev);
470 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
472 int dev = class_uuid2dev(uuid);
475 return class_num2obd(dev);
477 EXPORT_SYMBOL(class_uuid2obd);
480 * Get obd device from ::obd_devs[]
482 * \param num [in] array index
484 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
485 * otherwise return the obd device there.
487 struct obd_device *class_num2obd(int num)
489 struct obd_device *obd = NULL;
491 if (num < class_devno_max()) {
496 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
497 "%p obd_magic %08x != %08x\n",
498 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
499 LASSERTF(obd->obd_minor == num,
500 "%p obd_minor %0d != %0d\n",
501 obd, obd->obd_minor, num);
506 EXPORT_SYMBOL(class_num2obd);
509 * Get obd devices count. Device in any
511 * \retval obd device count
513 int get_devices_count(void)
515 int index, max_index = class_devno_max(), dev_count = 0;
517 read_lock(&obd_dev_lock);
518 for (index = 0; index <= max_index; index++) {
519 struct obd_device *obd = class_num2obd(index);
523 read_unlock(&obd_dev_lock);
527 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per obd device: index, status, type name, device
 * name, uuid and reference count.  The table is walked under the
 * obd_dev_lock read lock.
 */
529 void class_obd_list(void)
534 read_lock(&obd_dev_lock);
535 for (i = 0; i < class_devno_max(); i++) {
536 struct obd_device *obd = class_num2obd(i);
/* status is chosen in priority order: stopping, set up, attached */
540 if (obd->obd_stopping)
542 else if (obd->obd_set_up)
544 else if (obd->obd_attached)
548 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
549 i, status, obd->obd_type->typ_name,
550 obd->obd_name, obd->obd_uuid.uuid,
551 atomic_read(&obd->obd_refcount));
553 read_unlock(&obd_dev_lock);
557 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
558 specified, then only the client with that uuid is returned,
559 otherwise any client connected to the tgt is returned. */
560 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
561 const char * typ_name,
562 struct obd_uuid *grp_uuid)
566 read_lock(&obd_dev_lock);
567 for (i = 0; i < class_devno_max(); i++) {
568 struct obd_device *obd = class_num2obd(i);
572 if ((strncmp(obd->obd_type->typ_name, typ_name,
573 strlen(typ_name)) == 0)) {
574 if (obd_uuid_equals(tgt_uuid,
575 &obd->u.cli.cl_target_uuid) &&
576 ((grp_uuid)? obd_uuid_equals(grp_uuid,
577 &obd->obd_uuid) : 1)) {
578 read_unlock(&obd_dev_lock);
583 read_unlock(&obd_dev_lock);
587 EXPORT_SYMBOL(class_find_client_obd);
589 /* Iterate the obd_device list looking for devices that have grp_uuid.
590 Start searching at *next, and if a device is found, the next index to
591 look at is saved in *next. If next is NULL, then the first matching
592 device will always be returned. */
593 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* a start index is only accepted inside 0 .. class_devno_max() - 1 */
599 else if (*next >= 0 && *next < class_devno_max())
604 read_lock(&obd_dev_lock);
605 for (; i < class_devno_max(); i++) {
606 struct obd_device *obd = class_num2obd(i);
610 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
613 read_unlock(&obd_dev_lock);
617 read_unlock(&obd_dev_lock);
621 EXPORT_SYMBOL(class_devices_in_group);
624 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
625 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every device that is
 * set up, not stopping, of type mdc/osc/mdt/ost, and whose name starts with
 * the first \a namelen bytes of \a fsname.
 *
 * The table is walked under the obd_dev_lock read lock.  The lock is dropped
 * around each obd_set_info_async() call; the device is pinned with
 * class_incref()/class_decref() for that window.
 */
627 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
629 struct obd_device *obd;
633 LASSERT(namelen > 0);
635 read_lock(&obd_dev_lock);
636 for (i = 0; i < class_devno_max(); i++) {
637 obd = class_num2obd(i);
639 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
642 /* only notify mdc, osc, mdt, ost */
643 type = obd->obd_type->typ_name;
644 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
645 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
646 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
647 strcmp(type, LUSTRE_OST_NAME) != 0)
/* device names are matched by fsname prefix */
650 if (strncmp(obd->obd_name, fsname, namelen))
/* hold a reference so the device survives while the lock is released */
653 class_incref(obd, __FUNCTION__, obd);
654 read_unlock(&obd_dev_lock);
655 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
656 sizeof(KEY_SPTLRPC_CONF),
657 KEY_SPTLRPC_CONF, 0, NULL, NULL);
659 class_decref(obd, __FUNCTION__, obd);
660 read_lock(&obd_dev_lock);
662 read_unlock(&obd_dev_lock);
665 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches set up by obd_init_caches(): obd_device_cachep,
 * obdo_cachep, import_cachep and capa_cachep.  The visible lines reset the
 * device and import cache pointers to NULL after destroying them.
 */
667 void obd_cleanup_caches(void)
670 if (obd_device_cachep) {
671 kmem_cache_destroy(obd_device_cachep);
672 obd_device_cachep = NULL;
675 kmem_cache_destroy(obdo_cachep);
679 kmem_cache_destroy(import_cachep);
680 import_cachep = NULL;
683 kmem_cache_destroy(capa_cachep);
/*
 * Create the slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache" and "capa_cache".
 *
 * Each cache pointer is asserted to be NULL before it is created.  A failed
 * creation jumps to the out label with rc = -ENOMEM.
 * NOTE(review): obd_cleanup_caches() is presumably the body of that error
 * path - confirm against the full source.
 */
689 int obd_init_caches(void)
694 LASSERT(obd_device_cachep == NULL);
695 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
696 sizeof(struct obd_device),
698 if (!obd_device_cachep)
699 GOTO(out, rc = -ENOMEM);
701 LASSERT(obdo_cachep == NULL);
702 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
705 GOTO(out, rc = -ENOMEM);
707 LASSERT(import_cachep == NULL);
708 import_cachep = kmem_cache_create("ll_import_cache",
709 sizeof(struct obd_import),
712 GOTO(out, rc = -ENOMEM);
714 LASSERT(capa_cachep == NULL);
715 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
718 GOTO(out, rc = -ENOMEM);
722 obd_cleanup_caches();
/*
 * Resolve a connection handle to its export through
 * class_handle2object(conn->cookie).  A NULL handle and a cookie of -1
 * ("assign a new connection") are special-cased and only logged here.
 * NOTE(review): callers in this file (class_conn2obd) release the result
 * with class_export_put(), so the lookup presumably returns a referenced
 * export - confirm.
 */
726 /* map connection to client */
727 struct obd_export *class_conn2export(struct lustre_handle *conn)
729 struct obd_export *export;
733 CDEBUG(D_CACHE, "looking for null handle\n");
737 if (conn->cookie == -1) { /* this means assign a new connection */
738 CDEBUG(D_CACHE, "want a new connection\n");
742 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
743 export = class_handle2object(conn->cookie, NULL);
746 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd device.
 * NOTE(review): the body is not visible in this listing; presumably it
 * returns exp->exp_obd - confirm. */
748 struct obd_device *class_exp2obd(struct obd_export *exp)
754 EXPORT_SYMBOL(class_exp2obd);
756 struct obd_device *class_conn2obd(struct lustre_handle *conn)
758 struct obd_export *export;
759 export = class_conn2export(conn);
761 struct obd_device *obd = export->exp_obd;
762 class_export_put(export);
767 EXPORT_SYMBOL(class_conn2obd);
769 struct obd_import *class_exp2cliimp(struct obd_export *exp)
771 struct obd_device *obd = exp->exp_obd;
774 return obd->u.cli.cl_import;
776 EXPORT_SYMBOL(class_exp2cliimp);
778 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
780 struct obd_device *obd = class_conn2obd(conn);
783 return obd->u.cli.cl_import;
785 EXPORT_SYMBOL(class_conn2cliimp);
787 /* Export management functions */
/*
 * Tear down an export whose reference count has reached zero: drop its
 * connection (if any), assert that its reply/replay/RPC lists are empty,
 * run obd_destroy_export(), release the "export" reference on the device
 * and free the export through OBD_FREE_RCU().
 */
788 static void class_export_destroy(struct obd_export *exp)
790 struct obd_device *obd = exp->exp_obd;
793 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
794 LASSERT(obd != NULL);
796 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
797 exp->exp_client_uuid.uuid, obd->obd_name);
799 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
800 if (exp->exp_connection)
801 ptlrpc_put_connection_superhack(exp->exp_connection);
803 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
804 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
805 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
806 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
807 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", export) in class_new_export() */
808 class_decref(obd, "export", exp);
810 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes an export reference. */
814 static void export_handle_addref(void *export)
816 class_export_get(export);
/* Handle operations passed to class_handle_hash() in class_new_export(). */
819 static struct portals_handle_ops export_handle_ops = {
820 .hop_addref = export_handle_addref,
/* Take one reference on \a exp (atomic increment of exp_refcount) and log
 * the new count. */
824 struct obd_export *class_export_get(struct obd_export *exp)
826 atomic_inc(&exp->exp_refcount);
827 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
828 atomic_read(&exp->exp_refcount));
831 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.  On the final put the export's nid stats are
 * cleaned up and the export is handed to obd_zombie_export_add().
 */
833 void class_export_put(struct obd_export *exp)
835 LASSERT(exp != NULL);
836 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
837 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
838 atomic_read(&exp->exp_refcount) - 1);
840 if (atomic_dec_and_test(&exp->exp_refcount)) {
841 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
842 CDEBUG(D_IOCTL, "final put %p/%s\n",
843 exp, exp->exp_client_uuid.uuid);
845 /* release nid stat reference */
846 lprocfs_exp_cleanup(exp);
848 obd_zombie_export_add(exp);
851 EXPORT_SYMBOL(class_export_put);
853 /* Creates a new export, adds it to the hash table, and returns a
854 * pointer to it. The refcount is 2: one for the hash reference, and
855 * one for the pointer returned by this function. */
/* Failures are returned as ERR_PTR(): -ENOMEM when the allocation fails,
 * -ENODEV when the device is stopping or has no uuid hash, -EALREADY when
 * \a cluuid is already present in the uuid hash. */
856 struct obd_export *class_new_export(struct obd_device *obd,
857 struct obd_uuid *cluuid)
859 struct obd_export *export;
860 cfs_hash_t *hash = NULL;
864 OBD_ALLOC_PTR(export);
866 return ERR_PTR(-ENOMEM);
868 export->exp_conn_cnt = 0;
869 export->exp_lock_hash = NULL;
870 export->exp_flock_hash = NULL;
871 atomic_set(&export->exp_refcount, 2);
872 atomic_set(&export->exp_rpc_count, 0);
873 atomic_set(&export->exp_cb_count, 0);
874 atomic_set(&export->exp_locks_count, 0);
875 #if LUSTRE_TRACKS_LOCK_EXP_REFS
876 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
877 spin_lock_init(&export->exp_locks_list_guard);
879 atomic_set(&export->exp_replay_count, 0);
880 export->exp_obd = obd;
881 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
882 spin_lock_init(&export->exp_uncommitted_replies_lock);
883 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
884 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
885 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
886 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
887 CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
888 class_handle_hash(&export->exp_handle, &export_handle_ops);
889 export->exp_last_request_time = cfs_time_current_sec();
890 spin_lock_init(&export->exp_lock);
891 spin_lock_init(&export->exp_rpc_lock);
892 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
893 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
894 spin_lock_init(&export->exp_bl_list_lock);
895 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
897 export->exp_sp_peer = LUSTRE_SP_ANY;
898 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
899 export->exp_client_uuid = *cluuid;
900 obd_init_export(export);
/* pin the uuid hash under obd_dev_lock, then drop the lock for the insert */
902 spin_lock(&obd->obd_dev_lock);
903 /* shouldn't happen, but might race */
904 if (obd->obd_stopping)
905 GOTO(exit_unlock, rc = -ENODEV);
907 hash = cfs_hash_getref(obd->obd_uuid_hash);
909 GOTO(exit_unlock, rc = -ENODEV);
910 spin_unlock(&obd->obd_dev_lock);
/* an export whose client uuid equals the device uuid is not hashed */
912 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
913 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
915 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
916 obd->obd_name, cluuid->uuid, rc);
917 GOTO(exit_err, rc = -EALREADY);
/* re-check obd_stopping after re-taking the lock; undo the hash insert if
 * the device started stopping in the meantime */
921 spin_lock(&obd->obd_dev_lock);
922 if (obd->obd_stopping) {
923 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
924 GOTO(exit_unlock, rc = -ENODEV);
/* released again by class_decref(obd, "export", exp) in class_export_destroy() */
927 class_incref(obd, "export", export);
928 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
929 cfs_list_add_tail(&export->exp_obd_chain_timed,
930 &export->exp_obd->obd_exports_timed);
931 export->exp_obd->obd_num_exports++;
932 spin_unlock(&obd->obd_dev_lock);
933 cfs_hash_putref(hash);
/* error unwinding: the export was never linked to the device, so it is
 * unhashed and freed directly */
937 spin_unlock(&obd->obd_dev_lock);
940 cfs_hash_putref(hash);
941 class_handle_unhash(&export->exp_handle);
942 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
943 obd_destroy_export(export);
944 OBD_FREE_PTR(export);
947 EXPORT_SYMBOL(class_new_export);
949 void class_unlink_export(struct obd_export *exp)
951 class_handle_unhash(&exp->exp_handle);
953 spin_lock(&exp->exp_obd->obd_dev_lock);
954 /* delete an uuid-export hashitem from hashtables */
955 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
956 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
957 &exp->exp_client_uuid,
958 &exp->exp_uuid_hash);
960 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
961 cfs_list_del_init(&exp->exp_obd_chain_timed);
962 exp->exp_obd->obd_num_exports--;
963 spin_unlock(&exp->exp_obd->obd_dev_lock);
964 class_export_put(exp);
966 EXPORT_SYMBOL(class_unlink_export);
968 /* Import management functions */
/*
 * Tear down an import whose reference count has reached zero: drop its
 * current connection, release and free every entry on imp_conn_list,
 * release the "import" reference on the device and free the import through
 * OBD_FREE_RCU().
 */
969 void class_import_destroy(struct obd_import *imp)
973 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
974 imp->imp_obd->obd_name);
976 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
978 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop entries from the head until the list is empty */
980 while (!cfs_list_empty(&imp->imp_conn_list)) {
981 struct obd_import_conn *imp_conn;
983 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
984 struct obd_import_conn, oic_item);
985 cfs_list_del_init(&imp_conn->oic_item);
986 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
987 OBD_FREE(imp_conn, sizeof(*imp_conn));
990 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", imp) in class_new_import() */
991 class_decref(imp->imp_obd, "import", imp);
992 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes an import reference. */
996 static void import_handle_addref(void *import)
998 class_import_get(import);
/* Handle operations passed to class_handle_hash() in class_new_import(). */
1001 static struct portals_handle_ops import_handle_ops = {
1002 .hop_addref = import_handle_addref,
/* Take one reference on \a import (atomic increment of imp_refcount) and
 * log the new count together with the owning device name. */
1006 struct obd_import *class_import_get(struct obd_import *import)
1008 atomic_inc(&import->imp_refcount);
1009 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1010 atomic_read(&import->imp_refcount),
1011 import->imp_obd->obd_name);
1014 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp.  The import must not already be on a zombie
 * list (asserted).  On the final put it is handed to
 * obd_zombie_import_add().
 */
1016 void class_import_put(struct obd_import *imp)
1020 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1021 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1023 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1024 atomic_read(&imp->imp_refcount) - 1,
1025 imp->imp_obd->obd_name);
1027 if (atomic_dec_and_test(&imp->imp_refcount)) {
1028 CDEBUG(D_INFO, "final put import %p\n", imp);
1029 obd_zombie_import_add(imp);
/* NOTE(review): imp is read again here after it may have been queued as a
 * zombie above; confirm it cannot be freed before this assertion runs. */
1032 /* catch possible import put race */
1033 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1036 EXPORT_SYMBOL(class_import_put);
/* Initialize the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT. */
1038 static void init_imp_at(struct imp_at *at) {
1040 at_init(&at->iat_net_latency, 0, 0);
1041 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1042 /* max service estimates are tracked on the server side, so
1043 don't use the AT history here, just use the last reported
1044 val. (But keep hist for proc histogram, worst_ever) */
1045 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize an import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a reference count of 2,
 * holds an "import" reference on the device, is registered in the handle
 * hash and uses LUSTRE_MSG_MAGIC_V2 as its initial message magic.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing.
 */
1050 struct obd_import *class_new_import(struct obd_device *obd)
1052 struct obd_import *imp;
1054 OBD_ALLOC(imp, sizeof(*imp));
1058 CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1059 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1060 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1061 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1062 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1063 CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
/* the replay cursor starts at the head of the (empty) committed list */
1064 imp->imp_replay_cursor = &imp->imp_committed_list;
1065 spin_lock_init(&imp->imp_lock);
1066 imp->imp_last_success_conn = 0;
1067 imp->imp_state = LUSTRE_IMP_NEW;
/* released again by class_decref(imp->imp_obd, "import", imp) in
 * class_import_destroy() */
1068 imp->imp_obd = class_incref(obd, "import", imp);
1069 mutex_init(&imp->imp_sec_mutex);
1070 init_waitqueue_head(&imp->imp_recovery_waitq);
1072 atomic_set(&imp->imp_refcount, 2);
1073 atomic_set(&imp->imp_unregistering, 0);
1074 atomic_set(&imp->imp_inflight, 0);
1075 atomic_set(&imp->imp_replay_inflight, 0);
1076 atomic_set(&imp->imp_inval_count, 0);
1077 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1078 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1079 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1080 init_imp_at(&imp->imp_at);
1082 /* the default magic is V2, will be used in connect RPC, and
1083 * then adjusted according to the flags in request/reply. */
1084 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1088 EXPORT_SYMBOL(class_new_import);
/*
 * Begin destruction of an import: remove it from the handle hash, bump
 * imp_generation under imp_lock and drop one import reference.
 */
1090 void class_destroy_import(struct obd_import *import)
1092 LASSERT(import != NULL);
1093 LASSERT(import != LP_POISON);
1095 class_handle_unhash(&import->imp_handle);
1097 spin_lock(&import->imp_lock);
1098 import->imp_generation++;
1099 spin_unlock(&import->imp_lock);
1100 class_import_put(import);
1102 EXPORT_SYMBOL(class_destroy_import);
1104 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug accounting (LUSTRE_TRACKS_LOCK_EXP_REFS only): record one more
 * export reference held by \a lock.  The first reference links the lock
 * into exp_locks_list and remembers \a exp as its target.  Runs under
 * exp_locks_list_guard.
 */
1106 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1108 spin_lock(&exp->exp_locks_list_guard);
1110 LASSERT(lock->l_exp_refs_nr >= 0);
/* a lock is expected to reference a single export; warn otherwise */
1112 if (lock->l_exp_refs_target != NULL &&
1113 lock->l_exp_refs_target != exp) {
1114 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1115 exp, lock, lock->l_exp_refs_target);
1117 if ((lock->l_exp_refs_nr ++) == 0) {
1118 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1119 lock->l_exp_refs_target = exp;
1121 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1122 lock, exp, lock->l_exp_refs_nr);
1123 spin_unlock(&exp->exp_locks_list_guard);
1125 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Debug accounting (LUSTRE_TRACKS_LOCK_EXP_REFS only): drop one export
 * reference held by \a lock.  When the count reaches zero the lock is
 * unlinked from the export's list and its target is cleared.  Runs under
 * exp_locks_list_guard.
 */
1127 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1129 spin_lock(&exp->exp_locks_list_guard);
1130 LASSERT(lock->l_exp_refs_nr > 0);
1131 if (lock->l_exp_refs_target != exp) {
1132 LCONSOLE_WARN("lock %p, "
1133 "mismatching export pointers: %p, %p\n",
1134 lock, lock->l_exp_refs_target, exp);
1136 if (-- lock->l_exp_refs_nr == 0) {
1137 cfs_list_del_init(&lock->l_exp_refs_link);
1138 lock->l_exp_refs_target = NULL;
1140 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1141 lock, exp, lock->l_exp_refs_nr);
1142 spin_unlock(&exp->exp_locks_list_guard);
1144 EXPORT_SYMBOL(__class_export_del_lock_ref);
/* class_connect(): create an export for \a cluuid on \a obd via
 * class_new_export(), store the export's handle cookie in \a conn and drop
 * the pointer reference again.  A failure of class_new_export() is returned
 * as PTR_ERR(). */
1147 /* A connection defines an export context in which preallocation can
1148 be managed. This releases the export pointer reference, and returns
1149 the export handle, so the export refcount is 1 when this function
1151 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1152 struct obd_uuid *cluuid)
1154 struct obd_export *export;
1155 LASSERT(conn != NULL);
1156 LASSERT(obd != NULL);
1157 LASSERT(cluuid != NULL);
1160 export = class_new_export(obd, cluuid);
1162 RETURN(PTR_ERR(export));
1164 conn->cookie = export->exp_handle.h_cookie;
1165 class_export_put(export);
1167 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1168 cluuid->uuid, conn->cookie);
1171 EXPORT_SYMBOL(class_connect);
1173 /* if export is involved in recovery then clean up related things */
/* The recovery counters of the device are adjusted under
 * obd_recovery_task_lock; the per-export flags are cleared under exp_lock. */
1174 void class_export_recovery_cleanup(struct obd_export *exp)
1176 struct obd_device *obd = exp->exp_obd;
1178 spin_lock(&obd->obd_recovery_task_lock);
1179 if (exp->exp_delayed)
1180 obd->obd_delayed_clients--;
1181 if (obd->obd_recovering) {
1182 if (exp->exp_in_recovery) {
1183 spin_lock(&exp->exp_lock);
1184 exp->exp_in_recovery = 0;
1185 spin_unlock(&exp->exp_lock);
1186 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1187 atomic_dec(&obd->obd_connected_clients);
1190 /* if called during recovery then should update
1191 * obd_stale_clients counter,
1192 * lightweight exports are not counted */
1193 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1194 exp->exp_obd->obd_stale_clients++;
1196 spin_unlock(&obd->obd_recovery_task_lock);
1197 /** Cleanup req replay fields */
1198 if (exp->exp_req_replay_needed) {
1199 spin_lock(&exp->exp_lock);
1200 exp->exp_req_replay_needed = 0;
1201 spin_unlock(&exp->exp_lock);
1202 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1203 atomic_dec(&obd->obd_req_replay_clients);
1205 /** Cleanup lock replay data */
1206 if (exp->exp_lock_replay_needed) {
1207 spin_lock(&exp->exp_lock);
1208 exp->exp_lock_replay_needed = 0;
1209 spin_unlock(&exp->exp_lock);
1210 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1211 atomic_dec(&obd->obd_lock_replay_clients);
1215 /* This function removes 1-3 references from the export:
1216 * 1 - for the export pointer passed in
1217 * and, if a disconnect is really needed,
1218 * 2 - for removal from the hash
1219 * 3 - in class_unlink_export()
1220 * The export pointer passed to this function may be destroyed. */
1221 int class_disconnect(struct obd_export *export)
1223 int already_disconnected;
1226 if (export == NULL) {
1227 CWARN("attempting to free NULL export %p\n", export);
/* test-and-set exp_disconnected under exp_lock so that only one caller
 * performs the actual disconnect */
1231 spin_lock(&export->exp_lock);
1232 already_disconnected = export->exp_disconnected;
1233 export->exp_disconnected = 1;
1234 spin_unlock(&export->exp_lock);
1236 /* class_cleanup(), abort_recovery(), and class_fail_export()
1237 * all end up in here, and if any of them race we shouldn't
1238 * call extra class_export_puts(). */
1239 if (already_disconnected) {
1240 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1241 GOTO(no_disconn, already_disconnected);
1244 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1245 export->exp_handle.h_cookie);
/* remove the export from the device's nid hash if it is hashed there */
1247 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1248 cfs_hash_del(export->exp_obd->obd_nid_hash,
1249 &export->exp_connection->c_peer.nid,
1250 &export->exp_nid_hash);
1252 class_export_recovery_cleanup(export);
1253 class_unlink_export(export);
/* the reference for the export pointer passed in */
1255 class_export_put(export);
1258 EXPORT_SYMBOL(class_disconnect);
1260 /* Return non-zero for a fully connected export */
1261 int class_connected_export(struct obd_export *exp)
1266 spin_lock(&exp->exp_lock);
1267 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1268 spin_unlock(&exp->exp_lock);
1272 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list.
 *
 * Each export gets \a flags stored in exp_flags under exp_lock.  An export
 * whose client uuid equals the device uuid is not disconnected; it is only
 * taken off the list.  Every other export is passed to obd_disconnect().
 */
1274 static void class_disconnect_export_list(cfs_list_t *list,
1275 enum obd_option flags)
1278 struct obd_export *exp;
1281 /* It's possible that an export may disconnect itself, but
1282 * nothing else will be added to this list. */
1283 while (!cfs_list_empty(list)) {
1284 exp = cfs_list_entry(list->next, struct obd_export,
1286 /* need for safe call CDEBUG after obd_disconnect */
1287 class_export_get(exp);
1289 spin_lock(&exp->exp_lock);
1290 exp->exp_flags = flags;
1291 spin_unlock(&exp->exp_lock);
1293 if (obd_uuid_equals(&exp->exp_client_uuid,
1294 &exp->exp_obd->obd_uuid)) {
1296 "exp %p export uuid == obd uuid, don't discon\n",
1298 /* Need to delete this now so we don't end up pointing
1299 * to work_list later when this export is cleaned up. */
1300 cfs_list_del_init(&exp->exp_obd_chain);
1301 class_export_put(exp);
/* second reference: the existing comment below says obd_disconnect()
 * releases one export reference */
1305 class_export_get(exp);
1306 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1307 "last request at "CFS_TIME_T"\n",
1308 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1309 exp, exp->exp_last_request_time);
1310 /* release one export reference anyway */
1311 rc = obd_disconnect(exp);
1313 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1314 obd_export_nid2str(exp), exp, rc);
/* drops the reference taken at the top of the loop body */
1315 class_export_put(exp);
/*
 * class_disconnect_exports() - disconnect all exports of an OBD device.
 *
 * obd_exports and obd_delayed_exports are both spliced onto a private work
 * list under obd_dev_lock; the disconnects are then issued after the lock
 * has been released, using the flags from exp_flags_from_obd().
 */
1320 void class_disconnect_exports(struct obd_device *obd)
1322 cfs_list_t work_list;
1325 /* Move all of the exports from obd_exports to a work list, en masse. */
1326 CFS_INIT_LIST_HEAD(&work_list);
1327 spin_lock(&obd->obd_dev_lock);
1328 cfs_list_splice_init(&obd->obd_exports, &work_list);
1329 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1330 spin_unlock(&obd->obd_dev_lock);
1332 if (!cfs_list_empty(&work_list)) {
1333 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1334 "disconnecting them\n", obd->obd_minor, obd);
1335 class_disconnect_export_list(&work_list,
1336 exp_flags_from_obd(obd));
1338 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1339 obd->obd_minor, obd);
1342 EXPORT_SYMBOL(class_disconnect_exports);
/*
 * class_disconnect_stale_exports() - evict exports selected via a predicate.
 *
 * obd:         device whose obd_exports list is scanned under obd_dev_lock
 * test_export: predicate evaluated with exp_lock held
 *
 * An export that is not yet failed and for which test_export() returns zero
 * is marked failed, moved to a private work list and logged as "EVICTING".
 * The work list is then disconnected with OBD_OPT_ABORT_RECOV added to the
 * device flags.
 * NOTE(review): the statements that skip an export (self-export, no
 * last_rcvd slot, already failed or predicate non-zero) and the handling of
 * the evicted counter are not visible in this excerpt.
 */
1344 /* Remove exports that have not completed recovery.
1346 void class_disconnect_stale_exports(struct obd_device *obd,
1347 int (*test_export)(struct obd_export *))
1349 cfs_list_t work_list;
1350 struct obd_export *exp, *n;
1354 CFS_INIT_LIST_HEAD(&work_list);
1355 spin_lock(&obd->obd_dev_lock);
1356 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1358 /* don't count self-export as client */
1359 if (obd_uuid_equals(&exp->exp_client_uuid,
1360 &exp->exp_obd->obd_uuid))
1363 /* don't evict clients which have no slot in last_rcvd
1364 * (e.g. lightweight connection) */
1365 if (exp->exp_target_data.ted_lr_idx == -1)
/* exp_failed is tested and set inside one exp_lock critical section. */
1368 spin_lock(&exp->exp_lock);
1369 if (exp->exp_failed || test_export(exp)) {
1370 spin_unlock(&exp->exp_lock);
1373 exp->exp_failed = 1;
1374 spin_unlock(&exp->exp_lock);
/* Still under obd_dev_lock: move the export onto the private list. */
1376 cfs_list_move(&exp->exp_obd_chain, &work_list);
1378 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1379 obd->obd_name, exp->exp_client_uuid.uuid,
1380 exp->exp_connection == NULL ? "<unknown>" :
1381 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1382 print_export_data(exp, "EVICTING", 0);
1384 spin_unlock(&obd->obd_dev_lock);
1387 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1388 obd->obd_name, evicted);
1390 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1391 OBD_OPT_ABORT_RECOV);
1394 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * class_fail_export() - mark an export failed and disconnect it.
 *
 * exp_failed is read and set under exp_lock.  When it was already set the
 * function only logs the "skipping" message (the early return itself is
 * not visible in this excerpt).  Otherwise two references are taken before
 * obd_disconnect(): one to keep @exp usable for the log messages that
 * follow, and one for obd_disconnect() to consume.
 */
1396 void class_fail_export(struct obd_export *exp)
1398 int rc, already_failed;
1400 spin_lock(&exp->exp_lock);
1401 already_failed = exp->exp_failed;
1402 exp->exp_failed = 1;
1403 spin_unlock(&exp->exp_lock);
1405 if (already_failed) {
1406 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1407 exp, exp->exp_client_uuid.uuid);
1411 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1412 exp, exp->exp_client_uuid.uuid);
/* Optionally dump the debug log before the export is torn down. */
1414 if (obd_dump_on_timeout)
1415 libcfs_debug_dumplog();
1417 /* need for safe call CDEBUG after obd_disconnect */
1418 class_export_get(exp);
1420 /* Most callers into obd_disconnect are removing their own reference
1421 * (request, for example) in addition to the one from the hash table.
1422 * We don't have such a reference here, so make one. */
1423 class_export_get(exp);
1424 rc = obd_disconnect(exp);
1426 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1428 CDEBUG(D_HA, "disconnected export %p/%s\n",
1429 exp, exp->exp_client_uuid.uuid);
/* Release the reference that kept exp valid for the messages above. */
1430 class_export_put(exp);
1432 EXPORT_SYMBOL(class_fail_export);
/*
 * obd_export_nid2str() - printable NID of the peer behind an export.
 *
 * When the export has a connection, returns libcfs_nid2str() of the
 * connection's peer NID.
 * NOTE(review): the value returned when exp_connection is NULL is not
 * visible in this excerpt.
 */
1434 char *obd_export_nid2str(struct obd_export *exp)
1436 if (exp->exp_connection != NULL)
1437 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1441 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * obd_export_evict_by_nid() - evict exports hashed under a given NID.
 *
 * @obd: device whose NID hash (obd_nid_hash) is searched
 * @nid: NID in string form, converted with libcfs_str2nid()
 *
 * Returns exports_evicted; 0 is returned immediately when the device is
 * stopping.  Each export found is failed with class_fail_export().
 * NOTE(review): the loop construct around the lookup and the increment of
 * exports_evicted are not visible in this excerpt.
 */
1443 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1445 cfs_hash_t *nid_hash;
1446 struct obd_export *doomed_exp = NULL;
1447 int exports_evicted = 0;
1449 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1451 spin_lock(&obd->obd_dev_lock);
1452 /* umount has run already, so evict thread should leave
1453 * its task to umount thread now */
1454 if (obd->obd_stopping) {
1455 spin_unlock(&obd->obd_dev_lock);
1456 return exports_evicted;
/* Take a reference on the hash under the lock; dropped by putref below. */
1458 nid_hash = obd->obd_nid_hash;
1459 cfs_hash_getref(nid_hash);
1460 spin_unlock(&obd->obd_dev_lock);
1463 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1464 if (doomed_exp == NULL)
/* Sanity checks: the hit must match the key and must not be the self-export. */
1467 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1468 "nid %s found, wanted nid %s, requested nid %s\n",
1469 obd_export_nid2str(doomed_exp),
1470 libcfs_nid2str(nid_key), nid);
1471 LASSERTF(doomed_exp != obd->obd_self_export,
1472 "self-export is hashed by NID?\n");
1474 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1475 "request\n", obd->obd_name,
1476 obd_uuid2str(&doomed_exp->exp_client_uuid),
1477 obd_export_nid2str(doomed_exp));
1478 class_fail_export(doomed_exp);
/* Presumably releases the reference obtained by cfs_hash_lookup() - confirm. */
1479 class_export_put(doomed_exp);
1482 cfs_hash_putref(nid_hash);
1484 if (!exports_evicted)
1485 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1486 obd->obd_name, nid);
1487 return exports_evicted;
1489 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * obd_export_evict_by_uuid() - evict the export with a given client UUID.
 *
 * @obd:  device whose UUID hash (obd_uuid_hash) is searched
 * @uuid: client UUID in string form
 *
 * Returns exports_evicted.  Nothing is evicted when the device is stopping
 * or when @uuid names the device itself; both cases return early.
 * NOTE(review): the increment of exports_evicted is not visible in this
 * excerpt.  The message below spells "adminstrative"; it is a runtime
 * string and is left unchanged here.
 */
1491 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1493 cfs_hash_t *uuid_hash;
1494 struct obd_export *doomed_exp = NULL;
1495 struct obd_uuid doomed_uuid;
1496 int exports_evicted = 0;
1498 spin_lock(&obd->obd_dev_lock);
1499 if (obd->obd_stopping) {
1500 spin_unlock(&obd->obd_dev_lock);
1501 return exports_evicted;
/* Take a reference on the hash under the lock; every exit path drops it. */
1503 uuid_hash = obd->obd_uuid_hash;
1504 cfs_hash_getref(uuid_hash);
1505 spin_unlock(&obd->obd_dev_lock);
1507 obd_str2uuid(&doomed_uuid, uuid);
1508 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1509 CERROR("%s: can't evict myself\n", obd->obd_name);
1510 cfs_hash_putref(uuid_hash);
1511 return exports_evicted;
1514 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1516 if (doomed_exp == NULL) {
1517 CERROR("%s: can't disconnect %s: no exports found\n",
1518 obd->obd_name, uuid);
1520 CWARN("%s: evicting %s at adminstrative request\n",
1521 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1522 class_fail_export(doomed_exp);
/* Presumably releases the reference obtained by cfs_hash_lookup() - confirm. */
1523 class_export_put(doomed_exp);
1526 cfs_hash_putref(uuid_hash);
1528 return exports_evicted;
1530 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Optional hook, compiled only when LUSTRE_TRACKS_LOCK_EXP_REFS is set.
 * print_export_data() calls it for an export when lock dumping is requested
 * and the pointer is non-NULL.
 */
1532 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1533 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1534 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * print_export_data() - log a one-line D_HA summary of an export.
 *
 * @exp:    export to describe
 * @status: label printed after the device name (callers in this file pass
 *          "EVICTING", "ACTIVE", "UNLINKED", "DELAYED" and "ZOMBIE")
 *
 * exp_outstanding_replies is walked under exp_lock; the body of that loop
 * is not visible in this excerpt.  When LUSTRE_TRACKS_LOCK_EXP_REFS is set,
 * locks is non-zero and class_export_dump_hook is registered, the hook is
 * called for the export as well.
 */
1537 static void print_export_data(struct obd_export *exp, const char *status,
1540 struct ptlrpc_reply_state *rs;
1541 struct ptlrpc_reply_state *first_reply = NULL;
1544 spin_lock(&exp->exp_lock);
1545 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1551 spin_unlock(&exp->exp_lock);
/* Reference counters are read with atomic_read() outside exp_lock. */
1553 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1554 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1555 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1556 atomic_read(&exp->exp_rpc_count),
1557 atomic_read(&exp->exp_cb_count),
1558 atomic_read(&exp->exp_locks_count),
1559 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1560 nreplies, first_reply, nreplies > 3 ? "..." : "",
1561 exp->exp_last_committed);
1562 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1563 if (locks && class_export_dump_hook != NULL)
1564 class_export_dump_hook(exp);
/*
 * dump_exports() - log every export known for a device.
 *
 * @obd:   device whose export lists are dumped
 * @locks: forwarded to print_export_data()
 *
 * The device's obd_exports, obd_unlinked_exports and obd_delayed_exports
 * lists are walked under obd_dev_lock.  The global obd_zombie_exports list
 * is then walked under obd_zombie_impexp_lock; that walk is not filtered
 * by @obd.
 */
1568 void dump_exports(struct obd_device *obd, int locks)
1570 struct obd_export *exp;
1572 spin_lock(&obd->obd_dev_lock);
1573 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1574 print_export_data(exp, "ACTIVE", locks);
1575 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1576 print_export_data(exp, "UNLINKED", locks);
1577 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1578 print_export_data(exp, "DELAYED", locks);
1579 spin_unlock(&obd->obd_dev_lock);
1580 spin_lock(&obd_zombie_impexp_lock);
1581 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1582 print_export_data(exp, "ZOMBIE", locks);
1583 spin_unlock(&obd_zombie_impexp_lock);
1585 EXPORT_SYMBOL(dump_exports);
/*
 * obd_exports_barrier() - wait until obd_unlinked_exports is empty.
 *
 * The caller must already have emptied obd_exports (asserted below).  The
 * list is checked under obd_dev_lock, which is released around each
 * uninterruptible sleep of cfs_time_seconds(waited).  Whenever waited is
 * greater than 5 and a power of two, a console warning is printed and the
 * exports are dumped.
 * NOTE(review): the declaration and update of waited are not visible in
 * this excerpt.
 */
1587 void obd_exports_barrier(struct obd_device *obd)
1590 LASSERT(cfs_list_empty(&obd->obd_exports));
1591 spin_lock(&obd->obd_dev_lock);
1592 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* Do not sleep with the spinlock held. */
1593 spin_unlock(&obd->obd_dev_lock);
1594 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1595 cfs_time_seconds(waited));
1596 if (waited > 5 && IS_PO2(waited)) {
1597 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1598 "more than %d seconds. "
1599 "The obd refcount = %d. Is it stuck?\n",
1600 obd->obd_name, waited,
1601 atomic_read(&obd->obd_refcount));
1602 dump_exports(obd, 1);
/* Re-take the lock before the loop condition is evaluated again. */
1605 spin_lock(&obd->obd_dev_lock);
1607 spin_unlock(&obd->obd_dev_lock);
1609 EXPORT_SYMBOL(obd_exports_barrier);
/*
 * Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle().
 */
1611 /* Total amount of zombies to be destroyed */
1612 static int zombies_count = 0;
/*
 * obd_zombie_impexp_cull() - destroy queued zombie imports and exports.
 *
 * Each pass unlinks at most one import and one export from the zombie
 * lists under obd_zombie_impexp_lock, then destroys them with the lock
 * released.  The lock is re-taken briefly after each destroy; the statement
 * executed inside that short critical section is not visible in this
 * excerpt.  The loop repeats until a pass finds neither list populated.
 */
1615 * kill zombie imports and exports
1617 void obd_zombie_impexp_cull(void)
1619 struct obd_import *import;
1620 struct obd_export *export;
1624 spin_lock(&obd_zombie_impexp_lock);
1627 if (!cfs_list_empty(&obd_zombie_imports)) {
1628 import = cfs_list_entry(obd_zombie_imports.next,
1631 cfs_list_del_init(&import->imp_zombie_chain);
1635 if (!cfs_list_empty(&obd_zombie_exports)) {
1636 export = cfs_list_entry(obd_zombie_exports.next,
1639 cfs_list_del_init(&export->exp_obd_chain);
1642 spin_unlock(&obd_zombie_impexp_lock);
/* Destruction happens outside the spinlock. */
1644 if (import != NULL) {
1645 class_import_destroy(import);
1646 spin_lock(&obd_zombie_impexp_lock);
1648 spin_unlock(&obd_zombie_impexp_lock);
1651 if (export != NULL) {
1652 class_export_destroy(export);
1653 spin_lock(&obd_zombie_impexp_lock);
1655 spin_unlock(&obd_zombie_impexp_lock);
1659 } while (import != NULL || export != NULL);
/*
 * State shared between the zombie-destroy thread and its controllers:
 *  - obd_zombie_start / obd_zombie_stop: completed by the thread when it
 *    starts and when it exits;
 *  - obd_zombie_flags: bit field tested and set with test_bit()/set_bit();
 *  - obd_zombie_waitq: wait queue used by both the thread and
 *    obd_zombie_barrier();
 *  - obd_zombie_pid: pid recorded by the thread itself.
 * OBD_ZOMBIE_STOP is passed as the bit number to set_bit()/test_bit().
 */
1663 static struct completion obd_zombie_start;
1664 static struct completion obd_zombie_stop;
1665 static unsigned long obd_zombie_flags;
1666 static wait_queue_head_t obd_zombie_waitq;
1667 static pid_t obd_zombie_pid;
1670 OBD_ZOMBIE_STOP = 0x0001,
/*
 * obd_zombie_impexp_check() - test whether the zombie thread has nothing
 * to do.
 *
 * rc is non-zero only when zombies_count is 0 and the STOP bit is not set,
 * i.e. when the thread may keep sleeping; the thread waits on the negation
 * of this function.  @arg is not used by the visible code.
 */
1674 * check for work for kill zombie import/export thread.
1676 static int obd_zombie_impexp_check(void *arg)
1680 spin_lock(&obd_zombie_impexp_lock);
1681 rc = (zombies_count == 0) &&
1682 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1683 spin_unlock(&obd_zombie_impexp_lock);
1689 * Add export to the obd_zombie thread and notify it.
/*
 * obd_zombie_export_add() - queue an export for destruction.
 *
 * The export is unlinked from its device list under the device's
 * obd_dev_lock (it must currently be on a list), then added to
 * obd_zombie_exports under obd_zombie_impexp_lock, reusing the same
 * exp_obd_chain linkage.  Waiters are notified afterwards.
 */
1691 static void obd_zombie_export_add(struct obd_export *exp) {
1692 spin_lock(&exp->exp_obd->obd_dev_lock);
1693 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1694 cfs_list_del_init(&exp->exp_obd_chain);
1695 spin_unlock(&exp->exp_obd->obd_dev_lock);
1696 spin_lock(&obd_zombie_impexp_lock);
1698 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1699 spin_unlock(&obd_zombie_impexp_lock);
1701 obd_zombie_impexp_notify();
1705 * Add import to the obd_zombie thread and notify it.
/*
 * obd_zombie_import_add() - queue an import for destruction.
 *
 * The import must no longer have imp_sec or imp_rq_pool set and must not
 * already be on a zombie list (all asserted).  It is added to
 * obd_zombie_imports under obd_zombie_impexp_lock and waiters are notified.
 */
1707 static void obd_zombie_import_add(struct obd_import *imp) {
1708 LASSERT(imp->imp_sec == NULL);
1709 LASSERT(imp->imp_rq_pool == NULL);
1710 spin_lock(&obd_zombie_impexp_lock);
1711 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1713 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1714 spin_unlock(&obd_zombie_impexp_lock);
1716 obd_zombie_impexp_notify();
/*
 * obd_zombie_impexp_notify() - wake everything sleeping on obd_zombie_waitq.
 *
 * wake_up_all() is used because the destroy thread and obd_zombie_barrier()
 * callers share the same wait queue.
 */
1720 * notify import/export destroy thread about new zombie.
1722 static void obd_zombie_impexp_notify(void)
1725 * Make sure obd_zomebie_impexp_thread get this notification.
1726 * It is possible this signal only get by obd_zombie_barrier, and
1727 * barrier gulps this notification and sleeps away and hangs ensues
1729 wake_up_all(&obd_zombie_waitq);
/*
 * obd_zombie_is_idle() - test whether no zombies are pending.
 *
 * Must not be called once the STOP bit has been set (asserted).  rc is
 * computed as (zombies_count == 0) under obd_zombie_impexp_lock.
 */
1733 * check whether obd_zombie is idle
1735 static int obd_zombie_is_idle(void)
1739 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1740 spin_lock(&obd_zombie_impexp_lock);
1741 rc = (zombies_count == 0);
1742 spin_unlock(&obd_zombie_impexp_lock);
1747 * wait until the obd_zombie import/export queues become empty
/*
 * obd_zombie_barrier() - block until obd_zombie_is_idle() reports true.
 *
 * The wait uses obd_zombie_waitq with a zero-initialised l_wait_info.  The
 * zombie thread itself is recognised by its pid; the statement taken in
 * that case is not visible in this excerpt.
 */
1749 void obd_zombie_barrier(void)
1751 struct l_wait_info lwi = { 0 };
1753 if (obd_zombie_pid == current_pid())
1754 /* don't wait for myself */
1756 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1758 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * obd_zombie_impexp_thread() - main loop of the zombie-destroy thread.
 *
 * Signals obd_zombie_start, records its own pid, then loops until the STOP
 * bit is set: sleep until obd_zombie_impexp_check() returns zero, cull the
 * queued zombies, and wake the wait queue so that barrier callers re-check.
 * obd_zombie_stop is completed on exit.
 * NOTE(review): obd_zombie_pid is assigned after obd_zombie_start has been
 * completed, so a thread waiting on that completion may run before the pid
 * is recorded.
 */
1763 * destroy zombie export/import thread.
1765 static int obd_zombie_impexp_thread(void *unused)
1767 unshare_fs_struct();
1768 complete(&obd_zombie_start);
1770 obd_zombie_pid = current_pid();
1772 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1773 struct l_wait_info lwi = { 0 };
1775 l_wait_event(obd_zombie_waitq,
1776 !obd_zombie_impexp_check(NULL), &lwi);
1777 obd_zombie_impexp_cull();
1780 * Notify obd_zombie_barrier callers that queues
1783 wake_up(&obd_zombie_waitq);
1786 complete(&obd_zombie_stop);
/*
 * Non-kernel build state: zombie_recur guards obd_zombie_impexp_kill()
 * against re-entry, and the two pointers hold the handles returned by the
 * liblustre callback registration in obd_zombie_impexp_init().
 */
1791 #else /* ! KERNEL */
1793 static atomic_t zombie_recur = ATOMIC_INIT(0);
1794 static void *obd_zombie_impexp_work_cb;
1795 static void *obd_zombie_impexp_idle_cb;
/*
 * obd_zombie_impexp_kill() - cull zombies from a liblustre wait callback.
 *
 * The cull runs only when the increment brings zombie_recur to exactly 1,
 * so a nested invocation skips it; the counter is decremented again on the
 * way out.  The return value is not visible in this excerpt.
 */
1797 int obd_zombie_impexp_kill(void *arg)
1801 if (atomic_inc_return(&zombie_recur) == 1) {
1802 obd_zombie_impexp_cull();
1805 atomic_dec(&zombie_recur);
/*
 * obd_zombie_impexp_init() - set up zombie handling.
 *
 * Initialises the zombie lists, lock, completions and wait queue.  The
 * visible code then both starts the "obd_zombid" kernel thread (waiting for
 * it to signal obd_zombie_start) and registers liblustre wait/idle
 * callbacks.
 * NOTE(review): these two halves presumably sit in different preprocessor
 * branches whose directives are not visible in this excerpt - confirm.
 * A kthread_run() failure is returned as PTR_ERR(task).
 */
1812 * start destroy zombie import/export thread
1814 int obd_zombie_impexp_init(void)
1817 struct task_struct *task;
1820 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1821 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1822 spin_lock_init(&obd_zombie_impexp_lock);
1823 init_completion(&obd_zombie_start);
1824 init_completion(&obd_zombie_stop);
1825 init_waitqueue_head(&obd_zombie_waitq);
1829 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1831 RETURN(PTR_ERR(task));
/* Do not return until the thread has signalled that it is running. */
1833 wait_for_completion(&obd_zombie_start);
1836 obd_zombie_impexp_work_cb =
1837 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1838 &obd_zombie_impexp_kill, NULL);
1840 obd_zombie_impexp_idle_cb =
1841 liblustre_register_idle_callback("obd_zombi_impexp_check",
1842 &obd_zombie_impexp_check, NULL);
/*
 * obd_zombie_impexp_stop() - shut zombie handling down.
 *
 * Sets the STOP bit, wakes the wait queue and waits for obd_zombie_stop to
 * be completed by the thread; the liblustre callbacks are deregistered.
 * NOTE(review): as in obd_zombie_impexp_init(), the thread half and the
 * callback half presumably belong to different preprocessor branches that
 * are not visible in this excerpt - confirm.
 */
1847 * stop destroy zombie import/export thread
1849 void obd_zombie_impexp_stop(void)
1851 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1852 obd_zombie_impexp_notify();
1854 wait_for_completion(&obd_zombie_stop);
1856 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1857 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
/*
 * kuc_len() - total message size for a given payload size.
 *
 * Returns sizeof(struct kuc_hdr) + payload_len.  The sum is computed in
 * size_t and converted to the int return type; no range check on
 * payload_len is performed here.
 */
1861 /***** Kernel-userspace comm helpers *******/
1863 /* Get length of entire message, including header */
1864 int kuc_len(int payload_len)
1866 return sizeof(struct kuc_hdr) + payload_len;
1868 EXPORT_SYMBOL(kuc_len);
/*
 * kuc_ptr() - locate the kuc header that precedes a payload.
 *
 * The header is taken to start exactly sizeof(struct kuc_hdr) bytes before
 * p, and its kuc_magic field is asserted to equal KUC_MAGIC, so p must
 * point at the payload of a real kuc message.
 */
1870 /* Get a pointer to kuc header, given a ptr to the payload
1871 * @param p Pointer to payload area
1872 * @returns Pointer to kuc header
1874 struct kuc_hdr * kuc_ptr(void *p)
1876 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1877 LASSERT(lh->kuc_magic == KUC_MAGIC);
1880 EXPORT_SYMBOL(kuc_ptr);
// kuc_ispayload() - test whether p looks like the payload of a kuc message.
//
// Reads the kuc_magic field of the struct kuc_hdr located immediately before
// p and compares it with KUC_MAGIC; unlike kuc_ptr() it does not assert.
// NOTE(review): the memory just before p is read unconditionally, so p must
// be preceded by at least sizeof(struct kuc_hdr) readable bytes - confirm
// that every caller guarantees this.  The returned values are not visible in
// this excerpt.
1882 /* Test if payload is part of kuc message
1883 * @param p Pointer to payload area
1886 int kuc_ispayload(void *p)
1888 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1890 if (kh->kuc_magic == KUC_MAGIC)
1895 EXPORT_SYMBOL(kuc_ispayload);
// kuc_alloc() - allocate a kuc message and initialise its header.
//
// payload_len: size of the payload area; the header size is added through
//              kuc_len()
// transport:   stored in kuc_transport
// type:        stored in kuc_msgtype
//
// kuc_msglen is set to the total length (header plus payload).  On success
// the returned pointer addresses the payload, i.e. the first byte after the
// header; on allocation failure ERR_PTR(-ENOMEM) is returned, so callers
// must test the result with IS_ERR() rather than against NULL.
// NOTE(review): the allocation statement itself is not visible in this
// excerpt.
1897 /* Alloc space for a message, and fill in header
1898 * @return Pointer to payload area
1900 void *kuc_alloc(int payload_len, int transport, int type)
1903 int len = kuc_len(payload_len);
1907 return ERR_PTR(-ENOMEM);
1909 lh->kuc_magic = KUC_MAGIC;
1910 lh->kuc_transport = transport;
1911 lh->kuc_msgtype = type;
1912 lh->kuc_msglen = len;
1914 return (void *)(lh + 1);
1916 EXPORT_SYMBOL(kuc_alloc);
1918 /* Takes pointer to payload area */
/*
 * kuc_free() - release a kuc message given its payload pointer.
 *
 * The header is recovered with kuc_ptr() (which asserts the magic) and the
 * whole message, kuc_len(payload_len) bytes, is freed; payload_len must
 * therefore match the value used for the allocation.
 * NOTE(review): the function is declared plain "inline" yet exported with
 * EXPORT_SYMBOL; under C99 inline semantics such a definition does not by
 * itself provide an external symbol - confirm the intended linkage.
 */
1919 inline void kuc_free(void *p, int payload_len)
1921 struct kuc_hdr *lh = kuc_ptr(p);
1922 OBD_FREE(lh, kuc_len(payload_len));
1924 EXPORT_SYMBOL(kuc_free);