4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
49 extern cfs_list_t obd_types;
50 spinlock_t obd_types_lock;
52 struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 struct kmem_cache *import_cachep;
57 cfs_list_t obd_zombie_imports;
58 cfs_list_t obd_zombie_exports;
59 spinlock_t obd_zombie_impexp_lock;
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64 const char *status, int locks);
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 struct obd_type *class_search_type(const char *name)
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 cfs_list_for_each(tmp, &obd_types) {
105 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
114 EXPORT_SYMBOL(class_search_type);
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
124 if (strcmp(modname, "obdfilter") == 0)
127 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128 modname = LUSTRE_OSP_NAME;
130 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131 modname = LUSTRE_MDT_NAME;
133 if (!request_module("%s", modname)) {
134 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135 type = class_search_type(name);
137 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143 spin_lock(&type->obd_type_lock);
145 try_module_get(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
150 EXPORT_SYMBOL(class_get_type);
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
160 EXPORT_SYMBOL(class_put_type);
162 #define CLASS_MAX_NAME 1024
164 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
165 bool enable_proc, struct lprocfs_seq_vars *module_vars,
166 #ifndef HAVE_ONLY_PROCFS_SEQ
167 struct lprocfs_vars *vars,
169 const char *name, struct lu_device_type *ldt)
171 struct obd_type *type;
176 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
178 if (class_search_type(name)) {
179 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
184 OBD_ALLOC(type, sizeof(*type));
188 OBD_ALLOC_PTR(type->typ_dt_ops);
189 OBD_ALLOC_PTR(type->typ_md_ops);
190 OBD_ALLOC(type->typ_name, strlen(name) + 1);
192 if (type->typ_dt_ops == NULL ||
193 type->typ_md_ops == NULL ||
194 type->typ_name == NULL)
197 *(type->typ_dt_ops) = *dt_ops;
198 /* md_ops is optional */
200 *(type->typ_md_ops) = *md_ops;
201 strcpy(type->typ_name, name);
202 spin_lock_init(&type->obd_type_lock);
206 #ifndef HAVE_ONLY_PROCFS_SEQ
208 type->typ_procroot = lprocfs_register(type->typ_name,
214 type->typ_procroot = lprocfs_seq_register(type->typ_name,
218 if (IS_ERR(type->typ_procroot)) {
219 rc = PTR_ERR(type->typ_procroot);
220 type->typ_procroot = NULL;
227 rc = lu_device_type_init(ldt);
232 spin_lock(&obd_types_lock);
233 cfs_list_add(&type->typ_chain, &obd_types);
234 spin_unlock(&obd_types_lock);
239 if (type->typ_name != NULL) {
241 if (type->typ_procroot != NULL) {
242 #ifndef HAVE_ONLY_PROCFS_SEQ
243 lprocfs_try_remove_proc_entry(type->typ_name,
246 remove_proc_subtree(type->typ_name, proc_lustre_root);
250 OBD_FREE(type->typ_name, strlen(name) + 1);
252 if (type->typ_md_ops != NULL)
253 OBD_FREE_PTR(type->typ_md_ops);
254 if (type->typ_dt_ops != NULL)
255 OBD_FREE_PTR(type->typ_dt_ops);
256 OBD_FREE(type, sizeof(*type));
259 EXPORT_SYMBOL(class_register_type);
261 int class_unregister_type(const char *name)
263 struct obd_type *type = class_search_type(name);
267 CERROR("unknown obd type\n");
271 if (type->typ_refcnt) {
272 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
273 /* This is a bad situation, let's make the best of it */
274 /* Remove ops, but leave the name for debugging */
275 OBD_FREE_PTR(type->typ_dt_ops);
276 OBD_FREE_PTR(type->typ_md_ops);
280 /* we do not use type->typ_procroot as for compatibility purposes
281 * other modules can share names (i.e. lod can use lov entry). so
282 * we can't reference pointer as it can get invalided when another
283 * module removes the entry */
285 if (type->typ_procroot != NULL) {
286 #ifndef HAVE_ONLY_PROCFS_SEQ
287 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
289 remove_proc_subtree(type->typ_name, proc_lustre_root);
293 if (type->typ_procsym != NULL)
294 lprocfs_remove(&type->typ_procsym);
297 lu_device_type_fini(type->typ_lu);
299 spin_lock(&obd_types_lock);
300 cfs_list_del(&type->typ_chain);
301 spin_unlock(&obd_types_lock);
302 OBD_FREE(type->typ_name, strlen(name) + 1);
303 if (type->typ_dt_ops != NULL)
304 OBD_FREE_PTR(type->typ_dt_ops);
305 if (type->typ_md_ops != NULL)
306 OBD_FREE_PTR(type->typ_md_ops);
307 OBD_FREE(type, sizeof(*type));
309 } /* class_unregister_type */
310 EXPORT_SYMBOL(class_unregister_type);
313 * Create a new obd device.
315 * Find an empty slot in ::obd_devs[], create a new obd device in it.
317 * \param[in] type_name obd device type string.
318 * \param[in] name obd device name.
320 * \retval NULL if create fails, otherwise return the obd device
323 struct obd_device *class_newdev(const char *type_name, const char *name)
325 struct obd_device *result = NULL;
326 struct obd_device *newdev;
327 struct obd_type *type = NULL;
329 int new_obd_minor = 0;
332 if (strlen(name) >= MAX_OBD_NAME) {
333 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
334 RETURN(ERR_PTR(-EINVAL));
337 type = class_get_type(type_name);
339 CERROR("OBD: unknown type: %s\n", type_name);
340 RETURN(ERR_PTR(-ENODEV));
343 newdev = obd_device_alloc();
345 GOTO(out_type, result = ERR_PTR(-ENOMEM));
347 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
349 write_lock(&obd_dev_lock);
350 for (i = 0; i < class_devno_max(); i++) {
351 struct obd_device *obd = class_num2obd(i);
353 if (obd && (strcmp(name, obd->obd_name) == 0)) {
354 CERROR("Device %s already exists at %d, won't add\n",
357 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
358 "%p obd_magic %08x != %08x\n", result,
359 result->obd_magic, OBD_DEVICE_MAGIC);
360 LASSERTF(result->obd_minor == new_obd_minor,
361 "%p obd_minor %d != %d\n", result,
362 result->obd_minor, new_obd_minor);
364 obd_devs[result->obd_minor] = NULL;
365 result->obd_name[0]='\0';
367 result = ERR_PTR(-EEXIST);
370 if (!result && !obd) {
372 result->obd_minor = i;
374 result->obd_type = type;
375 strncpy(result->obd_name, name,
376 sizeof(result->obd_name) - 1);
377 obd_devs[i] = result;
380 write_unlock(&obd_dev_lock);
382 if (result == NULL && i >= class_devno_max()) {
383 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
385 GOTO(out, result = ERR_PTR(-EOVERFLOW));
391 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
392 result->obd_name, result);
396 obd_device_free(newdev);
398 class_put_type(type);
402 void class_release_dev(struct obd_device *obd)
404 struct obd_type *obd_type = obd->obd_type;
406 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
407 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
408 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
409 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
410 LASSERT(obd_type != NULL);
412 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
413 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
415 write_lock(&obd_dev_lock);
416 obd_devs[obd->obd_minor] = NULL;
417 write_unlock(&obd_dev_lock);
418 obd_device_free(obd);
420 class_put_type(obd_type);
423 int class_name2dev(const char *name)
430 read_lock(&obd_dev_lock);
431 for (i = 0; i < class_devno_max(); i++) {
432 struct obd_device *obd = class_num2obd(i);
434 if (obd && strcmp(name, obd->obd_name) == 0) {
435 /* Make sure we finished attaching before we give
436 out any references */
437 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
438 if (obd->obd_attached) {
439 read_unlock(&obd_dev_lock);
445 read_unlock(&obd_dev_lock);
449 EXPORT_SYMBOL(class_name2dev);
451 struct obd_device *class_name2obd(const char *name)
453 int dev = class_name2dev(name);
455 if (dev < 0 || dev > class_devno_max())
457 return class_num2obd(dev);
459 EXPORT_SYMBOL(class_name2obd);
461 int class_uuid2dev(struct obd_uuid *uuid)
465 read_lock(&obd_dev_lock);
466 for (i = 0; i < class_devno_max(); i++) {
467 struct obd_device *obd = class_num2obd(i);
469 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
470 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
471 read_unlock(&obd_dev_lock);
475 read_unlock(&obd_dev_lock);
479 EXPORT_SYMBOL(class_uuid2dev);
481 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
483 int dev = class_uuid2dev(uuid);
486 return class_num2obd(dev);
488 EXPORT_SYMBOL(class_uuid2obd);
491 * Get obd device from ::obd_devs[]
493 * \param num [in] array index
495 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
496 * otherwise return the obd device there.
498 struct obd_device *class_num2obd(int num)
500 struct obd_device *obd = NULL;
502 if (num < class_devno_max()) {
507 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
508 "%p obd_magic %08x != %08x\n",
509 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
510 LASSERTF(obd->obd_minor == num,
511 "%p obd_minor %0d != %0d\n",
512 obd, obd->obd_minor, num);
517 EXPORT_SYMBOL(class_num2obd);
520 * Get obd devices count. Device in any
522 * \retval obd device count
524 int get_devices_count(void)
526 int index, max_index = class_devno_max(), dev_count = 0;
528 read_lock(&obd_dev_lock);
529 for (index = 0; index <= max_index; index++) {
530 struct obd_device *obd = class_num2obd(index);
534 read_unlock(&obd_dev_lock);
538 EXPORT_SYMBOL(get_devices_count);
540 void class_obd_list(void)
545 read_lock(&obd_dev_lock);
546 for (i = 0; i < class_devno_max(); i++) {
547 struct obd_device *obd = class_num2obd(i);
551 if (obd->obd_stopping)
553 else if (obd->obd_set_up)
555 else if (obd->obd_attached)
559 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
560 i, status, obd->obd_type->typ_name,
561 obd->obd_name, obd->obd_uuid.uuid,
562 atomic_read(&obd->obd_refcount));
564 read_unlock(&obd_dev_lock);
568 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
569 specified, then only the client with that uuid is returned,
570 otherwise any client connected to the tgt is returned. */
571 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
572 const char * typ_name,
573 struct obd_uuid *grp_uuid)
577 read_lock(&obd_dev_lock);
578 for (i = 0; i < class_devno_max(); i++) {
579 struct obd_device *obd = class_num2obd(i);
583 if ((strncmp(obd->obd_type->typ_name, typ_name,
584 strlen(typ_name)) == 0)) {
585 if (obd_uuid_equals(tgt_uuid,
586 &obd->u.cli.cl_target_uuid) &&
587 ((grp_uuid)? obd_uuid_equals(grp_uuid,
588 &obd->obd_uuid) : 1)) {
589 read_unlock(&obd_dev_lock);
594 read_unlock(&obd_dev_lock);
598 EXPORT_SYMBOL(class_find_client_obd);
600 /* Iterate the obd_device list looking devices have grp_uuid. Start
601 searching at *next, and if a device is found, the next index to look
602 at is saved in *next. If next is NULL, then the first matching device
603 will always be returned. */
604 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
610 else if (*next >= 0 && *next < class_devno_max())
615 read_lock(&obd_dev_lock);
616 for (; i < class_devno_max(); i++) {
617 struct obd_device *obd = class_num2obd(i);
621 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
624 read_unlock(&obd_dev_lock);
628 read_unlock(&obd_dev_lock);
632 EXPORT_SYMBOL(class_devices_in_group);
635 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
636 * adjust sptlrpc settings accordingly.
638 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
640 struct obd_device *obd;
644 LASSERT(namelen > 0);
646 read_lock(&obd_dev_lock);
647 for (i = 0; i < class_devno_max(); i++) {
648 obd = class_num2obd(i);
650 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
653 /* only notify mdc, osc, mdt, ost */
654 type = obd->obd_type->typ_name;
655 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
656 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
657 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
658 strcmp(type, LUSTRE_OST_NAME) != 0)
661 if (strncmp(obd->obd_name, fsname, namelen))
664 class_incref(obd, __FUNCTION__, obd);
665 read_unlock(&obd_dev_lock);
666 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
667 sizeof(KEY_SPTLRPC_CONF),
668 KEY_SPTLRPC_CONF, 0, NULL, NULL);
670 class_decref(obd, __FUNCTION__, obd);
671 read_lock(&obd_dev_lock);
673 read_unlock(&obd_dev_lock);
676 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
678 void obd_cleanup_caches(void)
681 if (obd_device_cachep) {
682 kmem_cache_destroy(obd_device_cachep);
683 obd_device_cachep = NULL;
686 kmem_cache_destroy(obdo_cachep);
690 kmem_cache_destroy(import_cachep);
691 import_cachep = NULL;
694 kmem_cache_destroy(capa_cachep);
700 int obd_init_caches(void)
705 LASSERT(obd_device_cachep == NULL);
706 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
707 sizeof(struct obd_device),
709 if (!obd_device_cachep)
710 GOTO(out, rc = -ENOMEM);
712 LASSERT(obdo_cachep == NULL);
713 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
716 GOTO(out, rc = -ENOMEM);
718 LASSERT(import_cachep == NULL);
719 import_cachep = kmem_cache_create("ll_import_cache",
720 sizeof(struct obd_import),
723 GOTO(out, rc = -ENOMEM);
725 LASSERT(capa_cachep == NULL);
726 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
729 GOTO(out, rc = -ENOMEM);
733 obd_cleanup_caches();
737 /* map connection to client */
738 struct obd_export *class_conn2export(struct lustre_handle *conn)
740 struct obd_export *export;
744 CDEBUG(D_CACHE, "looking for null handle\n");
748 if (conn->cookie == -1) { /* this means assign a new connection */
749 CDEBUG(D_CACHE, "want a new connection\n");
753 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
754 export = class_handle2object(conn->cookie, NULL);
757 EXPORT_SYMBOL(class_conn2export);
759 struct obd_device *class_exp2obd(struct obd_export *exp)
765 EXPORT_SYMBOL(class_exp2obd);
767 struct obd_device *class_conn2obd(struct lustre_handle *conn)
769 struct obd_export *export;
770 export = class_conn2export(conn);
772 struct obd_device *obd = export->exp_obd;
773 class_export_put(export);
778 EXPORT_SYMBOL(class_conn2obd);
780 struct obd_import *class_exp2cliimp(struct obd_export *exp)
782 struct obd_device *obd = exp->exp_obd;
785 return obd->u.cli.cl_import;
787 EXPORT_SYMBOL(class_exp2cliimp);
789 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
791 struct obd_device *obd = class_conn2obd(conn);
794 return obd->u.cli.cl_import;
796 EXPORT_SYMBOL(class_conn2cliimp);
798 /* Export management functions */
799 static void class_export_destroy(struct obd_export *exp)
801 struct obd_device *obd = exp->exp_obd;
804 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
805 LASSERT(obd != NULL);
807 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
808 exp->exp_client_uuid.uuid, obd->obd_name);
810 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
811 if (exp->exp_connection)
812 ptlrpc_put_connection_superhack(exp->exp_connection);
814 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
815 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
816 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
817 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
818 obd_destroy_export(exp);
819 class_decref(obd, "export", exp);
821 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
825 static void export_handle_addref(void *export)
827 class_export_get(export);
830 static struct portals_handle_ops export_handle_ops = {
831 .hop_addref = export_handle_addref,
835 struct obd_export *class_export_get(struct obd_export *exp)
837 atomic_inc(&exp->exp_refcount);
838 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
839 atomic_read(&exp->exp_refcount));
842 EXPORT_SYMBOL(class_export_get);
844 void class_export_put(struct obd_export *exp)
846 LASSERT(exp != NULL);
847 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
848 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
849 atomic_read(&exp->exp_refcount) - 1);
851 if (atomic_dec_and_test(&exp->exp_refcount)) {
852 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
853 CDEBUG(D_IOCTL, "final put %p/%s\n",
854 exp, exp->exp_client_uuid.uuid);
856 /* release nid stat refererence */
857 lprocfs_exp_cleanup(exp);
859 obd_zombie_export_add(exp);
862 EXPORT_SYMBOL(class_export_put);
864 /* Creates a new export, adds it to the hash table, and returns a
865 * pointer to it. The refcount is 2: one for the hash reference, and
866 * one for the pointer returned by this function. */
867 struct obd_export *class_new_export(struct obd_device *obd,
868 struct obd_uuid *cluuid)
870 struct obd_export *export;
871 cfs_hash_t *hash = NULL;
875 OBD_ALLOC_PTR(export);
877 return ERR_PTR(-ENOMEM);
879 export->exp_conn_cnt = 0;
880 export->exp_lock_hash = NULL;
881 export->exp_flock_hash = NULL;
882 atomic_set(&export->exp_refcount, 2);
883 atomic_set(&export->exp_rpc_count, 0);
884 atomic_set(&export->exp_cb_count, 0);
885 atomic_set(&export->exp_locks_count, 0);
886 #if LUSTRE_TRACKS_LOCK_EXP_REFS
887 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
888 spin_lock_init(&export->exp_locks_list_guard);
890 atomic_set(&export->exp_replay_count, 0);
891 export->exp_obd = obd;
892 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
893 spin_lock_init(&export->exp_uncommitted_replies_lock);
894 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
895 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
896 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
897 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
898 CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
899 class_handle_hash(&export->exp_handle, &export_handle_ops);
900 export->exp_last_request_time = cfs_time_current_sec();
901 spin_lock_init(&export->exp_lock);
902 spin_lock_init(&export->exp_rpc_lock);
903 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
904 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
905 spin_lock_init(&export->exp_bl_list_lock);
906 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
908 export->exp_sp_peer = LUSTRE_SP_ANY;
909 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
910 export->exp_client_uuid = *cluuid;
911 obd_init_export(export);
913 spin_lock(&obd->obd_dev_lock);
914 /* shouldn't happen, but might race */
915 if (obd->obd_stopping)
916 GOTO(exit_unlock, rc = -ENODEV);
918 hash = cfs_hash_getref(obd->obd_uuid_hash);
920 GOTO(exit_unlock, rc = -ENODEV);
921 spin_unlock(&obd->obd_dev_lock);
923 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
924 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
926 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
927 obd->obd_name, cluuid->uuid, rc);
928 GOTO(exit_err, rc = -EALREADY);
932 spin_lock(&obd->obd_dev_lock);
933 if (obd->obd_stopping) {
934 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
935 GOTO(exit_unlock, rc = -ENODEV);
938 class_incref(obd, "export", export);
939 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
940 cfs_list_add_tail(&export->exp_obd_chain_timed,
941 &export->exp_obd->obd_exports_timed);
942 export->exp_obd->obd_num_exports++;
943 spin_unlock(&obd->obd_dev_lock);
944 cfs_hash_putref(hash);
948 spin_unlock(&obd->obd_dev_lock);
951 cfs_hash_putref(hash);
952 class_handle_unhash(&export->exp_handle);
953 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
954 obd_destroy_export(export);
955 OBD_FREE_PTR(export);
958 EXPORT_SYMBOL(class_new_export);
960 void class_unlink_export(struct obd_export *exp)
962 class_handle_unhash(&exp->exp_handle);
964 spin_lock(&exp->exp_obd->obd_dev_lock);
965 /* delete an uuid-export hashitem from hashtables */
966 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
967 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
968 &exp->exp_client_uuid,
969 &exp->exp_uuid_hash);
971 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
972 cfs_list_del_init(&exp->exp_obd_chain_timed);
973 exp->exp_obd->obd_num_exports--;
974 spin_unlock(&exp->exp_obd->obd_dev_lock);
975 class_export_put(exp);
977 EXPORT_SYMBOL(class_unlink_export);
979 /* Import management functions */
980 void class_import_destroy(struct obd_import *imp)
984 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
985 imp->imp_obd->obd_name);
987 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
989 ptlrpc_put_connection_superhack(imp->imp_connection);
991 while (!cfs_list_empty(&imp->imp_conn_list)) {
992 struct obd_import_conn *imp_conn;
994 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
995 struct obd_import_conn, oic_item);
996 cfs_list_del_init(&imp_conn->oic_item);
997 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
998 OBD_FREE(imp_conn, sizeof(*imp_conn));
1001 LASSERT(imp->imp_sec == NULL);
1002 class_decref(imp->imp_obd, "import", imp);
1003 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1007 static void import_handle_addref(void *import)
1009 class_import_get(import);
1012 static struct portals_handle_ops import_handle_ops = {
1013 .hop_addref = import_handle_addref,
1017 struct obd_import *class_import_get(struct obd_import *import)
1019 atomic_inc(&import->imp_refcount);
1020 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1021 atomic_read(&import->imp_refcount),
1022 import->imp_obd->obd_name);
1025 EXPORT_SYMBOL(class_import_get);
1027 void class_import_put(struct obd_import *imp)
1031 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1032 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1034 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1035 atomic_read(&imp->imp_refcount) - 1,
1036 imp->imp_obd->obd_name);
1038 if (atomic_dec_and_test(&imp->imp_refcount)) {
1039 CDEBUG(D_INFO, "final put import %p\n", imp);
1040 obd_zombie_import_add(imp);
1043 /* catch possible import put race */
1044 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1047 EXPORT_SYMBOL(class_import_put);
1049 static void init_imp_at(struct imp_at *at) {
1051 at_init(&at->iat_net_latency, 0, 0);
1052 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1053 /* max service estimates are tracked on the server side, so
1054 don't use the AT history here, just use the last reported
1055 val. (But keep hist for proc histogram, worst_ever) */
1056 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1061 struct obd_import *class_new_import(struct obd_device *obd)
1063 struct obd_import *imp;
1065 OBD_ALLOC(imp, sizeof(*imp));
1069 CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1070 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1071 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1072 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1073 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1074 CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
1075 imp->imp_replay_cursor = &imp->imp_committed_list;
1076 spin_lock_init(&imp->imp_lock);
1077 imp->imp_last_success_conn = 0;
1078 imp->imp_state = LUSTRE_IMP_NEW;
1079 imp->imp_obd = class_incref(obd, "import", imp);
1080 mutex_init(&imp->imp_sec_mutex);
1081 init_waitqueue_head(&imp->imp_recovery_waitq);
1083 atomic_set(&imp->imp_refcount, 2);
1084 atomic_set(&imp->imp_unregistering, 0);
1085 atomic_set(&imp->imp_inflight, 0);
1086 atomic_set(&imp->imp_replay_inflight, 0);
1087 atomic_set(&imp->imp_inval_count, 0);
1088 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1089 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1090 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1091 init_imp_at(&imp->imp_at);
1093 /* the default magic is V2, will be used in connect RPC, and
1094 * then adjusted according to the flags in request/reply. */
1095 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1099 EXPORT_SYMBOL(class_new_import);
1101 void class_destroy_import(struct obd_import *import)
1103 LASSERT(import != NULL);
1104 LASSERT(import != LP_POISON);
1106 class_handle_unhash(&import->imp_handle);
1108 spin_lock(&import->imp_lock);
1109 import->imp_generation++;
1110 spin_unlock(&import->imp_lock);
1111 class_import_put(import);
1113 EXPORT_SYMBOL(class_destroy_import);
1115 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1117 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1119 spin_lock(&exp->exp_locks_list_guard);
1121 LASSERT(lock->l_exp_refs_nr >= 0);
1123 if (lock->l_exp_refs_target != NULL &&
1124 lock->l_exp_refs_target != exp) {
1125 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1126 exp, lock, lock->l_exp_refs_target);
1128 if ((lock->l_exp_refs_nr ++) == 0) {
1129 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1130 lock->l_exp_refs_target = exp;
1132 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1133 lock, exp, lock->l_exp_refs_nr);
1134 spin_unlock(&exp->exp_locks_list_guard);
1136 EXPORT_SYMBOL(__class_export_add_lock_ref);
1138 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1140 spin_lock(&exp->exp_locks_list_guard);
1141 LASSERT(lock->l_exp_refs_nr > 0);
1142 if (lock->l_exp_refs_target != exp) {
1143 LCONSOLE_WARN("lock %p, "
1144 "mismatching export pointers: %p, %p\n",
1145 lock, lock->l_exp_refs_target, exp);
1147 if (-- lock->l_exp_refs_nr == 0) {
1148 cfs_list_del_init(&lock->l_exp_refs_link);
1149 lock->l_exp_refs_target = NULL;
1151 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1152 lock, exp, lock->l_exp_refs_nr);
1153 spin_unlock(&exp->exp_locks_list_guard);
1155 EXPORT_SYMBOL(__class_export_del_lock_ref);
1158 /* A connection defines an export context in which preallocation can
1159 be managed. This releases the export pointer reference, and returns
1160 the export handle, so the export refcount is 1 when this function
1162 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1163 struct obd_uuid *cluuid)
1165 struct obd_export *export;
1166 LASSERT(conn != NULL);
1167 LASSERT(obd != NULL);
1168 LASSERT(cluuid != NULL);
1171 export = class_new_export(obd, cluuid);
1173 RETURN(PTR_ERR(export));
1175 conn->cookie = export->exp_handle.h_cookie;
1176 class_export_put(export);
1178 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1179 cluuid->uuid, conn->cookie);
1182 EXPORT_SYMBOL(class_connect);
1184 /* if export is involved in recovery then clean up related things */
1185 void class_export_recovery_cleanup(struct obd_export *exp)
1187 struct obd_device *obd = exp->exp_obd;
1189 spin_lock(&obd->obd_recovery_task_lock);
1190 if (obd->obd_recovering) {
1191 if (exp->exp_in_recovery) {
1192 spin_lock(&exp->exp_lock);
1193 exp->exp_in_recovery = 0;
1194 spin_unlock(&exp->exp_lock);
1195 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1196 atomic_dec(&obd->obd_connected_clients);
1199 /* if called during recovery then should update
1200 * obd_stale_clients counter,
1201 * lightweight exports are not counted */
1202 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1203 exp->exp_obd->obd_stale_clients++;
1205 spin_unlock(&obd->obd_recovery_task_lock);
1207 spin_lock(&exp->exp_lock);
1208 /** Cleanup req replay fields */
1209 if (exp->exp_req_replay_needed) {
1210 exp->exp_req_replay_needed = 0;
1212 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1213 atomic_dec(&obd->obd_req_replay_clients);
1216 /** Cleanup lock replay data */
1217 if (exp->exp_lock_replay_needed) {
1218 exp->exp_lock_replay_needed = 0;
1220 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1221 atomic_dec(&obd->obd_lock_replay_clients);
1223 spin_unlock(&exp->exp_lock);
1226 /* This function removes 1-3 references from the export:
1227 * 1 - for export pointer passed
1228 * and if disconnect really need
1229 * 2 - removing from hash
1230 * 3 - in client_unlink_export
1231 * The export pointer passed to this function can destroyed */
1232 int class_disconnect(struct obd_export *export)
1234 int already_disconnected;
1237 if (export == NULL) {
1238 CWARN("attempting to free NULL export %p\n", export);
1242 spin_lock(&export->exp_lock);
1243 already_disconnected = export->exp_disconnected;
1244 export->exp_disconnected = 1;
1245 spin_unlock(&export->exp_lock);
1247 /* class_cleanup(), abort_recovery(), and class_fail_export()
1248 * all end up in here, and if any of them race we shouldn't
1249 * call extra class_export_puts(). */
1250 if (already_disconnected) {
1251 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1252 GOTO(no_disconn, already_disconnected);
1255 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1256 export->exp_handle.h_cookie);
1258 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1259 cfs_hash_del(export->exp_obd->obd_nid_hash,
1260 &export->exp_connection->c_peer.nid,
1261 &export->exp_nid_hash);
1263 class_export_recovery_cleanup(export);
1264 class_unlink_export(export);
1266 class_export_put(export);
1269 EXPORT_SYMBOL(class_disconnect);
1271 /* Return non-zero for a fully connected export */
1272 int class_connected_export(struct obd_export *exp)
1277 spin_lock(&exp->exp_lock);
1278 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1279 spin_unlock(&exp->exp_lock);
1283 EXPORT_SYMBOL(class_connected_export);
1285 static void class_disconnect_export_list(cfs_list_t *list,
1286 enum obd_option flags)
1289 struct obd_export *exp;
1292 /* It's possible that an export may disconnect itself, but
1293 * nothing else will be added to this list. */
1294 while (!cfs_list_empty(list)) {
1295 exp = cfs_list_entry(list->next, struct obd_export,
1297 /* need for safe call CDEBUG after obd_disconnect */
1298 class_export_get(exp);
1300 spin_lock(&exp->exp_lock);
1301 exp->exp_flags = flags;
1302 spin_unlock(&exp->exp_lock);
1304 if (obd_uuid_equals(&exp->exp_client_uuid,
1305 &exp->exp_obd->obd_uuid)) {
1307 "exp %p export uuid == obd uuid, don't discon\n",
1309 /* Need to delete this now so we don't end up pointing
1310 * to work_list later when this export is cleaned up. */
1311 cfs_list_del_init(&exp->exp_obd_chain);
1312 class_export_put(exp);
1316 class_export_get(exp);
1317 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1318 "last request at "CFS_TIME_T"\n",
1319 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1320 exp, exp->exp_last_request_time);
1321 /* release one export reference anyway */
1322 rc = obd_disconnect(exp);
1324 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1325 obd_export_nid2str(exp), exp, rc);
1326 class_export_put(exp);
1331 void class_disconnect_exports(struct obd_device *obd)
1333 cfs_list_t work_list;
1336 /* Move all of the exports from obd_exports to a work list, en masse. */
1337 CFS_INIT_LIST_HEAD(&work_list);
1338 spin_lock(&obd->obd_dev_lock);
1339 cfs_list_splice_init(&obd->obd_exports, &work_list);
1340 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1341 spin_unlock(&obd->obd_dev_lock);
1343 if (!cfs_list_empty(&work_list)) {
1344 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1345 "disconnecting them\n", obd->obd_minor, obd);
1346 class_disconnect_export_list(&work_list,
1347 exp_flags_from_obd(obd));
1349 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1350 obd->obd_minor, obd);
1353 EXPORT_SYMBOL(class_disconnect_exports);
1355 /* Remove exports that have not completed recovery.
1357 void class_disconnect_stale_exports(struct obd_device *obd,
1358 int (*test_export)(struct obd_export *))
1360 cfs_list_t work_list;
1361 struct obd_export *exp, *n;
1365 CFS_INIT_LIST_HEAD(&work_list);
1366 spin_lock(&obd->obd_dev_lock);
1367 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1369 /* don't count self-export as client */
1370 if (obd_uuid_equals(&exp->exp_client_uuid,
1371 &exp->exp_obd->obd_uuid))
1374 /* don't evict clients which have no slot in last_rcvd
1375 * (e.g. lightweight connection) */
1376 if (exp->exp_target_data.ted_lr_idx == -1)
1379 spin_lock(&exp->exp_lock);
1380 if (exp->exp_failed || test_export(exp)) {
1381 spin_unlock(&exp->exp_lock);
1384 exp->exp_failed = 1;
1385 spin_unlock(&exp->exp_lock);
1387 cfs_list_move(&exp->exp_obd_chain, &work_list);
1389 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1390 obd->obd_name, exp->exp_client_uuid.uuid,
1391 exp->exp_connection == NULL ? "<unknown>" :
1392 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1393 print_export_data(exp, "EVICTING", 0);
1395 spin_unlock(&obd->obd_dev_lock);
1398 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1399 obd->obd_name, evicted);
1401 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1402 OBD_OPT_ABORT_RECOV);
1405 EXPORT_SYMBOL(class_disconnect_stale_exports);
1407 void class_fail_export(struct obd_export *exp)
1409 int rc, already_failed;
1411 spin_lock(&exp->exp_lock);
1412 already_failed = exp->exp_failed;
1413 exp->exp_failed = 1;
1414 spin_unlock(&exp->exp_lock);
1416 if (already_failed) {
1417 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1418 exp, exp->exp_client_uuid.uuid);
1422 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1423 exp, exp->exp_client_uuid.uuid);
1425 if (obd_dump_on_timeout)
1426 libcfs_debug_dumplog();
1428 /* need for safe call CDEBUG after obd_disconnect */
1429 class_export_get(exp);
1431 /* Most callers into obd_disconnect are removing their own reference
1432 * (request, for example) in addition to the one from the hash table.
1433 * We don't have such a reference here, so make one. */
1434 class_export_get(exp);
1435 rc = obd_disconnect(exp);
1437 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1439 CDEBUG(D_HA, "disconnected export %p/%s\n",
1440 exp, exp->exp_client_uuid.uuid);
1441 class_export_put(exp);
1443 EXPORT_SYMBOL(class_fail_export);
1445 char *obd_export_nid2str(struct obd_export *exp)
1447 if (exp->exp_connection != NULL)
1448 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1452 EXPORT_SYMBOL(obd_export_nid2str);
1454 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1456 cfs_hash_t *nid_hash;
1457 struct obd_export *doomed_exp = NULL;
1458 int exports_evicted = 0;
1460 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1462 spin_lock(&obd->obd_dev_lock);
1463 /* umount has run already, so evict thread should leave
1464 * its task to umount thread now */
1465 if (obd->obd_stopping) {
1466 spin_unlock(&obd->obd_dev_lock);
1467 return exports_evicted;
1469 nid_hash = obd->obd_nid_hash;
1470 cfs_hash_getref(nid_hash);
1471 spin_unlock(&obd->obd_dev_lock);
1474 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1475 if (doomed_exp == NULL)
1478 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1479 "nid %s found, wanted nid %s, requested nid %s\n",
1480 obd_export_nid2str(doomed_exp),
1481 libcfs_nid2str(nid_key), nid);
1482 LASSERTF(doomed_exp != obd->obd_self_export,
1483 "self-export is hashed by NID?\n");
1485 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1486 "request\n", obd->obd_name,
1487 obd_uuid2str(&doomed_exp->exp_client_uuid),
1488 obd_export_nid2str(doomed_exp));
1489 class_fail_export(doomed_exp);
1490 class_export_put(doomed_exp);
1493 cfs_hash_putref(nid_hash);
1495 if (!exports_evicted)
1496 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1497 obd->obd_name, nid);
1498 return exports_evicted;
1500 EXPORT_SYMBOL(obd_export_evict_by_nid);
1502 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1504 cfs_hash_t *uuid_hash;
1505 struct obd_export *doomed_exp = NULL;
1506 struct obd_uuid doomed_uuid;
1507 int exports_evicted = 0;
1509 spin_lock(&obd->obd_dev_lock);
1510 if (obd->obd_stopping) {
1511 spin_unlock(&obd->obd_dev_lock);
1512 return exports_evicted;
1514 uuid_hash = obd->obd_uuid_hash;
1515 cfs_hash_getref(uuid_hash);
1516 spin_unlock(&obd->obd_dev_lock);
1518 obd_str2uuid(&doomed_uuid, uuid);
1519 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1520 CERROR("%s: can't evict myself\n", obd->obd_name);
1521 cfs_hash_putref(uuid_hash);
1522 return exports_evicted;
1525 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1527 if (doomed_exp == NULL) {
1528 CERROR("%s: can't disconnect %s: no exports found\n",
1529 obd->obd_name, uuid);
1531 CWARN("%s: evicting %s at adminstrative request\n",
1532 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1533 class_fail_export(doomed_exp);
1534 class_export_put(doomed_exp);
1537 cfs_hash_putref(uuid_hash);
1539 return exports_evicted;
1541 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1543 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1544 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1545 EXPORT_SYMBOL(class_export_dump_hook);
1548 static void print_export_data(struct obd_export *exp, const char *status,
1551 struct ptlrpc_reply_state *rs;
1552 struct ptlrpc_reply_state *first_reply = NULL;
1555 spin_lock(&exp->exp_lock);
1556 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1562 spin_unlock(&exp->exp_lock);
1564 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1565 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1566 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1567 atomic_read(&exp->exp_rpc_count),
1568 atomic_read(&exp->exp_cb_count),
1569 atomic_read(&exp->exp_locks_count),
1570 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1571 nreplies, first_reply, nreplies > 3 ? "..." : "",
1572 exp->exp_last_committed);
1573 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1574 if (locks && class_export_dump_hook != NULL)
1575 class_export_dump_hook(exp);
1579 void dump_exports(struct obd_device *obd, int locks)
1581 struct obd_export *exp;
1583 spin_lock(&obd->obd_dev_lock);
1584 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1585 print_export_data(exp, "ACTIVE", locks);
1586 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1587 print_export_data(exp, "UNLINKED", locks);
1588 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1589 print_export_data(exp, "DELAYED", locks);
1590 spin_unlock(&obd->obd_dev_lock);
1591 spin_lock(&obd_zombie_impexp_lock);
1592 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1593 print_export_data(exp, "ZOMBIE", locks);
1594 spin_unlock(&obd_zombie_impexp_lock);
1596 EXPORT_SYMBOL(dump_exports);
1598 void obd_exports_barrier(struct obd_device *obd)
1601 LASSERT(cfs_list_empty(&obd->obd_exports));
1602 spin_lock(&obd->obd_dev_lock);
1603 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1604 spin_unlock(&obd->obd_dev_lock);
1605 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1606 cfs_time_seconds(waited));
1607 if (waited > 5 && IS_PO2(waited)) {
1608 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1609 "more than %d seconds. "
1610 "The obd refcount = %d. Is it stuck?\n",
1611 obd->obd_name, waited,
1612 atomic_read(&obd->obd_refcount));
1613 dump_exports(obd, 1);
1616 spin_lock(&obd->obd_dev_lock);
1618 spin_unlock(&obd->obd_dev_lock);
1620 EXPORT_SYMBOL(obd_exports_barrier);
1622 /* Total amount of zombies to be destroyed */
1623 static int zombies_count = 0;
1626 * kill zombie imports and exports
1628 void obd_zombie_impexp_cull(void)
1630 struct obd_import *import;
1631 struct obd_export *export;
1635 spin_lock(&obd_zombie_impexp_lock);
1638 if (!cfs_list_empty(&obd_zombie_imports)) {
1639 import = cfs_list_entry(obd_zombie_imports.next,
1642 cfs_list_del_init(&import->imp_zombie_chain);
1646 if (!cfs_list_empty(&obd_zombie_exports)) {
1647 export = cfs_list_entry(obd_zombie_exports.next,
1650 cfs_list_del_init(&export->exp_obd_chain);
1653 spin_unlock(&obd_zombie_impexp_lock);
1655 if (import != NULL) {
1656 class_import_destroy(import);
1657 spin_lock(&obd_zombie_impexp_lock);
1659 spin_unlock(&obd_zombie_impexp_lock);
1662 if (export != NULL) {
1663 class_export_destroy(export);
1664 spin_lock(&obd_zombie_impexp_lock);
1666 spin_unlock(&obd_zombie_impexp_lock);
1670 } while (import != NULL || export != NULL);
1674 static struct completion obd_zombie_start;
1675 static struct completion obd_zombie_stop;
1676 static unsigned long obd_zombie_flags;
1677 static wait_queue_head_t obd_zombie_waitq;
1678 static pid_t obd_zombie_pid;
1681 OBD_ZOMBIE_STOP = 0x0001,
1685 * check for work for kill zombie import/export thread.
1687 static int obd_zombie_impexp_check(void *arg)
1691 spin_lock(&obd_zombie_impexp_lock);
1692 rc = (zombies_count == 0) &&
1693 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1694 spin_unlock(&obd_zombie_impexp_lock);
1700 * Add export to the obd_zombe thread and notify it.
1702 static void obd_zombie_export_add(struct obd_export *exp) {
1703 spin_lock(&exp->exp_obd->obd_dev_lock);
1704 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1705 cfs_list_del_init(&exp->exp_obd_chain);
1706 spin_unlock(&exp->exp_obd->obd_dev_lock);
1707 spin_lock(&obd_zombie_impexp_lock);
1709 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1710 spin_unlock(&obd_zombie_impexp_lock);
1712 obd_zombie_impexp_notify();
1716 * Add import to the obd_zombe thread and notify it.
1718 static void obd_zombie_import_add(struct obd_import *imp) {
1719 LASSERT(imp->imp_sec == NULL);
1720 LASSERT(imp->imp_rq_pool == NULL);
1721 spin_lock(&obd_zombie_impexp_lock);
1722 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1724 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1725 spin_unlock(&obd_zombie_impexp_lock);
1727 obd_zombie_impexp_notify();
1731 * notify import/export destroy thread about new zombie.
1733 static void obd_zombie_impexp_notify(void)
1736 * Make sure obd_zomebie_impexp_thread get this notification.
1737 * It is possible this signal only get by obd_zombie_barrier, and
1738 * barrier gulps this notification and sleeps away and hangs ensues
1740 wake_up_all(&obd_zombie_waitq);
1744 * check whether obd_zombie is idle
1746 static int obd_zombie_is_idle(void)
1750 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1751 spin_lock(&obd_zombie_impexp_lock);
1752 rc = (zombies_count == 0);
1753 spin_unlock(&obd_zombie_impexp_lock);
1758 * wait when obd_zombie import/export queues become empty
1760 void obd_zombie_barrier(void)
1762 struct l_wait_info lwi = { 0 };
1764 if (obd_zombie_pid == current_pid())
1765 /* don't wait for myself */
1767 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1769 EXPORT_SYMBOL(obd_zombie_barrier);
1774 * destroy zombie export/import thread.
1776 static int obd_zombie_impexp_thread(void *unused)
1778 unshare_fs_struct();
1779 complete(&obd_zombie_start);
1781 obd_zombie_pid = current_pid();
1783 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1784 struct l_wait_info lwi = { 0 };
1786 l_wait_event(obd_zombie_waitq,
1787 !obd_zombie_impexp_check(NULL), &lwi);
1788 obd_zombie_impexp_cull();
1791 * Notify obd_zombie_barrier callers that queues
1794 wake_up(&obd_zombie_waitq);
1797 complete(&obd_zombie_stop);
1802 #else /* ! KERNEL */
1804 static atomic_t zombie_recur = ATOMIC_INIT(0);
1805 static void *obd_zombie_impexp_work_cb;
1806 static void *obd_zombie_impexp_idle_cb;
1808 int obd_zombie_impexp_kill(void *arg)
1812 if (atomic_inc_return(&zombie_recur) == 1) {
1813 obd_zombie_impexp_cull();
1816 atomic_dec(&zombie_recur);
1823 * start destroy zombie import/export thread
1825 int obd_zombie_impexp_init(void)
1828 struct task_struct *task;
1831 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1832 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1833 spin_lock_init(&obd_zombie_impexp_lock);
1834 init_completion(&obd_zombie_start);
1835 init_completion(&obd_zombie_stop);
1836 init_waitqueue_head(&obd_zombie_waitq);
1840 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1842 RETURN(PTR_ERR(task));
1844 wait_for_completion(&obd_zombie_start);
1847 obd_zombie_impexp_work_cb =
1848 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1849 &obd_zombie_impexp_kill, NULL);
1851 obd_zombie_impexp_idle_cb =
1852 liblustre_register_idle_callback("obd_zombi_impexp_check",
1853 &obd_zombie_impexp_check, NULL);
1858 * stop destroy zombie import/export thread
1860 void obd_zombie_impexp_stop(void)
1862 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1863 obd_zombie_impexp_notify();
1865 wait_for_completion(&obd_zombie_stop);
1867 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1868 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1872 /***** Kernel-userspace comm helpers *******/
1874 /* Get length of entire message, including header */
1875 int kuc_len(int payload_len)
1877 return sizeof(struct kuc_hdr) + payload_len;
1879 EXPORT_SYMBOL(kuc_len);
1881 /* Get a pointer to kuc header, given a ptr to the payload
1882 * @param p Pointer to payload area
1883 * @returns Pointer to kuc header
1885 struct kuc_hdr * kuc_ptr(void *p)
1887 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1888 LASSERT(lh->kuc_magic == KUC_MAGIC);
1891 EXPORT_SYMBOL(kuc_ptr);
1893 /* Test if payload is part of kuc message
1894 * @param p Pointer to payload area
1897 int kuc_ispayload(void *p)
1899 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1901 if (kh->kuc_magic == KUC_MAGIC)
1906 EXPORT_SYMBOL(kuc_ispayload);
1908 /* Alloc space for a message, and fill in header
1909 * @return Pointer to payload area
1911 void *kuc_alloc(int payload_len, int transport, int type)
1914 int len = kuc_len(payload_len);
1918 return ERR_PTR(-ENOMEM);
1920 lh->kuc_magic = KUC_MAGIC;
1921 lh->kuc_transport = transport;
1922 lh->kuc_msgtype = type;
1923 lh->kuc_msglen = len;
1925 return (void *)(lh + 1);
1927 EXPORT_SYMBOL(kuc_alloc);
1929 /* Takes pointer to payload area */
1930 inline void kuc_free(void *p, int payload_len)
1932 struct kuc_hdr *lh = kuc_ptr(p);
1933 OBD_FREE(lh, kuc_len(payload_len));
1935 EXPORT_SYMBOL(kuc_free);
1937 struct obd_request_slot_waiter {
1938 struct list_head orsw_entry;
1939 wait_queue_head_t orsw_waitq;
1943 static bool obd_request_slot_avail(struct client_obd *cli,
1944 struct obd_request_slot_waiter *orsw)
1948 client_obd_list_lock(&cli->cl_loi_list_lock);
1949 avail = !!list_empty(&orsw->orsw_entry);
1950 client_obd_list_unlock(&cli->cl_loi_list_lock);
1956 * For network flow control, the RPC sponsor needs to acquire a credit
1957 * before sending the RPC. The credits count for a connection is defined
1958 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1959 * the subsequent RPC sponsors need to wait until others released their
1960 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1962 int obd_get_request_slot(struct client_obd *cli)
1964 struct obd_request_slot_waiter orsw;
1965 struct l_wait_info lwi;
1968 client_obd_list_lock(&cli->cl_loi_list_lock);
1969 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1970 cli->cl_r_in_flight++;
1971 client_obd_list_unlock(&cli->cl_loi_list_lock);
1975 init_waitqueue_head(&orsw.orsw_waitq);
1976 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1977 orsw.orsw_signaled = false;
1978 client_obd_list_unlock(&cli->cl_loi_list_lock);
1980 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1981 rc = l_wait_event(orsw.orsw_waitq,
1982 obd_request_slot_avail(cli, &orsw) ||
1986 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1987 * freed but other (such as obd_put_request_slot) is using it. */
1988 client_obd_list_lock(&cli->cl_loi_list_lock);
1990 if (!orsw.orsw_signaled) {
1991 if (list_empty(&orsw.orsw_entry))
1992 cli->cl_r_in_flight--;
1994 list_del(&orsw.orsw_entry);
1998 if (orsw.orsw_signaled) {
1999 LASSERT(list_empty(&orsw.orsw_entry));
2003 client_obd_list_unlock(&cli->cl_loi_list_lock);
2007 EXPORT_SYMBOL(obd_get_request_slot);
2009 void obd_put_request_slot(struct client_obd *cli)
2011 struct obd_request_slot_waiter *orsw;
2013 client_obd_list_lock(&cli->cl_loi_list_lock);
2014 cli->cl_r_in_flight--;
2016 /* If there is free slot, wakeup the first waiter. */
2017 if (!list_empty(&cli->cl_loi_read_list) &&
2018 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2019 orsw = list_entry(cli->cl_loi_read_list.next,
2020 struct obd_request_slot_waiter, orsw_entry);
2021 list_del_init(&orsw->orsw_entry);
2022 cli->cl_r_in_flight++;
2023 wake_up(&orsw->orsw_waitq);
2025 client_obd_list_unlock(&cli->cl_loi_list_lock);
2027 EXPORT_SYMBOL(obd_put_request_slot);
2029 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2031 return cli->cl_max_rpcs_in_flight;
2033 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2035 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2037 struct obd_request_slot_waiter *orsw;
2042 if (max > OBD_MAX_RIF_MAX || max < 1)
2045 client_obd_list_lock(&cli->cl_loi_list_lock);
2046 old = cli->cl_max_rpcs_in_flight;
2047 cli->cl_max_rpcs_in_flight = max;
2050 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2051 for (i = 0; i < diff; i++) {
2052 if (list_empty(&cli->cl_loi_read_list))
2055 orsw = list_entry(cli->cl_loi_read_list.next,
2056 struct obd_request_slot_waiter, orsw_entry);
2057 list_del_init(&orsw->orsw_entry);
2058 cli->cl_r_in_flight++;
2059 wake_up(&orsw->orsw_waitq);
2061 client_obd_list_unlock(&cli->cl_loi_list_lock);
2065 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);