4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
/* List of registered obd types; the list head is defined elsewhere.
 * This file walks and extends it under obd_types_lock. */
49 extern cfs_list_t obd_types;
50 spinlock_t obd_types_lock;
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
52 struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 struct kmem_cache *import_cachep;
/* Exports/imports whose last reference is dropped are passed to
 * obd_zombie_export_add()/obd_zombie_import_add().
 * NOTE(review): presumably those helpers queue onto the two lists below
 * under obd_zombie_impexp_lock -- their bodies are not visible here. */
57 cfs_list_t obd_zombie_imports;
58 cfs_list_t obd_zombie_exports;
59 spinlock_t obd_zombie_impexp_lock;
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64 const char *status, int locks);
/* Function pointer invoked on connections in class_export_destroy() and
 * class_import_destroy(). NOTE(review): where it is assigned is not
 * visible in this file section. */
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
/*
 * Allocate one struct obd_device from obd_device_cachep (GFP_NOFS) and
 * stamp it with OBD_DEVICE_MAGIC.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing; class_newdev() uses the result as the new
 * device pointer -- confirm the NULL-on-failure contract in the full file.
 */
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * The device magic is asserted first, a still-set obd_namespace is
 * reported with CERROR, and obd_reference is finalized before the free.
 */
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* NOTE(review): anything this branch does beyond the CERROR is not visible
 * in this listing */
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Find a registered obd type by exact name.
 * Walks obd_types under obd_types_lock and compares each entry's typ_name
 * with \a name using strcmp(). The lock is dropped both on a match and
 * after the list is exhausted.
 * NOTE(review): the return statements are not visible in this listing;
 * callers in this file treat the result as NULL when nothing matches.
 * No reference is taken on the type by this function's visible lines.
 */
98 struct obd_type *class_search_type(const char *name)
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 cfs_list_for_each(tmp, &obd_types) {
105 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
114 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd type by name and pin the module that provides it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT, a module name is derived from \a name
 * ("obdfilter" is special-cased, LUSTRE_LWP_NAME maps to LUSTRE_OSP_NAME,
 * names starting with LUSTRE_MDS_NAME map to LUSTRE_MDT_NAME),
 * request_module() is called with it and the type is searched for again.
 * NOTE(review): the condition guarding the module load is not visible in
 * this listing; presumably it only runs when the first search fails.
 *
 * Under type->obd_type_lock the module owner of typ_dt_ops is pinned with
 * try_module_get(); its return value is not checked here.
 */
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
124 if (strcmp(modname, "obdfilter") == 0)
127 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128 modname = LUSTRE_OSP_NAME;
/* prefix match: any type name beginning with LUSTRE_MDS_NAME */
130 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131 modname = LUSTRE_MDT_NAME;
/* request_module() returning 0 is treated as success */
133 if (!request_module("%s", modname)) {
134 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135 type = class_search_type(name);
137 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143 spin_lock(&type->obd_type_lock);
145 try_module_get(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
150 EXPORT_SYMBOL(class_get_type);
/*
 * Counterpart of class_get_type(): under type->obd_type_lock, drop the
 * module reference on the owner of typ_dt_ops with module_put().
 * NOTE(review): one line between the lock and module_put() is not visible
 * in this listing; presumably it adjusts the type's reference count.
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
160 EXPORT_SYMBOL(class_put_type);
/* Upper bound (exclusive) on the length of a type name accepted by
 * class_register_type(). */
162 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type under \a name.
 *
 * Visible steps: assert the name is shorter than CLASS_MAX_NAME, refuse a
 * name that class_search_type() already finds, allocate the obd_type plus
 * private copies of the dt/md operation tables and of the name, register a
 * proc directory named after the type, initialize the lu device type, and
 * finally link the type onto obd_types under obd_types_lock.
 *
 * The tail of the function is the failure path: it removes the proc entry
 * and frees whatever was allocated.
 * NOTE(review): return values, goto labels and several conditions are not
 * visible in this listing; the parameter list differs depending on
 * HAVE_ONLY_PROCFS_SEQ (\a vars exists only when it is not defined).
 */
164 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
165 bool enable_proc, struct lprocfs_seq_vars *module_vars,
166 #ifndef HAVE_ONLY_PROCFS_SEQ
167 struct lprocfs_vars *vars,
169 const char *name, struct lu_device_type *ldt)
171 struct obd_type *type;
176 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
178 if (class_search_type(name)) {
179 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
184 OBD_ALLOC(type, sizeof(*type));
188 OBD_ALLOC_PTR(type->typ_dt_ops);
189 OBD_ALLOC_PTR(type->typ_md_ops);
190 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
192 if (type->typ_dt_ops == NULL ||
193 type->typ_md_ops == NULL ||
194 type->typ_name == NULL)
/* the type keeps its own copies of the caller's operation tables */
197 *(type->typ_dt_ops) = *dt_ops;
198 /* md_ops is optional */
200 *(type->typ_md_ops) = *md_ops;
201 strcpy(type->typ_name, name);
202 spin_lock_init(&type->obd_type_lock);
206 #ifndef HAVE_ONLY_PROCFS_SEQ
208 type->typ_procroot = lprocfs_register(type->typ_name,
214 type->typ_procroot = lprocfs_seq_register(type->typ_name,
/* an ERR_PTR result is converted to rc and the pointer is cleared */
218 if (IS_ERR(type->typ_procroot)) {
219 rc = PTR_ERR(type->typ_procroot);
220 type->typ_procroot = NULL;
227 rc = lu_device_type_init(ldt);
232 spin_lock(&obd_types_lock);
233 cfs_list_add(&type->typ_chain, &obd_types);
234 spin_unlock(&obd_types_lock);
/* failure path: undo proc registration and free the partial type */
239 if (type->typ_name != NULL) {
241 if (type->typ_procroot != NULL) {
242 #ifndef HAVE_ONLY_PROCFS_SEQ
243 lprocfs_try_remove_proc_entry(type->typ_name,
246 remove_proc_subtree(type->typ_name, proc_lustre_root);
/* size matches the allocation above: strlen(name) + 1 */
250 OBD_FREE(type->typ_name, strlen(name) + 1);
252 if (type->typ_md_ops != NULL)
253 OBD_FREE_PTR(type->typ_md_ops);
254 if (type->typ_dt_ops != NULL)
255 OBD_FREE_PTR(type->typ_dt_ops);
256 OBD_FREE(type, sizeof(*type));
259 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is reported with CERROR. If the type still has a
 * non-zero typ_refcnt, only its operation tables are freed and the type
 * is otherwise left in place (see the in-line comments). Otherwise the
 * proc entries are removed, the lu device type is finalized, the type is
 * unlinked from obd_types under obd_types_lock and all of its memory is
 * freed.
 * NOTE(review): return values and some conditions are not visible in this
 * listing. In the busy path the ops pointers are freed while the type
 * stays registered -- confirm nothing dereferences them afterwards.
 */
261 int class_unregister_type(const char *name)
263 struct obd_type *type = class_search_type(name);
267 CERROR("unknown obd type\n");
271 if (type->typ_refcnt) {
272 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
273 /* This is a bad situation, let's make the best of it */
274 /* Remove ops, but leave the name for debugging */
275 OBD_FREE_PTR(type->typ_dt_ops);
276 OBD_FREE_PTR(type->typ_md_ops);
280 /* we do not use type->typ_procroot as for compatibility purposes
281 * other modules can share names (i.e. lod can use lov entry). so
282 * we can't reference pointer as it can get invalidated when another
283 * module removes the entry */
285 if (type->typ_procroot != NULL) {
286 #ifndef HAVE_ONLY_PROCFS_SEQ
287 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
289 remove_proc_subtree(type->typ_name, proc_lustre_root);
293 if (type->typ_procsym != NULL)
294 lprocfs_remove(&type->typ_procsym);
297 lu_device_type_fini(type->typ_lu);
299 spin_lock(&obd_types_lock);
300 cfs_list_del(&type->typ_chain);
301 spin_unlock(&obd_types_lock);
/* \a name equals typ_name here (it was found by strcmp), so the size
 * matches the allocation made at registration */
302 OBD_FREE(type->typ_name, strlen(name) + 1);
303 if (type->typ_dt_ops != NULL)
304 OBD_FREE_PTR(type->typ_dt_ops);
305 if (type->typ_md_ops != NULL)
306 OBD_FREE_PTR(type->typ_md_ops);
307 OBD_FREE(type, sizeof(*type));
309 } /* class_unregister_type */
310 EXPORT_SYMBOL(class_unregister_type);
313 * Create a new obd device.
315 * Find an empty slot in ::obd_devs[], create a new obd device in it.
317 * \param[in] type_name obd device type string.
318 * \param[in] name obd device name.
320 * \retval NULL if create fails, otherwise return the obd device
/*
 * Create a new obd device of type \a type_name called \a name and place it
 * in a free slot of obd_devs[].
 *
 * Visible error results: ERR_PTR(-EINVAL) when the name is MAX_OBD_NAME
 * bytes or longer, ERR_PTR(-ENODEV) when class_get_type() finds no type,
 * ERR_PTR(-ENOMEM) on allocation failure, ERR_PTR(-EEXIST) when a device
 * with the same name already exists, ERR_PTR(-EOVERFLOW) when no slot is
 * free. On the failure paths at the end the new device is freed and the
 * type reference is dropped.
 * NOTE(review): some conditions, labels and the final return are not
 * visible in this listing.
 */
323 struct obd_device *class_newdev(const char *type_name, const char *name)
325 struct obd_device *result = NULL;
326 struct obd_device *newdev;
327 struct obd_type *type = NULL;
329 int new_obd_minor = 0;
332 if (strlen(name) >= MAX_OBD_NAME) {
333 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
334 RETURN(ERR_PTR(-EINVAL));
337 type = class_get_type(type_name);
339 CERROR("OBD: unknown type: %s\n", type_name);
340 RETURN(ERR_PTR(-ENODEV));
343 newdev = obd_device_alloc();
345 GOTO(out_type, result = ERR_PTR(-ENOMEM));
347 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* the whole table is scanned under the write lock: every slot is checked
 * for a name clash, and the first empty slot is claimed */
349 write_lock(&obd_dev_lock);
350 for (i = 0; i < class_devno_max(); i++) {
351 struct obd_device *obd = class_num2obd(i);
353 if (obd && (strcmp(name, obd->obd_name) == 0)) {
354 CERROR("Device %s already exists at %d, won't add\n",
/* a slot may already have been claimed earlier in this scan; it is
 * released again before reporting the duplicate */
357 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
358 "%p obd_magic %08x != %08x\n", result,
359 result->obd_magic, OBD_DEVICE_MAGIC);
360 LASSERTF(result->obd_minor == new_obd_minor,
361 "%p obd_minor %d != %d\n", result,
362 result->obd_minor, new_obd_minor);
364 obd_devs[result->obd_minor] = NULL;
365 result->obd_name[0]='\0';
367 result = ERR_PTR(-EEXIST);
/* first empty slot and nothing claimed yet */
370 if (!result && !obd) {
372 result->obd_minor = i;
374 result->obd_type = type;
/* strlen(name) < MAX_OBD_NAME was checked above; the copy leaves the last
 * byte of obd_name untouched (assumes the buffer starts zeroed -- TODO
 * confirm against the slab allocation macro) */
375 strncpy(result->obd_name, name,
376 sizeof(result->obd_name) - 1);
377 obd_devs[i] = result;
380 write_unlock(&obd_dev_lock);
382 if (result == NULL && i >= class_devno_max()) {
383 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
385 GOTO(out, result = ERR_PTR(-EOVERFLOW));
391 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
392 result->obd_name, result);
/* failure cleanup */
396 obd_device_free(newdev);
398 class_put_type(type);
/*
 * Remove \a obd from obd_devs[] and free it.
 * Asserts that the device magic is intact, that the device really occupies
 * its obd_minor slot, and that it has a type. The slot is cleared under
 * the obd_dev_lock write lock, the device is freed, and the type reference
 * is dropped last.
 */
402 void class_release_dev(struct obd_device *obd)
/* saved up front because obd is freed before the type is put */
404 struct obd_type *obd_type = obd->obd_type;
406 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
407 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
408 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
409 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
410 LASSERT(obd_type != NULL);
412 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
413 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
415 write_lock(&obd_dev_lock);
416 obd_devs[obd->obd_minor] = NULL;
417 write_unlock(&obd_dev_lock);
418 obd_device_free(obd);
420 class_put_type(obd_type);
/*
 * Find the obd_devs[] index of the device called \a name.
 * Scans all slots under the obd_dev_lock read lock; a name match only
 * counts once the device has obd_attached set.
 * NOTE(review): the return statements are not visible in this listing;
 * class_name2obd() treats a negative result as "not found".
 */
423 int class_name2dev(const char *name)
430 read_lock(&obd_dev_lock);
431 for (i = 0; i < class_devno_max(); i++) {
432 struct obd_device *obd = class_num2obd(i);
434 if (obd && strcmp(name, obd->obd_name) == 0) {
435 /* Make sure we finished attaching before we give
436 out any references */
437 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
438 if (obd->obd_attached) {
439 read_unlock(&obd_dev_lock);
445 read_unlock(&obd_dev_lock);
449 EXPORT_SYMBOL(class_name2dev);
451 struct obd_device *class_name2obd(const char *name)
453 int dev = class_name2dev(name);
455 if (dev < 0 || dev > class_devno_max())
457 return class_num2obd(dev);
459 EXPORT_SYMBOL(class_name2obd);
/*
 * Find the obd_devs[] index of the device whose obd_uuid equals \a uuid.
 * Scans all slots under the obd_dev_lock read lock; unlike
 * class_name2dev() no obd_attached check is visible here.
 * NOTE(review): the return statements are not visible in this listing.
 */
461 int class_uuid2dev(struct obd_uuid *uuid)
465 read_lock(&obd_dev_lock);
466 for (i = 0; i < class_devno_max(); i++) {
467 struct obd_device *obd = class_num2obd(i);
469 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
470 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
471 read_unlock(&obd_dev_lock);
475 read_unlock(&obd_dev_lock);
479 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by uuid: resolve the index with class_uuid2dev()
 * and fetch the device with class_num2obd().
 * NOTE(review): the handling of a failed lookup is not visible in this
 * listing.
 */
481 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
483 int dev = class_uuid2dev(uuid);
486 return class_num2obd(dev);
488 EXPORT_SYMBOL(class_uuid2obd);
491 * Get obd device from ::obd_devs[]
493 * \param num [in] array index
495 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
496 * otherwise return the obd device there.
/*
 * Fetch the device stored at index \a num, with sanity checks: the device
 * magic must be intact and its obd_minor must equal \a num.
 * Indices at or beyond class_devno_max() are not looked up, so the initial
 * NULL is kept for them.
 * NOTE(review): the array read, the NULL test before the assertions and
 * the return are not visible in this listing; no lower-bound check on
 * \a num is visible either.
 */
498 struct obd_device *class_num2obd(int num)
500 struct obd_device *obd = NULL;
502 if (num < class_devno_max()) {
507 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
508 "%p obd_magic %08x != %08x\n",
509 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
510 LASSERTF(obd->obd_minor == num,
511 "%p obd_minor %0d != %0d\n",
512 obd, obd->obd_minor, num);
517 EXPORT_SYMBOL(class_num2obd);
520 * Get obd devices count. Device in any
522 * \retval obd device count
524 int get_devices_count(void)
526 int index, max_index = class_devno_max(), dev_count = 0;
528 read_lock(&obd_dev_lock);
529 for (index = 0; index <= max_index; index++) {
530 struct obd_device *obd = class_num2obd(index);
534 read_unlock(&obd_dev_lock);
538 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per obd device: index, a status string, type
 * name, device name, uuid and current reference count.
 * The status is chosen by priority: obd_stopping first, then obd_set_up,
 * then obd_attached. The table is walked under the obd_dev_lock read lock.
 * NOTE(review): the status string values and the skip of empty slots are
 * not visible in this listing.
 */
540 void class_obd_list(void)
545 read_lock(&obd_dev_lock);
546 for (i = 0; i < class_devno_max(); i++) {
547 struct obd_device *obd = class_num2obd(i);
551 if (obd->obd_stopping)
553 else if (obd->obd_set_up)
555 else if (obd->obd_attached)
559 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
560 i, status, obd->obd_type->typ_name,
561 obd->obd_name, obd->obd_uuid.uuid,
562 atomic_read(&obd->obd_refcount));
564 read_unlock(&obd_dev_lock);
568 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
569 specified, then only the client with that uuid is returned,
570 otherwise any client connected to the tgt is returned. */
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a client device of
 * type \a typ_name whose cl_target_uuid equals \a tgt_uuid; when
 * \a grp_uuid is non-NULL the device's own obd_uuid must equal it too.
 * NOTE(review): the return statements and the skip of empty slots are not
 * visible in this listing. No reference is taken on the device by the
 * visible lines, and the lock is dropped before returning.
 */
571 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
572 const char * typ_name,
573 struct obd_uuid *grp_uuid)
577 read_lock(&obd_dev_lock);
578 for (i = 0; i < class_devno_max(); i++) {
579 struct obd_device *obd = class_num2obd(i);
/* prefix comparison: only the first strlen(typ_name) characters of the
 * device's type name have to match */
583 if ((strncmp(obd->obd_type->typ_name, typ_name,
584 strlen(typ_name)) == 0)) {
585 if (obd_uuid_equals(tgt_uuid,
586 &obd->u.cli.cl_target_uuid) &&
587 ((grp_uuid)? obd_uuid_equals(grp_uuid,
588 &obd->obd_uuid) : 1)) {
589 read_unlock(&obd_dev_lock);
594 read_unlock(&obd_dev_lock);
598 EXPORT_SYMBOL(class_find_client_obd);
600 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
601 searching at *next, and if a device is found, the next index to look
602 at is saved in *next. If next is NULL, then the first matching device
603 will always be returned. */
/*
 * Resumable search for devices whose obd_uuid equals \a grp_uuid.
 * The scan runs under the obd_dev_lock read lock from a starting index up
 * to class_devno_max(); a value in *next is only accepted as the start
 * when it lies in [0, class_devno_max()).
 * NOTE(review): the handling of next == NULL, of an out-of-range *next,
 * the update of *next and the return statements are not visible in this
 * listing.
 */
604 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
610 else if (*next >= 0 && *next < class_devno_max())
615 read_lock(&obd_dev_lock);
616 for (; i < class_devno_max(); i++) {
617 struct obd_device *obd = class_num2obd(i);
621 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
624 read_unlock(&obd_dev_lock);
628 read_unlock(&obd_dev_lock);
632 EXPORT_SYMBOL(class_devices_in_group);
635 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
636 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every set-up,
 * non-stopping device of type mdc, osc, mdt or ost whose name starts with
 * the first \a namelen characters of \a fsname.
 * \a namelen must be positive (asserted).
 * NOTE(review): how rc2 is folded into the return value is not visible in
 * this listing.
 */
638 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
640 struct obd_device *obd;
644 LASSERT(namelen > 0);
646 read_lock(&obd_dev_lock);
647 for (i = 0; i < class_devno_max(); i++) {
648 obd = class_num2obd(i);
650 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
653 /* only notify mdc, osc, mdt, ost */
654 type = obd->obd_type->typ_name;
655 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
656 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
657 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
658 strcmp(type, LUSTRE_OST_NAME) != 0)
661 if (strncmp(obd->obd_name, fsname, namelen))
/* a device reference is held while the read lock is dropped around the
 * obd_set_info_async() call; the lock is re-taken before the scan
 * continues */
664 class_incref(obd, __FUNCTION__, obd);
665 read_unlock(&obd_dev_lock);
666 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
667 sizeof(KEY_SPTLRPC_CONF),
668 KEY_SPTLRPC_CONF, 0, NULL, NULL);
670 class_decref(obd, __FUNCTION__, obd);
671 read_lock(&obd_dev_lock);
673 read_unlock(&obd_dev_lock);
676 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches() (obd device, obdo,
 * import and capa caches) and clear the cache pointers.
 * Also used by obd_init_caches() to undo a partial setup.
 * NOTE(review): only the guard and pointer reset for obd_device_cachep are
 * fully visible in this listing; the other caches presumably follow the
 * same pattern.
 */
678 void obd_cleanup_caches(void)
681 if (obd_device_cachep) {
682 kmem_cache_destroy(obd_device_cachep);
683 obd_device_cachep = NULL;
686 kmem_cache_destroy(obdo_cachep);
690 kmem_cache_destroy(import_cachep);
691 import_cachep = NULL;
694 kmem_cache_destroy(capa_cachep);
/*
 * Create the four slab caches used by this layer: "ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache" and "capa_cache".
 * Each cache pointer is asserted to be NULL before creation. A failed
 * creation jumps to the exit path with rc = -ENOMEM, where
 * obd_cleanup_caches() tears down whatever was created.
 * NOTE(review): the success return and the exact label layout are not
 * visible in this listing.
 */
700 int obd_init_caches(void)
705 LASSERT(obd_device_cachep == NULL);
706 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
707 sizeof(struct obd_device),
709 if (!obd_device_cachep)
710 GOTO(out, rc = -ENOMEM);
712 LASSERT(obdo_cachep == NULL);
713 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
716 GOTO(out, rc = -ENOMEM);
718 LASSERT(import_cachep == NULL);
719 import_cachep = kmem_cache_create("ll_import_cache",
720 sizeof(struct obd_import),
723 GOTO(out, rc = -ENOMEM);
725 LASSERT(capa_cachep == NULL);
726 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
729 GOTO(out, rc = -ENOMEM);
733 obd_cleanup_caches();
737 /* map connection to client */
/*
 * Translate a connection handle into its export via
 * class_handle2object() on conn->cookie.
 * Two special cases are logged before the lookup: a "null handle" and a
 * cookie of -1, which the in-line comment describes as a request for a
 * new connection.
 * NOTE(review): the returns for the special cases and the final return
 * are not visible in this listing. The handle's hop_addref for exports is
 * class_export_get(), so the lookup presumably returns a referenced
 * export -- class_conn2obd() puts that reference after use.
 */
738 struct obd_export *class_conn2export(struct lustre_handle *conn)
740 struct obd_export *export;
744 CDEBUG(D_CACHE, "looking for null handle\n");
748 if (conn->cookie == -1) { /* this means assign a new connection */
749 CDEBUG(D_CACHE, "want a new connection\n");
753 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
754 export = class_handle2object(conn->cookie, NULL);
757 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to its obd device.
 * NOTE(review): the body is not visible in this listing; other functions
 * in this file read exp->exp_obd for the same purpose.
 */
759 struct obd_device *class_exp2obd(struct obd_export *exp)
765 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd device of its export.
 * The export obtained from class_conn2export() is only held long enough
 * to read exp_obd; it is put again before returning, so the returned
 * device is not pinned by this call.
 * NOTE(review): the NULL-export branch and the returns are not visible in
 * this listing.
 */
767 struct obd_device *class_conn2obd(struct lustre_handle *conn)
769 struct obd_export *export;
770 export = class_conn2export(conn);
772 struct obd_device *obd = export->exp_obd;
773 class_export_put(export);
778 EXPORT_SYMBOL(class_conn2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * NOTE(review): a check between the two visible statements is not shown
 * in this listing; \a exp itself is dereferenced unconditionally.
 */
780 struct obd_import *class_exp2cliimp(struct obd_export *exp)
782 struct obd_device *obd = exp->exp_obd;
785 return obd->u.cli.cl_import;
787 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that
 * class_conn2obd() resolves for \a conn.
 * NOTE(review): the handling of a failed class_conn2obd() lookup is not
 * visible in this listing.
 */
789 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
791 struct obd_device *obd = class_conn2obd(conn);
794 return obd->u.cli.cl_import;
796 EXPORT_SYMBOL(class_conn2cliimp);
798 /* Export management functions */
/*
 * Final teardown of an export whose reference count has reached zero
 * (asserted). Drops the connection if there is one, asserts that the
 * reply/replay/RPC lists are empty, lets the obd layer destroy its part,
 * releases the device reference taken for this export under the "export"
 * scope in class_new_export(), and frees the export via OBD_FREE_RCU.
 */
799 static void class_export_destroy(struct obd_export *exp)
801 struct obd_device *obd = exp->exp_obd;
804 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
805 LASSERT(obd != NULL);
807 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
808 exp->exp_client_uuid.uuid, obd->obd_name);
810 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
811 if (exp->exp_connection)
812 ptlrpc_put_connection_superhack(exp->exp_connection);
814 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
815 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
816 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
817 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
818 obd_destroy_export(exp);
819 class_decref(obd, "export", exp);
821 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle callback: take an export reference on behalf of the handle
 * layer. */
825 static void export_handle_addref(void *export)
827 class_export_get(export);
/* Operations registered with class_handle_hash() for export handles.
 * NOTE(review): further members and the closing brace are not visible in
 * this listing. */
830 static struct portals_handle_ops export_handle_ops = {
831 .hop_addref = export_handle_addref,
/*
 * Take one reference on \a exp (atomic increment of exp_refcount) and log
 * the new count.
 * NOTE(review): the return statement is not visible in this listing.
 */
835 struct obd_export *class_export_get(struct obd_export *exp)
837 atomic_inc(&exp->exp_refcount);
838 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
839 atomic_read(&exp->exp_refcount));
842 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp. \a exp must be non-NULL and its count
 * must be within (0, LI_POISON) on entry (both asserted).
 * When the count reaches zero the export is not destroyed inline: its nid
 * statistics are cleaned up and it is handed to obd_zombie_export_add().
 */
844 void class_export_put(struct obd_export *exp)
846 LASSERT(exp != NULL);
847 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* logged before the decrement, hence the "- 1" */
848 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
849 atomic_read(&exp->exp_refcount) - 1);
851 if (atomic_dec_and_test(&exp->exp_refcount)) {
852 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
853 CDEBUG(D_IOCTL, "final put %p/%s\n",
854 exp, exp->exp_client_uuid.uuid);
856 /* release nid stat reference */
857 lprocfs_exp_cleanup(exp);
859 obd_zombie_export_add(exp);
862 EXPORT_SYMBOL(class_export_put);
864 /* Creates a new export, adds it to the hash table, and returns a
865 * pointer to it. The refcount is 2: one for the hash reference, and
866 * one for the pointer returned by this function. */
/*
 * Allocate and initialize a new export for client \a cluuid on \a obd.
 *
 * The export starts with a reference count of 2, gets a handle via
 * class_handle_hash(), is added to the device's uuid hash unless
 * \a cluuid equals the device's own uuid, and is finally linked onto
 * obd_exports / obd_exports_timed while holding a device reference.
 *
 * Visible failure results: ERR_PTR(-ENOMEM) on allocation failure; rc of
 * -ENODEV when the device is stopping or has no uuid hash, -EALREADY when
 * the uuid is already hashed. The failure tail unhashes the handle and
 * frees the export.
 * NOTE(review): labels, some conditions and the final returns are not
 * visible in this listing.
 */
867 struct obd_export *class_new_export(struct obd_device *obd,
868 struct obd_uuid *cluuid)
870 struct obd_export *export;
871 cfs_hash_t *hash = NULL;
875 OBD_ALLOC_PTR(export);
877 return ERR_PTR(-ENOMEM);
879 export->exp_conn_cnt = 0;
880 export->exp_lock_hash = NULL;
881 export->exp_flock_hash = NULL;
882 atomic_set(&export->exp_refcount, 2);
883 atomic_set(&export->exp_rpc_count, 0);
884 atomic_set(&export->exp_cb_count, 0);
885 atomic_set(&export->exp_locks_count, 0);
886 #if LUSTRE_TRACKS_LOCK_EXP_REFS
887 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
888 spin_lock_init(&export->exp_locks_list_guard);
890 atomic_set(&export->exp_replay_count, 0);
891 export->exp_obd = obd;
892 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
893 spin_lock_init(&export->exp_uncommitted_replies_lock);
894 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
895 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
896 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
897 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
898 CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
899 class_handle_hash(&export->exp_handle, &export_handle_ops);
900 export->exp_last_request_time = cfs_time_current_sec();
901 spin_lock_init(&export->exp_lock);
902 spin_lock_init(&export->exp_rpc_lock);
903 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
904 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
905 spin_lock_init(&export->exp_bl_list_lock);
906 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
908 export->exp_sp_peer = LUSTRE_SP_ANY;
909 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
910 export->exp_client_uuid = *cluuid;
911 obd_init_export(export);
/* first check under the device lock; the hash reference is taken here so
 * the hash can be used after the lock is dropped */
913 spin_lock(&obd->obd_dev_lock);
914 /* shouldn't happen, but might race */
915 if (obd->obd_stopping)
916 GOTO(exit_unlock, rc = -ENODEV);
918 hash = cfs_hash_getref(obd->obd_uuid_hash);
920 GOTO(exit_unlock, rc = -ENODEV);
921 spin_unlock(&obd->obd_dev_lock);
/* an export carrying the device's own uuid is not put in the uuid hash */
923 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
924 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
926 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
927 obd->obd_name, cluuid->uuid, rc);
928 GOTO(exit_err, rc = -EALREADY);
/* the stopping flag is re-checked because the lock was dropped above; the
 * hash entry just added is removed again if the device went away */
932 spin_lock(&obd->obd_dev_lock);
933 if (obd->obd_stopping) {
934 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
935 GOTO(exit_unlock, rc = -ENODEV);
938 class_incref(obd, "export", export);
939 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
940 cfs_list_add_tail(&export->exp_obd_chain_timed,
941 &export->exp_obd->obd_exports_timed);
942 export->exp_obd->obd_num_exports++;
943 spin_unlock(&obd->obd_dev_lock);
944 cfs_hash_putref(hash);
/* failure tail */
948 spin_unlock(&obd->obd_dev_lock);
951 cfs_hash_putref(hash);
952 class_handle_unhash(&export->exp_handle);
953 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
954 obd_destroy_export(export);
955 OBD_FREE_PTR(export);
958 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from everything that can look it up: its handle is
 * unhashed, it is removed from the device's uuid hash if present, moved
 * from obd_exports to obd_unlinked_exports, removed from the timed list,
 * and the device's export count is decremented. The list and hash updates
 * happen under the device's obd_dev_lock.
 * One export reference is dropped at the end.
 */
960 void class_unlink_export(struct obd_export *exp)
962 class_handle_unhash(&exp->exp_handle);
964 spin_lock(&exp->exp_obd->obd_dev_lock);
965 /* delete an uuid-export hashitem from hashtables */
966 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
967 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
968 &exp->exp_client_uuid,
969 &exp->exp_uuid_hash);
971 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
972 cfs_list_del_init(&exp->exp_obd_chain_timed);
973 exp->exp_obd->obd_num_exports--;
974 spin_unlock(&exp->exp_obd->obd_dev_lock);
975 class_export_put(exp);
977 EXPORT_SYMBOL(class_unlink_export);
979 /* Import management functions */
/*
 * Final teardown of an import whose reference count has reached zero
 * (asserted). Drops the import's current connection and every connection
 * on imp_conn_list (freeing each list entry), asserts that imp_sec is
 * already NULL, releases the device reference taken under the "import"
 * scope in class_new_import(), and frees the import via OBD_FREE_RCU.
 */
980 void class_import_destroy(struct obd_import *imp)
984 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
985 imp->imp_obd->obd_name);
987 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
989 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop entries from the head until the list is empty */
991 while (!cfs_list_empty(&imp->imp_conn_list)) {
992 struct obd_import_conn *imp_conn;
994 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
995 struct obd_import_conn, oic_item);
996 cfs_list_del_init(&imp_conn->oic_item);
997 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
998 OBD_FREE(imp_conn, sizeof(*imp_conn));
1001 LASSERT(imp->imp_sec == NULL);
1002 class_decref(imp->imp_obd, "import", imp);
1003 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle callback: take an import reference on behalf of the handle
 * layer. */
1007 static void import_handle_addref(void *import)
1009 class_import_get(import);
/* Operations registered with class_handle_hash() for import handles.
 * NOTE(review): further members and the closing brace are not visible in
 * this listing. */
1012 static struct portals_handle_ops import_handle_ops = {
1013 .hop_addref = import_handle_addref,
/*
 * Take one reference on \a import (atomic increment of imp_refcount) and
 * log the new count together with the owning device's name.
 * NOTE(review): the return statement is not visible in this listing.
 */
1017 struct obd_import *class_import_get(struct obd_import *import)
1019 atomic_inc(&import->imp_refcount);
1020 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1021 atomic_read(&import->imp_refcount),
1022 import->imp_obd->obd_name);
1025 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp. On entry the import must not already be
 * on a zombie list and its count must be within (0, LI_POISON) (both
 * asserted). When the count reaches zero the import is handed to
 * obd_zombie_import_add() rather than destroyed inline.
 */
1027 void class_import_put(struct obd_import *imp)
1031 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1032 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* logged before the decrement, hence the "- 1" */
1034 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1035 atomic_read(&imp->imp_refcount) - 1,
1036 imp->imp_obd->obd_name);
1038 if (atomic_dec_and_test(&imp->imp_refcount)) {
1039 CDEBUG(D_INFO, "final put import %p\n", imp);
1040 obd_zombie_import_add(imp);
1043 /* catch possible import put race */
1044 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1047 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the net latency
 * estimate is initialized with 0, 0 and each of the IMP_AT_MAX_PORTALS
 * service estimates starts from INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is not
 * visible in this listing.
 */
1049 static void init_imp_at(struct imp_at *at) {
1051 at_init(&at->iat_net_latency, 0, 0);
1052 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1053 /* max service estimates are tracked on the server side, so
1054 don't use the AT history here, just use the last reported
1055 val. (But keep hist for proc histogram, worst_ever) */
1056 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize a new import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with a reference count of 2,
 * holds a device reference taken under the "import" scope, gets a handle
 * via class_handle_hash(), and defaults to LUSTRE_MSG_MAGIC_V2.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing.
 */
1061 struct obd_import *class_new_import(struct obd_device *obd)
1063 struct obd_import *imp;
1065 OBD_ALLOC(imp, sizeof(*imp));
1069 CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1070 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1071 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1072 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1073 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1074 CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
/* the replay cursor starts at the head of the committed list */
1075 imp->imp_replay_cursor = &imp->imp_committed_list;
1076 spin_lock_init(&imp->imp_lock);
1077 imp->imp_last_success_conn = 0;
1078 imp->imp_state = LUSTRE_IMP_NEW;
1079 imp->imp_obd = class_incref(obd, "import", imp);
1080 mutex_init(&imp->imp_sec_mutex);
1081 init_waitqueue_head(&imp->imp_recovery_waitq);
1083 atomic_set(&imp->imp_refcount, 2);
1084 atomic_set(&imp->imp_unregistering, 0);
1085 atomic_set(&imp->imp_inflight, 0);
1086 atomic_set(&imp->imp_replay_inflight, 0);
1087 atomic_set(&imp->imp_inval_count, 0);
1088 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1089 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1090 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1091 init_imp_at(&imp->imp_at);
1093 /* the default magic is V2, will be used in connect RPC, and
1094 * then adjusted according to the flags in request/reply. */
1095 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1099 EXPORT_SYMBOL(class_new_import);
/*
 * Begin destruction of \a import: unhash its handle, bump
 * imp_generation under imp_lock, and drop one reference.
 * \a import must be neither NULL nor LP_POISON (asserted).
 */
1101 void class_destroy_import(struct obd_import *import)
1103 LASSERT(import != NULL);
1104 LASSERT(import != LP_POISON);
1106 class_handle_unhash(&import->imp_handle);
1108 spin_lock(&import->imp_lock);
1109 import->imp_generation++;
1110 spin_unlock(&import->imp_lock);
1111 class_import_put(import);
1113 EXPORT_SYMBOL(class_destroy_import);
1115 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug accounting (built only when LUSTRE_TRACKS_LOCK_EXP_REFS is set):
 * record one more export reference held by \a lock on \a exp.
 * Under exp_locks_list_guard, a lock that already targets a different
 * export triggers a console warning; on the first reference the lock is
 * linked onto exp_locks_list and its target is set to \a exp.
 */
1117 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1119 spin_lock(&exp->exp_locks_list_guard);
1121 LASSERT(lock->l_exp_refs_nr >= 0);
1123 if (lock->l_exp_refs_target != NULL &&
1124 lock->l_exp_refs_target != exp) {
1125 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1126 exp, lock, lock->l_exp_refs_target);
/* post-increment: the body runs only for the 0 -> 1 transition */
1128 if ((lock->l_exp_refs_nr ++) == 0) {
1129 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1130 lock->l_exp_refs_target = exp;
1132 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1133 lock, exp, lock->l_exp_refs_nr);
1134 spin_unlock(&exp->exp_locks_list_guard);
1136 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Debug accounting counterpart of __class_export_add_lock_ref(): drop one
 * export reference held by \a lock on \a exp.
 * Under exp_locks_list_guard, the lock must hold at least one reference
 * (asserted); a target mismatch is warned about; when the count reaches
 * zero the lock is unlinked from the export's list and its target is
 * cleared.
 */
1138 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1140 spin_lock(&exp->exp_locks_list_guard);
1141 LASSERT(lock->l_exp_refs_nr > 0);
1142 if (lock->l_exp_refs_target != exp) {
1143 LCONSOLE_WARN("lock %p, "
1144 "mismatching export pointers: %p, %p\n",
1145 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the body runs only for the 1 -> 0 transition */
1147 if (-- lock->l_exp_refs_nr == 0) {
1148 cfs_list_del_init(&lock->l_exp_refs_link);
1149 lock->l_exp_refs_target = NULL;
1151 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1152 lock, exp, lock->l_exp_refs_nr);
1153 spin_unlock(&exp->exp_locks_list_guard);
1155 EXPORT_SYMBOL(__class_export_del_lock_ref);
1158 /* A connection defines an export context in which preallocation can
1159 be managed. This releases the export pointer reference, and returns
1160 the export handle, so the export refcount is 1 when this function
1162 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1163 struct obd_uuid *cluuid)
1165 struct obd_export *export;
1166 LASSERT(conn != NULL);
1167 LASSERT(obd != NULL);
1168 LASSERT(cluuid != NULL);
1171 export = class_new_export(obd, cluuid);
1173 RETURN(PTR_ERR(export));
1175 conn->cookie = export->exp_handle.h_cookie;
1176 class_export_put(export);
1178 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1179 cluuid->uuid, conn->cookie);
1182 EXPORT_SYMBOL(class_connect);
1184 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery bookkeeping an export contributed to its device.
 * Under obd_recovery_task_lock, while the device is recovering: clear
 * exp_in_recovery and decrement obd_connected_clients, count
 * non-lightweight exports as stale clients, then clear the request-replay
 * and lock-replay flags and decrement the matching per-device counters.
 * Each flag on the export is cleared under exp_lock.
 * NOTE(review): closing braces are not visible in this listing, so the
 * exact nesting of the stale-client and replay sections relative to the
 * recovering/in-recovery checks cannot be confirmed from here.
 */
1185 void class_export_recovery_cleanup(struct obd_export *exp)
1187 struct obd_device *obd = exp->exp_obd;
1189 spin_lock(&obd->obd_recovery_task_lock);
1190 if (obd->obd_recovering) {
1191 if (exp->exp_in_recovery) {
1192 spin_lock(&exp->exp_lock);
1193 exp->exp_in_recovery = 0;
1194 spin_unlock(&exp->exp_lock);
1195 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1196 atomic_dec(&obd->obd_connected_clients);
1199 /* if called during recovery then should update
1200 * obd_stale_clients counter,
1201 * lightweight exports are not counted */
1202 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1203 exp->exp_obd->obd_stale_clients++;
1205 spin_unlock(&obd->obd_recovery_task_lock);
1206 /** Cleanup req replay fields */
1207 if (exp->exp_req_replay_needed) {
1208 spin_lock(&exp->exp_lock);
1209 exp->exp_req_replay_needed = 0;
1210 spin_unlock(&exp->exp_lock);
/* the counter must still be non-zero before it is decremented */
1211 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1212 atomic_dec(&obd->obd_req_replay_clients);
1214 /** Cleanup lock replay data */
1215 if (exp->exp_lock_replay_needed) {
1216 spin_lock(&exp->exp_lock);
1217 exp->exp_lock_replay_needed = 0;
1218 spin_unlock(&exp->exp_lock);
1219 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1220 atomic_dec(&obd->obd_lock_replay_clients);
1224 /* This function removes 1-3 references from the export:
1225 * 1 - for export pointer passed
1226 * and, if a disconnect is really needed:
1227 * 2 - removing from hash
1228 * 3 - in class_unlink_export
1229 * The export pointer passed to this function can be destroyed */
/*
 * Disconnect \a export from its device.
 * exp_disconnected is tested and set atomically under exp_lock so that
 * only the first caller performs the teardown: removal from the nid hash
 * (if hashed), recovery cleanup, and class_unlink_export(). Every caller,
 * first or not, drops one export reference at the end.
 * A NULL \a export is only warned about.
 * NOTE(review): the return statements are not visible in this listing.
 */
1230 int class_disconnect(struct obd_export *export)
1232 int already_disconnected;
1235 if (export == NULL) {
1236 CWARN("attempting to free NULL export %p\n", export);
1240 spin_lock(&export->exp_lock);
1241 already_disconnected = export->exp_disconnected;
1242 export->exp_disconnected = 1;
1243 spin_unlock(&export->exp_lock);
1245 /* class_cleanup(), abort_recovery(), and class_fail_export()
1246 * all end up in here, and if any of them race we shouldn't
1247 * call extra class_export_puts(). */
1248 if (already_disconnected) {
1249 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1250 GOTO(no_disconn, already_disconnected);
1253 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1254 export->exp_handle.h_cookie);
1256 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1257 cfs_hash_del(export->exp_obd->obd_nid_hash,
1258 &export->exp_connection->c_peer.nid,
1259 &export->exp_nid_hash);
1261 class_export_recovery_cleanup(export);
1262 class_unlink_export(export);
/* reached by both the normal and the already-disconnected path */
1264 class_export_put(export);
1267 EXPORT_SYMBOL(class_disconnect);
1269 /* Return non-zero for a fully connected export */
/*
 * Report whether \a exp is fully connected: sampled under exp_lock, it
 * must have a positive exp_conn_cnt and must not be marked exp_failed.
 * NOTE(review): the handling of a NULL \a exp and the return statement
 * are not visible in this listing.
 */
1270 int class_connected_export(struct obd_export *exp)
1275 spin_lock(&exp->exp_lock);
1276 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1277 spin_unlock(&exp->exp_lock);
1281 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, tagging each with \a flags first.
 * The loop always works on the current head and relies on each iteration
 * removing that export from the list (either directly, for the device's
 * self export, or as a side effect of obd_disconnect()).
 * NOTE(review): the member name used for list linkage, one CDEBUG head
 * and a "continue" are not visible in this listing.
 */
1283 static void class_disconnect_export_list(cfs_list_t *list,
1284 enum obd_option flags)
1287 struct obd_export *exp;
1290 /* It's possible that an export may disconnect itself, but
1291 * nothing else will be added to this list. */
1292 while (!cfs_list_empty(list)) {
1293 exp = cfs_list_entry(list->next, struct obd_export,
1295 /* need for safe call CDEBUG after obd_disconnect */
1296 class_export_get(exp);
1298 spin_lock(&exp->exp_lock);
1299 exp->exp_flags = flags;
1300 spin_unlock(&exp->exp_lock);
/* an export whose client uuid is the device's own uuid is not
 * disconnected, only taken off the list */
1302 if (obd_uuid_equals(&exp->exp_client_uuid,
1303 &exp->exp_obd->obd_uuid)) {
1305 "exp %p export uuid == obd uuid, don't discon\n",
1307 /* Need to delete this now so we don't end up pointing
1308 * to work_list later when this export is cleaned up. */
1309 cfs_list_del_init(&exp->exp_obd_chain);
1310 class_export_put(exp);
/* second reference: this one is consumed by obd_disconnect() below */
1314 class_export_get(exp);
1315 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1316 "last request at "CFS_TIME_T"\n",
1317 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1318 exp, exp->exp_last_request_time);
1319 /* release one export reference anyway */
1320 rc = obd_disconnect(exp);
1322 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1323 obd_export_nid2str(exp), exp, rc);
1324 class_export_put(exp);
/*
 * Disconnect every export of an obd device.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a local work
 * list while obd_dev_lock is held (leaving the device lists empty), and the
 * work list is then processed by class_disconnect_export_list() with the
 * flags derived from the device by exp_flags_from_obd().
 */
1329 void class_disconnect_exports(struct obd_device *obd)
1331 cfs_list_t work_list;
1334 /* Move all of the exports from obd_exports to a work list, en masse. */
1335 CFS_INIT_LIST_HEAD(&work_list);
1336 spin_lock(&obd->obd_dev_lock);
1337 cfs_list_splice_init(&obd->obd_exports, &work_list);
1338 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1339 spin_unlock(&obd->obd_dev_lock);
/* The disconnect itself runs without obd_dev_lock held. */
1341 if (!cfs_list_empty(&work_list)) {
1342 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1343 "disconnecting them\n", obd->obd_minor, obd);
1344 class_disconnect_export_list(&work_list,
1345 exp_flags_from_obd(obd));
1347 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1348 obd->obd_minor, obd);
1351 EXPORT_SYMBOL(class_disconnect_exports);
1353 /* Remove exports that have not completed recovery.
/*
 * Evict exports of 'obd' selected with the help of a caller-supplied
 * predicate.
 *
 * obd          device whose obd_exports list is scanned under obd_dev_lock
 * test_export  predicate called with exp_lock held; an export that is
 *              already failed, or for which the predicate returns non-zero,
 *              only has its lock released in the branch below
 *              NOTE(review): presumably such exports are then skipped -
 *              confirm
 *
 * Selected exports are marked exp_failed, moved to a local work list and,
 * once obd_dev_lock has been dropped, disconnected with
 * OBD_OPT_ABORT_RECOV added to the device flags.
 */
1355 void class_disconnect_stale_exports(struct obd_device *obd,
1356 int (*test_export)(struct obd_export *))
1358 cfs_list_t work_list;
1359 struct obd_export *exp, *n;
1363 CFS_INIT_LIST_HEAD(&work_list);
1364 spin_lock(&obd->obd_dev_lock);
/* The _safe iterator is needed because entries are moved off the list. */
1365 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1367 /* don't count self-export as client */
1368 if (obd_uuid_equals(&exp->exp_client_uuid,
1369 &exp->exp_obd->obd_uuid))
1372 /* don't evict clients which have no slot in last_rcvd
1373 * (e.g. lightweight connection) */
1374 if (exp->exp_target_data.ted_lr_idx == -1)
1377 spin_lock(&exp->exp_lock);
1378 if (exp->exp_failed || test_export(exp)) {
1379 spin_unlock(&exp->exp_lock);
/* Mark the export failed while still holding exp_lock. */
1382 exp->exp_failed = 1;
1383 spin_unlock(&exp->exp_lock);
1385 cfs_list_move(&exp->exp_obd_chain, &work_list);
/* exp_connection may be NULL here, hence the "<unknown>" fallback. */
1387 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1388 obd->obd_name, exp->exp_client_uuid.uuid,
1389 exp->exp_connection == NULL ? "<unknown>" :
1390 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1391 print_export_data(exp, "EVICTING", 0);
1393 spin_unlock(&obd->obd_dev_lock);
1396 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1397 obd->obd_name, evicted);
1399 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1400 OBD_OPT_ABORT_RECOV);
1403 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark an export as failed and disconnect it.
 *
 * exp_failed is tested and set under exp_lock, so only the first caller to
 * fail a given export goes on to disconnect it; for later callers only the
 * "skipping" debug message is visible in the already_failed branch.
 */
1405 void class_fail_export(struct obd_export *exp)
1407 int rc, already_failed;
1409 spin_lock(&exp->exp_lock);
1410 already_failed = exp->exp_failed;
1411 exp->exp_failed = 1;
1412 spin_unlock(&exp->exp_lock);
1414 if (already_failed) {
1415 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1416 exp, exp->exp_client_uuid.uuid);
1420 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1421 exp, exp->exp_client_uuid.uuid);
/* Optionally dump the debug log, controlled by obd_dump_on_timeout. */
1423 if (obd_dump_on_timeout)
1424 libcfs_debug_dumplog();
1426 /* need for safe call CDEBUG after obd_disconnect */
1427 class_export_get(exp);
1429 /* Most callers into obd_disconnect are removing their own reference
1430 * (request, for example) in addition to the one from the hash table.
1431 * We don't have such a reference here, so make one. */
1432 class_export_get(exp);
1433 rc = obd_disconnect(exp);
1435 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1437 CDEBUG(D_HA, "disconnected export %p/%s\n",
1438 exp, exp->exp_client_uuid.uuid);
/* Drop the reference taken for the log messages above. */
1439 class_export_put(exp);
1441 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable form of the peer NID of an export.
 *
 * When the export has a connection, the string comes from libcfs_nid2str()
 * applied to the connection's peer NID; exp_connection is checked for NULL
 * before it is dereferenced.
 */
1443 char *obd_export_nid2str(struct obd_export *exp)
1445 if (exp->exp_connection != NULL)
1446 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1450 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict exports of 'obd' that are hashed under the given NID.
 *
 * obd  device whose NID hash is searched
 * nid  NID in string form; converted with libcfs_str2nid()
 *
 * Returns exports_evicted; this is 0 when the device is already stopping,
 * in which case nothing is looked up.
 */
1452 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1454 cfs_hash_t *nid_hash;
1455 struct obd_export *doomed_exp = NULL;
1456 int exports_evicted = 0;
1458 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1460 spin_lock(&obd->obd_dev_lock);
1461 /* umount has run already, so evict thread should leave
1462 * its task to umount thread now */
1463 if (obd->obd_stopping) {
1464 spin_unlock(&obd->obd_dev_lock);
1465 return exports_evicted;
/* Pin the hash table while obd_dev_lock is held so it can be used after
 * the lock is dropped; released by cfs_hash_putref() below. */
1467 nid_hash = obd->obd_nid_hash;
1468 cfs_hash_getref(nid_hash);
1469 spin_unlock(&obd->obd_dev_lock);
1472 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1473 if (doomed_exp == NULL)
/* NOTE(review): exp_connection is dereferenced here without the NULL check
 * that obd_export_nid2str() performs - confirm a NID-hashed export always
 * has a connection. */
1476 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1477 "nid %s found, wanted nid %s, requested nid %s\n",
1478 obd_export_nid2str(doomed_exp),
1479 libcfs_nid2str(nid_key), nid);
1480 LASSERTF(doomed_exp != obd->obd_self_export,
1481 "self-export is hashed by NID?\n");
1483 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1484 "request\n", obd->obd_name,
1485 obd_uuid2str(&doomed_exp->exp_client_uuid),
1486 obd_export_nid2str(doomed_exp));
1487 class_fail_export(doomed_exp);
/* NOTE(review): presumably drops the reference returned by the hash
 * lookup - confirm against cfs_hash_lookup(). */
1488 class_export_put(doomed_exp);
1491 cfs_hash_putref(nid_hash);
1493 if (!exports_evicted)
1494 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1495 obd->obd_name, nid);
1496 return exports_evicted;
1498 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of 'obd' whose client uuid matches 'uuid'.
 *
 * obd   device whose uuid hash is searched
 * uuid  client uuid in string form; converted with obd_str2uuid()
 *
 * Returns exports_evicted; this is 0 when the device is stopping or when
 * the uuid names the device itself.
 */
1500 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1502 cfs_hash_t *uuid_hash;
1503 struct obd_export *doomed_exp = NULL;
1504 struct obd_uuid doomed_uuid;
1505 int exports_evicted = 0;
1507 spin_lock(&obd->obd_dev_lock);
1508 if (obd->obd_stopping) {
1509 spin_unlock(&obd->obd_dev_lock);
1510 return exports_evicted;
/* Pin the hash table while obd_dev_lock is held; every exit path below
 * releases it with cfs_hash_putref(). */
1512 uuid_hash = obd->obd_uuid_hash;
1513 cfs_hash_getref(uuid_hash);
1514 spin_unlock(&obd->obd_dev_lock);
1516 obd_str2uuid(&doomed_uuid, uuid);
/* Refuse to evict the device's own uuid. */
1517 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1518 CERROR("%s: can't evict myself\n", obd->obd_name);
1519 cfs_hash_putref(uuid_hash);
1520 return exports_evicted;
1523 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1525 if (doomed_exp == NULL) {
1526 CERROR("%s: can't disconnect %s: no exports found\n",
1527 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" in the message below is misspelled. */
1529 CWARN("%s: evicting %s at adminstrative request\n",
1530 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1531 class_fail_export(doomed_exp);
1532 class_export_put(doomed_exp);
1535 cfs_hash_putref(uuid_hash);
1537 return exports_evicted;
1539 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Optional per-export dump callback, only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is enabled. It starts out NULL and is exported
 * so that it can be assigned from elsewhere; print_export_data() calls it
 * when it is non-NULL and lock dumping was requested.
 */
1541 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1542 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1543 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one D_HA debug line describing an export.
 *
 * exp     export to describe
 * status  free-form label printed after the device name
 *         (callers in this file pass e.g. "ACTIVE", "ZOMBIE", "EVICTING")
 *
 * Fields are printed in this order: device name, status, export pointer,
 * client uuid, peer NID, reference count, (rpc count, callback count, lock
 * count), disconnected/delayed/failed flags, number of outstanding replies,
 * first reply pointer, "..." when there are more than 3 replies, and the
 * last committed transaction number.
 */
1546 static void print_export_data(struct obd_export *exp, const char *status,
1549 struct ptlrpc_reply_state *rs;
1550 struct ptlrpc_reply_state *first_reply = NULL;
/* The outstanding-replies list is walked with exp_lock held. */
1553 spin_lock(&exp->exp_lock);
1554 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1560 spin_unlock(&exp->exp_lock);
1562 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1563 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1564 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1565 atomic_read(&exp->exp_rpc_count),
1566 atomic_read(&exp->exp_cb_count),
1567 atomic_read(&exp->exp_locks_count),
1568 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1569 nreplies, first_reply, nreplies > 3 ? "..." : "",
1570 exp->exp_last_committed);
/* Extra per-export dump, only when requested and a hook is installed. */
1571 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1572 if (locks && class_export_dump_hook != NULL)
1573 class_export_dump_hook(exp);
/*
 * Print every export associated with an obd device, grouped by state.
 *
 * obd    device whose export lists are dumped
 * locks  passed through to print_export_data()
 *
 * The device's own lists (obd_exports, obd_unlinked_exports,
 * obd_delayed_exports) are walked under obd_dev_lock. The global
 * obd_zombie_exports list is walked under obd_zombie_impexp_lock and is not
 * filtered by device in this function.
 */
1577 void dump_exports(struct obd_device *obd, int locks)
1579 struct obd_export *exp;
1581 spin_lock(&obd->obd_dev_lock);
1582 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1583 print_export_data(exp, "ACTIVE", locks);
1584 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1585 print_export_data(exp, "UNLINKED", locks);
1586 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1587 print_export_data(exp, "DELAYED", locks);
1588 spin_unlock(&obd->obd_dev_lock);
/* The two locks are taken one after the other, never nested. */
1589 spin_lock(&obd_zombie_impexp_lock);
1590 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1591 print_export_data(exp, "ZOMBIE", locks);
1592 spin_unlock(&obd_zombie_impexp_lock);
1594 EXPORT_SYMBOL(dump_exports);
/*
 * Block until the device's obd_unlinked_exports list is empty.
 *
 * The caller must already have emptied obd_exports (asserted on entry).
 * The list is polled: obd_dev_lock is dropped around each uninterruptible
 * sleep of 'waited' seconds and re-taken before the list is tested again.
 * Once 'waited' exceeds 5 and is a power of two, a console warning is
 * printed and all exports are dumped.
 */
1596 void obd_exports_barrier(struct obd_device *obd)
1599 LASSERT(cfs_list_empty(&obd->obd_exports));
1600 spin_lock(&obd->obd_dev_lock);
1601 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* Never sleep with the spinlock held. */
1602 spin_unlock(&obd->obd_dev_lock);
1603 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1604 cfs_time_seconds(waited));
1605 if (waited > 5 && IS_PO2(waited)) {
1606 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1607 "more than %d seconds. "
1608 "The obd refcount = %d. Is it stuck?\n",
1609 obd->obd_name, waited,
1610 atomic_read(&obd->obd_refcount));
1611 dump_exports(obd, 1);
1614 spin_lock(&obd->obd_dev_lock);
1616 spin_unlock(&obd->obd_dev_lock);
1618 EXPORT_SYMBOL(obd_exports_barrier);
1620 /* Total amount of zombies to be destroyed */
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(); a value of 0 is what those functions treat as
 * "nothing to do" / "idle". */
1621 static int zombies_count = 0;
1624 * kill zombie imports and exports
/*
 * Destroy queued zombie imports and exports.
 *
 * Each pass detaches at most one import (first entry of obd_zombie_imports)
 * and one export (first entry of obd_zombie_exports) while holding
 * obd_zombie_impexp_lock, then destroys them with the lock released. The
 * lock is briefly re-taken after each destroy. Passes repeat until a pass
 * finds neither an import nor an export.
 */
1626 void obd_zombie_impexp_cull(void)
1628 struct obd_import *import;
1629 struct obd_export *export;
1633 spin_lock(&obd_zombie_impexp_lock);
1636 if (!cfs_list_empty(&obd_zombie_imports)) {
1637 import = cfs_list_entry(obd_zombie_imports.next,
1640 cfs_list_del_init(&import->imp_zombie_chain);
1644 if (!cfs_list_empty(&obd_zombie_exports)) {
1645 export = cfs_list_entry(obd_zombie_exports.next,
1648 cfs_list_del_init(&export->exp_obd_chain);
1651 spin_unlock(&obd_zombie_impexp_lock);
/* Destruction happens outside the spinlock. */
1653 if (import != NULL) {
1654 class_import_destroy(import);
1655 spin_lock(&obd_zombie_impexp_lock);
1657 spin_unlock(&obd_zombie_impexp_lock);
1660 if (export != NULL) {
1661 class_export_destroy(export);
1662 spin_lock(&obd_zombie_impexp_lock);
1664 spin_unlock(&obd_zombie_impexp_lock);
1668 } while (import != NULL || export != NULL);
/* State shared between the zombie reaper thread and the code that starts,
 * stops and waits for it. */
/* Completed by the thread when it starts; waited on by
 * obd_zombie_impexp_init(). */
1672 static struct completion obd_zombie_start;
/* Completed by the thread when its loop ends; waited on by
 * obd_zombie_impexp_stop(). */
1673 static struct completion obd_zombie_stop;
/* Bit field manipulated with set_bit()/test_bit(). */
1674 static unsigned long obd_zombie_flags;
/* Wait queue used both by the thread (waiting for work) and by
 * obd_zombie_barrier() (waiting for idle). */
1675 static wait_queue_head_t obd_zombie_waitq;
/* Pid recorded by the thread itself; compared against current_pid() in
 * obd_zombie_barrier(). */
1676 static pid_t obd_zombie_pid;
/* NOTE(review): this value is passed as the bit *number* argument of
 * set_bit()/test_bit(), so 0x0001 selects bit 1 rather than mask 0x1.
 * The usage is consistent within this file. */
1679 OBD_ZOMBIE_STOP = 0x0001,
1683 * check for work for kill zombie import/export thread.
/*
 * Compute, under obd_zombie_impexp_lock, whether the reaper has nothing to
 * do: rc is non-zero only when zombies_count is 0 and the OBD_ZOMBIE_STOP
 * bit is not set. The thread waits on the negation of this function, i.e.
 * until there is a zombie to destroy or a stop was requested.
 *
 * arg  not used by the visible code
 */
1685 static int obd_zombie_impexp_check(void *arg)
1689 spin_lock(&obd_zombie_impexp_lock);
1690 rc = (zombies_count == 0) &&
1691 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1692 spin_unlock(&obd_zombie_impexp_lock);
1698 * Add export to the obd_zombie thread and notify it.
/*
 * Move an export from its device list to the global zombie export list and
 * wake the reaper.
 *
 * The export must currently be linked through exp_obd_chain (asserted). It
 * is unlinked under the owning device's obd_dev_lock, then linked onto
 * obd_zombie_exports under obd_zombie_impexp_lock; the two locks are taken
 * one after the other, not nested.
 */
1700 static void obd_zombie_export_add(struct obd_export *exp) {
1701 spin_lock(&exp->exp_obd->obd_dev_lock);
1702 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1703 cfs_list_del_init(&exp->exp_obd_chain);
1704 spin_unlock(&exp->exp_obd->obd_dev_lock);
1705 spin_lock(&obd_zombie_impexp_lock);
1707 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1708 spin_unlock(&obd_zombie_impexp_lock);
/* Notify only after the lock has been released. */
1710 obd_zombie_impexp_notify();
1714 * Add import to the obd_zombie thread and notify it.
/*
 * Queue an import on the global zombie import list and wake the reaper.
 *
 * Preconditions asserted here: the import's imp_sec and imp_rq_pool are
 * already NULL, and the import is not yet on a zombie chain.
 */
1716 static void obd_zombie_import_add(struct obd_import *imp) {
1717 LASSERT(imp->imp_sec == NULL);
1718 LASSERT(imp->imp_rq_pool == NULL);
1719 spin_lock(&obd_zombie_impexp_lock);
1720 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1722 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1723 spin_unlock(&obd_zombie_impexp_lock);
/* Notify only after the lock has been released. */
1725 obd_zombie_impexp_notify();
1729 * notify import/export destroy thread about new zombie.
/*
 * Wake every sleeper on obd_zombie_waitq. wake_up_all() is used rather than
 * a single wake-up because both the reaper thread and obd_zombie_barrier()
 * callers sleep on the same queue (see the existing note below).
 */
1731 static void obd_zombie_impexp_notify(void)
1734 * Make sure obd_zomebie_impexp_thread get this notification.
1735 * It is possible this signal only get by obd_zombie_barrier, and
1736 * barrier gulps this notification and sleeps away and hangs ensues
1738 wake_up_all(&obd_zombie_waitq);
1742 * check whether obd_zombie is idle
/*
 * Compute, under obd_zombie_impexp_lock, whether zombies_count is 0.
 * Asserts that the reaper has not been asked to stop, so it must not be
 * used once OBD_ZOMBIE_STOP has been set.
 */
1744 static int obd_zombie_is_idle(void)
1748 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1749 spin_lock(&obd_zombie_impexp_lock);
1750 rc = (zombies_count == 0);
1751 spin_unlock(&obd_zombie_impexp_lock);
1756 * wait when obd_zombie import/export queues become empty
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies remain. The wait uses a zero-initialised l_wait_info. A call made
 * from the reaper thread itself (pid equal to obd_zombie_pid) is
 * special-cased so that the thread does not wait for itself.
 */
1758 void obd_zombie_barrier(void)
1760 struct l_wait_info lwi = { 0 };
1762 if (obd_zombie_pid == current_pid())
1763 /* don't wait for myself */
1765 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1767 EXPORT_SYMBOL(obd_zombie_barrier);
1772 * destroy zombie export/import thread.
/*
 * Body of the zombie reaper thread.
 *
 * Signals obd_zombie_start, records its own pid, then loops until the
 * OBD_ZOMBIE_STOP bit is set: sleep until obd_zombie_impexp_check() returns
 * zero (work pending or stop requested), destroy queued zombies, and wake
 * the wait queue for obd_zombie_barrier() callers. Signals obd_zombie_stop
 * on the way out.
 *
 * unused  not referenced by the visible code
 */
1774 static int obd_zombie_impexp_thread(void *unused)
1776 unshare_fs_struct();
/* NOTE(review): the pid is stored after the start completion is signalled,
 * so code released by that completion may briefly see the previous value of
 * obd_zombie_pid - confirm this is harmless. */
1777 complete(&obd_zombie_start);
1779 obd_zombie_pid = current_pid();
1781 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1782 struct l_wait_info lwi = { 0 };
1784 l_wait_event(obd_zombie_waitq,
1785 !obd_zombie_impexp_check(NULL), &lwi);
1786 obd_zombie_impexp_cull();
1789 * Notify obd_zombie_barrier callers that queues
1792 wake_up(&obd_zombie_waitq);
1795 complete(&obd_zombie_stop);
1800 #else /* ! KERNEL */
/* Entry counter used by obd_zombie_impexp_kill(): only the caller that
 * raises it from 0 to 1 performs the cull. */
1802 static atomic_t zombie_recur = ATOMIC_INIT(0);
/* Handles returned by liblustre_register_wait_callback() and
 * liblustre_register_idle_callback(); kept so the callbacks can be
 * deregistered in obd_zombie_impexp_stop(). */
1803 static void *obd_zombie_impexp_work_cb;
1804 static void *obd_zombie_impexp_idle_cb;
/*
 * Callback wrapper around obd_zombie_impexp_cull(); it is the function
 * registered as a liblustre wait callback in obd_zombie_impexp_init().
 *
 * zombie_recur is incremented on entry and decremented before leaving; the
 * cull only runs when the increment yields 1, so an invocation that arrives
 * while another one is still active does not cull again.
 *
 * arg  not used by the visible code
 */
1806 int obd_zombie_impexp_kill(void *arg)
1810 if (atomic_inc_return(&zombie_recur) == 1) {
1811 obd_zombie_impexp_cull();
1814 atomic_dec(&zombie_recur);
1821 * start destroy zombie import/export thread
/*
 * Initialise the zombie import/export machinery.
 *
 * Sets up the two zombie lists, their spinlock, the start/stop completions
 * and the wait queue. It then starts the "obd_zombid" kernel thread and
 * waits for it to signal obd_zombie_start, and registers the liblustre wait
 * and idle callbacks.
 * NOTE(review): the kthread path and the liblustre callback path are
 * presumably alternative build configurations - confirm.
 *
 * A kthread_run() failure is propagated as PTR_ERR(task).
 */
1823 int obd_zombie_impexp_init(void)
1826 struct task_struct *task;
1829 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1830 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1831 spin_lock_init(&obd_zombie_impexp_lock);
1832 init_completion(&obd_zombie_start);
1833 init_completion(&obd_zombie_stop);
1834 init_waitqueue_head(&obd_zombie_waitq);
1838 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1840 RETURN(PTR_ERR(task));
/* Do not continue until the thread has actually started running. */
1842 wait_for_completion(&obd_zombie_start);
1845 obd_zombie_impexp_work_cb =
1846 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1847 &obd_zombie_impexp_kill, NULL);
1849 obd_zombie_impexp_idle_cb =
1850 liblustre_register_idle_callback("obd_zombi_impexp_check",
1851 &obd_zombie_impexp_check, NULL);
1856 * stop destroy zombie import/export thread
/*
 * Shut the zombie machinery down.
 *
 * Sets the OBD_ZOMBIE_STOP bit, wakes the reaper so that it notices the
 * bit, and waits for the thread to signal obd_zombie_stop. The liblustre
 * wait and idle callbacks are deregistered using the handles saved at init
 * time.
 * NOTE(review): as in obd_zombie_impexp_init(), the thread path and the
 * callback path are presumably alternative build configurations - confirm.
 */
1858 void obd_zombie_impexp_stop(void)
1860 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1861 obd_zombie_impexp_notify();
1863 wait_for_completion(&obd_zombie_stop);
1865 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1866 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1870 /***** Kernel-userspace comm helpers *******/
1872 /* Get length of entire message, including header */
/*
 * payload_len  size of the payload in bytes
 * Returns sizeof(struct kuc_hdr) + payload_len. The sum is converted to int
 * on return; no overflow check is performed here.
 */
1873 int kuc_len(int payload_len)
1875 return sizeof(struct kuc_hdr) + payload_len;
1877 EXPORT_SYMBOL(kuc_len);
1879 /* Get a pointer to kuc header, given a ptr to the payload
1880 * @param p Pointer to payload area
1881 * @returns Pointer to kuc header
/*
 * Locate the kuc header of a payload. The header is taken to be the
 * struct kuc_hdr that ends exactly where the payload begins (p minus one
 * header), and its magic field is asserted to equal KUC_MAGIC, so passing a
 * pointer that is not a kuc payload trips the assertion.
 */
1883 struct kuc_hdr * kuc_ptr(void *p)
1885 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1886 LASSERT(lh->kuc_magic == KUC_MAGIC);
1889 EXPORT_SYMBOL(kuc_ptr);
1891 /* Test if payload is part of kuc message
1892 * @param p Pointer to payload area
/*
 * Non-asserting variant of the check in kuc_ptr(): inspects the
 * struct kuc_hdr-sized region immediately preceding p and tests whether its
 * magic field equals KUC_MAGIC.
 * NOTE(review): this reads memory located before p, so p must point at
 * least sizeof(struct kuc_hdr) bytes into a readable buffer - confirm all
 * callers guarantee this.
 */
1895 int kuc_ispayload(void *p)
1897 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1899 if (kh->kuc_magic == KUC_MAGIC)
1904 EXPORT_SYMBOL(kuc_ispayload);
1906 /* Alloc space for a message, and fill in header
1907 * @return Pointer to payload area
/*
 * payload_len  payload size in bytes; the total message length is
 *              kuc_len(payload_len)
 * transport    stored in the header's kuc_transport field
 * type         stored in the header's kuc_msgtype field
 *
 * The header is filled with KUC_MAGIC, the transport, the message type and
 * the total length (header included). The returned pointer is the first
 * byte after the header, i.e. the payload. Failure is reported as
 * ERR_PTR(-ENOMEM), not as NULL.
 */
1909 void *kuc_alloc(int payload_len, int transport, int type)
1912 int len = kuc_len(payload_len);
1916 return ERR_PTR(-ENOMEM);
1918 lh->kuc_magic = KUC_MAGIC;
1919 lh->kuc_transport = transport;
1920 lh->kuc_msgtype = type;
1921 lh->kuc_msglen = len;
1923 return (void *)(lh + 1);
1925 EXPORT_SYMBOL(kuc_alloc);
1927 /* Takes pointer to payload area */
/*
 * p            payload pointer; kuc_ptr() maps it back to the header and
 *              asserts the header magic
 * payload_len  used to compute the size handed to OBD_FREE
 *              NOTE(review): presumably must equal the payload_len given to
 *              kuc_alloc() - confirm
 */
1928 inline void kuc_free(void *p, int payload_len)
1930 struct kuc_hdr *lh = kuc_ptr(p);
1931 OBD_FREE(lh, kuc_len(payload_len));
1933 EXPORT_SYMBOL(kuc_free);
/* Per-waiter record used by obd_get_request_slot(), which places an
 * instance on its own stack while waiting for a free RPC slot. */
1935 struct obd_request_slot_waiter {
/* Linkage on the client's cl_loi_read_list wait list; an empty (self-linked)
 * entry is what obd_request_slot_avail() treats as "slot granted". */
1936 struct list_head orsw_entry;
/* Woken by obd_put_request_slot() / obd_set_max_rpcs_in_flight(). */
1937 wait_queue_head_t orsw_waitq;
/*
 * Wait condition for obd_get_request_slot(): checks, under
 * cl_loi_list_lock, whether the waiter's list entry is empty. The functions
 * that grant slots remove the waiter with list_del_init(), which leaves the
 * entry empty, so "empty" here means the waiter has been taken off the wait
 * list.
 */
1941 static bool obd_request_slot_avail(struct client_obd *cli,
1942 struct obd_request_slot_waiter *orsw)
1946 client_obd_list_lock(&cli->cl_loi_list_lock);
1947 avail = !!list_empty(&orsw->orsw_entry);
1948 client_obd_list_unlock(&cli->cl_loi_list_lock);
1954 * For network flow control, the RPC sponsor needs to acquire a credit
1955 * before sending the RPC. The credits count for a connection is defined
1956 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1957 * the subsequent RPC sponsors need to wait until others release their
1958 * credits, or the administrator increases the "cl_max_rpcs_in_flight".
/*
 * Acquire one RPC slot on a client obd.
 *
 * Fast path: while cl_r_in_flight is below cl_max_rpcs_in_flight the counter
 * is simply incremented under cl_loi_list_lock.
 * Slow path: an on-stack waiter is appended to the tail of cl_loi_read_list
 * (so waiters are served in arrival order by the wake-up side, which always
 * takes the head) and the caller sleeps until the waiter is removed from the
 * list; the wait is set up with LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL).
 */
1960 int obd_get_request_slot(struct client_obd *cli)
1962 struct obd_request_slot_waiter orsw;
1963 struct l_wait_info lwi;
1966 client_obd_list_lock(&cli->cl_loi_list_lock);
1967 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1968 cli->cl_r_in_flight++;
1969 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* No free slot: queue ourselves and wait. */
1973 init_waitqueue_head(&orsw.orsw_waitq);
1974 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1975 orsw.orsw_signaled = false;
1976 client_obd_list_unlock(&cli->cl_loi_list_lock);
1978 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1979 rc = l_wait_event(orsw.orsw_waitq,
1980 obd_request_slot_avail(cli, &orsw) ||
1984 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1985 * freed but other (such as obd_put_request_slot) is using it. */
1986 client_obd_list_lock(&cli->cl_loi_list_lock);
/* Undo bookkeeping for a wait that did not end with a usable slot: an entry
 * that is already off the list had a slot accounted to it (the waker
 * incremented cl_r_in_flight), so that slot is given back; list_del() takes
 * a still-queued entry off the wait list.
 * NOTE(review): the exact conditions selecting between these two statements
 * should be confirmed against the full source. */
1988 if (!orsw.orsw_signaled) {
1989 if (list_empty(&orsw.orsw_entry))
1990 cli->cl_r_in_flight--;
1992 list_del(&orsw.orsw_entry);
/* A signaled waiter must no longer be on the wait list. */
1996 if (orsw.orsw_signaled) {
1997 LASSERT(list_empty(&orsw.orsw_entry));
2001 client_obd_list_unlock(&cli->cl_loi_list_lock);
2005 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one RPC slot and, if possible, hand it straight to a waiter.
 *
 * Everything happens under cl_loi_list_lock. After the in-flight count is
 * decremented, the first waiter on cl_loi_read_list (if any, and if the
 * count is below the limit) is unlinked with list_del_init(), the in-flight
 * count is incremented again on its behalf, and the waiter is woken.
 */
2007 void obd_put_request_slot(struct client_obd *cli)
2009 struct obd_request_slot_waiter *orsw;
2011 client_obd_list_lock(&cli->cl_loi_list_lock);
2012 cli->cl_r_in_flight--;
2014 /* If there is free slot, wakeup the first waiter. */
2015 if (!list_empty(&cli->cl_loi_read_list) &&
2016 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2017 orsw = list_entry(cli->cl_loi_read_list.next,
2018 struct obd_request_slot_waiter, orsw_entry);
/* list_del_init() leaves the entry empty, which is the condition the
 * sleeping waiter tests in obd_request_slot_avail(). */
2019 list_del_init(&orsw->orsw_entry);
2020 cli->cl_r_in_flight++;
2021 wake_up(&orsw->orsw_waitq);
2023 client_obd_list_unlock(&cli->cl_loi_list_lock);
2025 EXPORT_SYMBOL(obd_put_request_slot);
/* Return the client's current cl_max_rpcs_in_flight limit. The field is
 * read without taking cl_loi_list_lock. */
2027 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2029 return cli->cl_max_rpcs_in_flight;
2031 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Change the client's cl_max_rpcs_in_flight limit.
 *
 * cli  client obd to update
 * max  new limit; values above OBD_MAX_RIF_MAX or below 1 are rejected by
 *      the range check before any state is touched
 *
 * The old limit is saved and the new one stored under cl_loi_list_lock.
 * Up to 'diff' waiters are then taken from the head of cl_loi_read_list;
 * each is unlinked, has a slot accounted to it (cl_r_in_flight++) and is
 * woken. The loop stops early when the wait list is empty.
 * NOTE(review): 'diff' is presumably the increase of the limit over the old
 * value - confirm.
 */
2033 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2035 struct obd_request_slot_waiter *orsw;
2040 if (max > OBD_MAX_RIF_MAX || max < 1)
2043 client_obd_list_lock(&cli->cl_loi_list_lock);
2044 old = cli->cl_max_rpcs_in_flight;
2045 cli->cl_max_rpcs_in_flight = max;
2048 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2049 for (i = 0; i < diff; i++) {
2050 if (list_empty(&cli->cl_loi_read_list))
2053 orsw = list_entry(cli->cl_loi_read_list.next,
2054 struct obd_request_slot_waiter, orsw_entry);
2055 list_del_init(&orsw->orsw_entry);
2056 cli->cl_r_in_flight++;
2057 wake_up(&orsw->orsw_waitq);
2059 client_obd_list_unlock(&cli->cl_loi_list_lock);
2063 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);