4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
46 #include <obd_class.h>
47 #include <lprocfs_status.h>
/* Registered obd types; class_search_type() walks this list under
 * obd_types_lock. */
49 extern cfs_list_t obd_types;
50 spinlock_t obd_types_lock;
/* Slab caches set up by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
52 struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 struct kmem_cache *import_cachep;
/* Zombie import/export lists.  NOTE(review): obd_zombie_impexp_lock is
 * presumably their guard -- confirm against the zombie code. */
57 cfs_list_t obd_zombie_imports;
58 cfs_list_t obd_zombie_exports;
59 spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of static helpers used further down. */
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64 const char *status, int locks);
/* Function pointer called to release a ptlrpc connection when an export
 * or import is destroyed; it is not assigned anywhere in this part of
 * the file. */
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
/* Allocate an obd_device from the obd_device_cachep slab with GFP_NOFS
 * and stamp it with OBD_DEVICE_MAGIC. */
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
/* Give an obd_device back to obd_device_cachep.  The magic is asserted
 * first, and a namespace that is still set is reported with CERROR. */
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
/* finalize the lu_ref tracking before the memory is released */
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* Look up a registered obd type by exact name: obd_types is walked under
 * obd_types_lock and each typ_name is compared with strcmp(). */
98 struct obd_type *class_search_type(const char *name)
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 cfs_list_for_each(tmp, &obd_types) {
105 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
/* match: the lock is dropped on this path too */
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
114 EXPORT_SYMBOL(class_search_type);
/* Find the obd type called \a name and take a module reference on the
 * owner of its dt_ops.  When HAVE_MODULE_LOADING_SUPPORT is defined the
 * code can also call request_module() and search again.
 * NOTE(review): the condition guarding the module load is not visible in
 * this extract -- presumably it runs only when the first search fails. */
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
124 if (strcmp(modname, "obdfilter") == 0)
/* some type names are provided by a differently named module */
127 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128 modname = LUSTRE_OSP_NAME;
130 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131 modname = LUSTRE_MDT_NAME;
133 if (!request_module("%s", modname)) {
134 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
/* the lookup is repeated with the original type name, not modname */
135 type = class_search_type(name);
137 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143 spin_lock(&type->obd_type_lock);
/* NOTE(review): the return value of try_module_get() is not checked */
145 try_module_get(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
150 EXPORT_SYMBOL(class_get_type);
/* Counterpart of class_get_type(): drops the module reference on the
 * owner of the type's dt_ops while holding obd_type_lock. */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
160 EXPORT_SYMBOL(class_put_type);
/* Upper bound on a type name, including the terminating NUL; asserted in
 * class_register_type(). */
162 #define CLASS_MAX_NAME 1024
/* Register a new obd type under \a name.
 *
 * Private copies of \a dt_ops and \a md_ops and of the name are stored
 * in the new obd_type, a proc root is registered for it, \a ldt is
 * initialized through lu_device_type_init(), and the type is finally
 * linked onto obd_types.  The \a vars parameter only exists when
 * HAVE_ONLY_PROCFS_SEQ is not defined. */
164 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
165 struct lprocfs_seq_vars *module_vars,
166 #ifndef HAVE_ONLY_PROCFS_SEQ
167 struct lprocfs_vars *vars,
169 const char *name, struct lu_device_type *ldt)
171 struct obd_type *type;
176 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
178 if (class_search_type(name)) {
179 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
/* allocate the type and its three sub-allocations; they are checked
 * together below */
184 OBD_ALLOC(type, sizeof(*type));
188 OBD_ALLOC_PTR(type->typ_dt_ops);
189 OBD_ALLOC_PTR(type->typ_md_ops);
190 OBD_ALLOC(type->typ_name, strlen(name) + 1);
192 if (type->typ_dt_ops == NULL ||
193 type->typ_md_ops == NULL ||
194 type->typ_name == NULL)
197 *(type->typ_dt_ops) = *dt_ops;
198 /* md_ops is optional */
200 *(type->typ_md_ops) = *md_ops;
201 strcpy(type->typ_name, name);
202 spin_lock_init(&type->obd_type_lock);
205 #ifndef HAVE_ONLY_PROCFS_SEQ
207 type->typ_procroot = lprocfs_register(type->typ_name,
213 type->typ_procroot = lprocfs_seq_register(type->typ_name,
217 if (IS_ERR(type->typ_procroot)) {
218 rc = PTR_ERR(type->typ_procroot);
219 type->typ_procroot = NULL;
225 rc = lu_device_type_init(ldt);
/* publish the fully initialized type */
230 spin_lock(&obd_types_lock);
231 cfs_list_add(&type->typ_chain, &obd_types);
232 spin_unlock(&obd_types_lock);
/* cleanup: remove the proc entry by name and free whatever was
 * allocated (NOTE(review): presumably reached only on failure -- the
 * label and the success return are not visible here) */
238 if (type->typ_name != NULL) {
239 #ifndef HAVE_ONLY_PROCFS_SEQ
240 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
242 remove_proc_subtree(type->typ_name, proc_lustre_root);
246 if (type->typ_name != NULL)
247 OBD_FREE(type->typ_name, strlen(name) + 1);
248 if (type->typ_md_ops != NULL)
249 OBD_FREE_PTR(type->typ_md_ops);
250 if (type->typ_dt_ops != NULL)
251 OBD_FREE_PTR(type->typ_dt_ops);
252 OBD_FREE(type, sizeof(*type));
255 EXPORT_SYMBOL(class_register_type);
/* Unregister the obd type called \a name: its proc entry is removed by
 * name, its lu device type is finalized, it is unlinked from obd_types
 * and its memory is freed.  A type that still has a non-zero typ_refcnt
 * only has its ops freed. */
257 int class_unregister_type(const char *name)
259 struct obd_type *type = class_search_type(name);
263 CERROR("unknown obd type\n");
267 if (type->typ_refcnt) {
268 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
269 /* This is a bad situation, let's make the best of it */
270 /* Remove ops, but leave the name for debugging */
/* NOTE(review): the ops pointers are freed but not reset in the visible
 * lines -- confirm nothing dereferences them afterwards */
271 OBD_FREE_PTR(type->typ_dt_ops);
272 OBD_FREE_PTR(type->typ_md_ops);
276 /* we do not use type->typ_procroot because, for compatibility purposes,
277 * other modules can share names (i.e. lod can use the lov entry), so
278 * we can't reference the pointer as it can get invalidated when another
279 * module removes the entry */
281 #ifndef HAVE_ONLY_PROCFS_SEQ
282 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
284 remove_proc_subtree(type->typ_name, proc_lustre_root);
288 lu_device_type_fini(type->typ_lu);
/* unlink from the global list before freeing anything */
290 spin_lock(&obd_types_lock);
291 cfs_list_del(&type->typ_chain);
292 spin_unlock(&obd_types_lock);
293 OBD_FREE(type->typ_name, strlen(name) + 1);
294 if (type->typ_dt_ops != NULL)
295 OBD_FREE_PTR(type->typ_dt_ops);
296 if (type->typ_md_ops != NULL)
297 OBD_FREE_PTR(type->typ_md_ops);
298 OBD_FREE(type, sizeof(*type));
300 } /* class_unregister_type */
301 EXPORT_SYMBOL(class_unregister_type);
304 * Create a new obd device.
306 * Find an empty slot in ::obd_devs[], create a new obd device in it.
308 * \param[in] type_name obd device type string.
309 * \param[in] name obd device name.
311 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
/* Failures are reported as ERR_PTR() values: -EINVAL for a name that is
 * too long, -ENODEV for an unknown type, -ENOMEM, -EEXIST for a
 * duplicate name and -EOVERFLOW when the device table is full. */
314 struct obd_device *class_newdev(const char *type_name, const char *name)
316 struct obd_device *result = NULL;
317 struct obd_device *newdev;
318 struct obd_type *type = NULL;
320 int new_obd_minor = 0;
/* the name must fit obd_name together with its terminating NUL */
323 if (strlen(name) >= MAX_OBD_NAME) {
324 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
325 RETURN(ERR_PTR(-EINVAL));
328 type = class_get_type(type_name);
330 CERROR("OBD: unknown type: %s\n", type_name);
331 RETURN(ERR_PTR(-ENODEV));
334 newdev = obd_device_alloc();
336 GOTO(out_type, result = ERR_PTR(-ENOMEM));
338 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* one pass over the table under the write lock: look for a duplicate
 * name and claim a free slot */
340 write_lock(&obd_dev_lock);
341 for (i = 0; i < class_devno_max(); i++) {
342 struct obd_device *obd = class_num2obd(i);
344 if (obd && (strcmp(name, obd->obd_name) == 0)) {
345 CERROR("Device %s already exists at %d, won't add\n",
348 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
349 "%p obd_magic %08x != %08x\n", result,
350 result->obd_magic, OBD_DEVICE_MAGIC);
351 LASSERTF(result->obd_minor == new_obd_minor,
352 "%p obd_minor %d != %d\n", result,
353 result->obd_minor, new_obd_minor);
/* give back the slot that was claimed earlier in this scan */
355 obd_devs[result->obd_minor] = NULL;
356 result->obd_name[0]='\0';
358 result = ERR_PTR(-EEXIST);
/* empty slot and nothing claimed yet: take it */
361 if (!result && !obd) {
363 result->obd_minor = i;
365 result->obd_type = type;
366 strncpy(result->obd_name, name,
367 sizeof(result->obd_name) - 1);
368 obd_devs[i] = result;
371 write_unlock(&obd_dev_lock);
373 if (result == NULL && i >= class_devno_max()) {
374 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
376 GOTO(out, result = ERR_PTR(-EOVERFLOW));
382 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
383 result->obd_name, result);
/* cleanup: release the unused device and the type reference
 * (NOTE(review): presumably under the out/out_type labels, which are
 * not visible in this extract) */
387 obd_device_free(newdev);
389 class_put_type(type);
/* Remove \a obd from obd_devs[], free it and drop its type reference.
 * The type pointer is saved up front because \a obd is freed before
 * class_put_type() is called. */
393 void class_release_dev(struct obd_device *obd)
395 struct obd_type *obd_type = obd->obd_type;
397 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
398 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
399 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
400 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
401 LASSERT(obd_type != NULL);
403 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
404 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
/* clear the table slot under the write lock, then free the device */
406 write_lock(&obd_dev_lock);
407 obd_devs[obd->obd_minor] = NULL;
408 write_unlock(&obd_dev_lock);
409 obd_device_free(obd);
411 class_put_type(obd_type);
/* Look up a device by name in the device table, holding obd_dev_lock
 * for reading.  The result is used by class_name2obd() as an index for
 * class_num2obd(), with a negative value meaning "not found". */
414 int class_name2dev(const char *name)
421 read_lock(&obd_dev_lock);
422 for (i = 0; i < class_devno_max(); i++) {
423 struct obd_device *obd = class_num2obd(i);
425 if (obd && strcmp(name, obd->obd_name) == 0) {
426 /* Make sure we finished attaching before we give
427 out any references */
428 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
429 if (obd->obd_attached) {
430 read_unlock(&obd_dev_lock);
436 read_unlock(&obd_dev_lock);
440 EXPORT_SYMBOL(class_name2dev);
442 struct obd_device *class_name2obd(const char *name)
444 int dev = class_name2dev(name);
446 if (dev < 0 || dev > class_devno_max())
448 return class_num2obd(dev);
450 EXPORT_SYMBOL(class_name2obd);
/* Look up a device by uuid in the device table, holding obd_dev_lock for
 * reading.  class_uuid2obd() passes the result to class_num2obd(). */
452 int class_uuid2dev(struct obd_uuid *uuid)
456 read_lock(&obd_dev_lock);
457 for (i = 0; i < class_devno_max(); i++) {
458 struct obd_device *obd = class_num2obd(i);
460 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
461 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* match: the lock is dropped on this path too */
462 read_unlock(&obd_dev_lock);
466 read_unlock(&obd_dev_lock);
470 EXPORT_SYMBOL(class_uuid2dev);
/* Resolve \a uuid to its obd device: class_uuid2dev() supplies the
 * device number, class_num2obd() maps it to the device. */
472 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
474 int dev = class_uuid2dev(uuid);
477 return class_num2obd(dev);
479 EXPORT_SYMBOL(class_uuid2obd);
482 * Get obd device from ::obd_devs[]
484 * \param num [in] array index
486 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
487 * otherwise return the obd device there.
489 struct obd_device *class_num2obd(int num)
491 struct obd_device *obd = NULL;
493 if (num < class_devno_max()) {
498 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
499 "%p obd_magic %08x != %08x\n",
500 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
501 LASSERTF(obd->obd_minor == num,
502 "%p obd_minor %0d != %0d\n",
503 obd, obd->obd_minor, num);
508 EXPORT_SYMBOL(class_num2obd);
511 * Get the obd device count. Devices in any state are counted.
513 * \retval obd device count
515 int get_devices_count(void)
517 int index, max_index = class_devno_max(), dev_count = 0;
519 read_lock(&obd_dev_lock);
520 for (index = 0; index <= max_index; index++) {
521 struct obd_device *obd = class_num2obd(index);
525 read_unlock(&obd_dev_lock);
529 EXPORT_SYMBOL(get_devices_count);
/* Print one console line per device in the table: index, status, type
 * name, device name, uuid and reference count.  Runs under obd_dev_lock
 * (read). */
531 void class_obd_list(void)
536 read_lock(&obd_dev_lock);
537 for (i = 0; i < class_devno_max(); i++) {
538 struct obd_device *obd = class_num2obd(i);
/* status depends on the stopping / set_up / attached flags, tested in
 * that order */
542 if (obd->obd_stopping)
544 else if (obd->obd_set_up)
546 else if (obd->obd_attached)
550 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
551 i, status, obd->obd_type->typ_name,
552 obd->obd_name, obd->obd_uuid.uuid,
553 atomic_read(&obd->obd_refcount));
555 read_unlock(&obd_dev_lock);
559 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
560 specified, then only the client with that uuid is returned,
561 otherwise any client connected to the tgt is returned. */
562 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
563 const char * typ_name,
564 struct obd_uuid *grp_uuid)
568 read_lock(&obd_dev_lock);
569 for (i = 0; i < class_devno_max(); i++) {
570 struct obd_device *obd = class_num2obd(i);
/* typ_name is matched as a prefix: only strlen(typ_name) bytes of the
 * device's type name are compared */
574 if ((strncmp(obd->obd_type->typ_name, typ_name,
575 strlen(typ_name)) == 0)) {
/* the target uuid must match; the device uuid is only compared when a
 * grp_uuid was supplied */
576 if (obd_uuid_equals(tgt_uuid,
577 &obd->u.cli.cl_target_uuid) &&
578 ((grp_uuid)? obd_uuid_equals(grp_uuid,
579 &obd->obd_uuid) : 1)) {
580 read_unlock(&obd_dev_lock);
585 read_unlock(&obd_dev_lock);
589 EXPORT_SYMBOL(class_find_client_obd);
591 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
592 searching at *next, and if a device is found, the next index to look
593 at is saved in *next. If next is NULL, then the first matching device
594 will always be returned. */
595 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
601 else if (*next >= 0 && *next < class_devno_max())
/* scan from the chosen start index to the end of the table under
 * obd_dev_lock (read) */
606 read_lock(&obd_dev_lock);
607 for (; i < class_devno_max(); i++) {
608 struct obd_device *obd = class_num2obd(i);
612 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
/* match: the lock is dropped on this path too */
615 read_unlock(&obd_dev_lock);
619 read_unlock(&obd_dev_lock);
623 EXPORT_SYMBOL(class_devices_in_group);
626 * Notify that the sptlrpc log for \a fsname has changed, and let every relevant OBD
627 * adjust sptlrpc settings accordingly.
/* \a namelen is the number of leading bytes of a device name that are
 * compared against \a fsname; it must be positive. */
629 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
631 struct obd_device *obd;
635 LASSERT(namelen > 0);
637 read_lock(&obd_dev_lock);
638 for (i = 0; i < class_devno_max(); i++) {
639 obd = class_num2obd(i);
641 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
644 /* only notify mdc, osc, mdt, ost */
645 type = obd->obd_type->typ_name;
646 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
647 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
648 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
649 strcmp(type, LUSTRE_OST_NAME) != 0)
652 if (strncmp(obd->obd_name, fsname, namelen))
/* the device is pinned with a reference and obd_dev_lock is released
 * around obd_set_info_async(); the lock is retaken afterwards */
655 class_incref(obd, __FUNCTION__, obd);
656 read_unlock(&obd_dev_lock);
657 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
658 sizeof(KEY_SPTLRPC_CONF),
659 KEY_SPTLRPC_CONF, 0, NULL, NULL);
661 class_decref(obd, __FUNCTION__, obd);
662 read_lock(&obd_dev_lock);
664 read_unlock(&obd_dev_lock);
667 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy the slab caches created by obd_init_caches() (obd_device,
 * obdo, import, capa). */
669 void obd_cleanup_caches(void)
672 if (obd_device_cachep) {
673 kmem_cache_destroy(obd_device_cachep);
674 obd_device_cachep = NULL;
677 kmem_cache_destroy(obdo_cachep);
681 kmem_cache_destroy(import_cachep);
682 import_cachep = NULL;
685 kmem_cache_destroy(capa_cachep);
/* Create the obd_device, obdo, import and capa slab caches.  Each cache
 * pointer is asserted to be NULL beforehand; a failed creation jumps to
 * 'out' with rc = -ENOMEM. */
691 int obd_init_caches(void)
696 LASSERT(obd_device_cachep == NULL);
697 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
698 sizeof(struct obd_device),
700 if (!obd_device_cachep)
701 GOTO(out, rc = -ENOMEM);
703 LASSERT(obdo_cachep == NULL);
704 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
707 GOTO(out, rc = -ENOMEM);
709 LASSERT(import_cachep == NULL);
710 import_cachep = kmem_cache_create("ll_import_cache",
711 sizeof(struct obd_import),
714 GOTO(out, rc = -ENOMEM);
716 LASSERT(capa_cachep == NULL);
717 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
720 GOTO(out, rc = -ENOMEM);
/* failure path: tear down whatever was created (NOTE(review):
 * presumably under the 'out' label, which is not visible here) */
724 obd_cleanup_caches();
728 /* map connection to client */
/* Resolve a connection handle to its export through the handle table.
 * A cookie of -1 is treated specially as a request for a new
 * connection. */
729 struct obd_export *class_conn2export(struct lustre_handle *conn)
731 struct obd_export *export;
735 CDEBUG(D_CACHE, "looking for null handle\n");
739 if (conn->cookie == -1) { /* this means assign a new connection */
740 CDEBUG(D_CACHE, "want a new connection\n");
744 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
745 export = class_handle2object(conn->cookie, NULL);
748 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd device.  NOTE(review): the body is not visible
 * in this extract; presumably it returns exp->exp_obd -- confirm. */
750 struct obd_device *class_exp2obd(struct obd_export *exp)
756 EXPORT_SYMBOL(class_exp2obd);
/* Resolve a connection handle to the obd device of its export.  The
 * export obtained from class_conn2export() is put again once exp_obd
 * has been read. */
758 struct obd_device *class_conn2obd(struct lustre_handle *conn)
760 struct obd_export *export;
761 export = class_conn2export(conn);
763 struct obd_device *obd = export->exp_obd;
764 class_export_put(export);
769 EXPORT_SYMBOL(class_conn2obd);
/* Return the client import (u.cli.cl_import) of the obd device behind
 * \a exp. */
771 struct obd_import *class_exp2cliimp(struct obd_export *exp)
773 struct obd_device *obd = exp->exp_obd;
776 return obd->u.cli.cl_import;
778 EXPORT_SYMBOL(class_exp2cliimp);
/* Return the client import (u.cli.cl_import) of the obd device that
 * class_conn2obd() finds for \a conn. */
780 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
782 struct obd_device *obd = class_conn2obd(conn);
785 return obd->u.cli.cl_import;
787 EXPORT_SYMBOL(class_conn2cliimp);
789 /* Export management functions */
/* Final teardown of an export whose refcount is zero: release its
 * connection if it has one, assert that its reply/replay/RPC lists are
 * empty, call obd_destroy_export(), drop the device reference taken for
 * the export and free the export through OBD_FREE_RCU(). */
790 static void class_export_destroy(struct obd_export *exp)
792 struct obd_device *obd = exp->exp_obd;
795 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
796 LASSERT(obd != NULL);
798 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
799 exp->exp_client_uuid.uuid, obd->obd_name);
801 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
802 if (exp->exp_connection)
803 ptlrpc_put_connection_superhack(exp->exp_connection);
805 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
806 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
807 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
808 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
809 obd_destroy_export(exp);
/* pairs with the class_incref(obd, "export", ...) in class_new_export() */
810 class_decref(obd, "export", exp);
812 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes an export reference. */
816 static void export_handle_addref(void *export)
818 class_export_get(export);
/* Handle operations passed to class_handle_hash() for every export. */
821 static struct portals_handle_ops export_handle_ops = {
822 .hop_addref = export_handle_addref,
/* Take one reference on \a exp and log the new count. */
826 struct obd_export *class_export_get(struct obd_export *exp)
828 atomic_inc(&exp->exp_refcount);
829 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
830 atomic_read(&exp->exp_refcount));
833 EXPORT_SYMBOL(class_export_get);
/* Drop one reference on \a exp.  On the final put the export is not
 * destroyed inline: its nid stats are cleaned up and it is handed to
 * obd_zombie_export_add(). */
835 void class_export_put(struct obd_export *exp)
837 LASSERT(exp != NULL);
838 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the count is logged before the decrement, hence the "- 1" */
839 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
840 atomic_read(&exp->exp_refcount) - 1);
842 if (atomic_dec_and_test(&exp->exp_refcount)) {
843 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
844 CDEBUG(D_IOCTL, "final put %p/%s\n",
845 exp, exp->exp_client_uuid.uuid);
847 /* release nid stat reference */
848 lprocfs_exp_cleanup(exp);
850 obd_zombie_export_add(exp);
853 EXPORT_SYMBOL(class_export_put);
855 /* Creates a new export, adds it to the hash table, and returns a
856 * pointer to it. The refcount is 2: one for the hash reference, and
857 * one for the pointer returned by this function. */
858 struct obd_export *class_new_export(struct obd_device *obd,
859 struct obd_uuid *cluuid)
861 struct obd_export *export;
862 cfs_hash_t *hash = NULL;
866 OBD_ALLOC_PTR(export);
868 return ERR_PTR(-ENOMEM);
/* initialize counters, lists and locks; the refcount starts at 2 */
870 export->exp_conn_cnt = 0;
871 export->exp_lock_hash = NULL;
872 export->exp_flock_hash = NULL;
873 atomic_set(&export->exp_refcount, 2);
874 atomic_set(&export->exp_rpc_count, 0);
875 atomic_set(&export->exp_cb_count, 0);
876 atomic_set(&export->exp_locks_count, 0);
877 #if LUSTRE_TRACKS_LOCK_EXP_REFS
878 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
879 spin_lock_init(&export->exp_locks_list_guard);
881 atomic_set(&export->exp_replay_count, 0);
882 export->exp_obd = obd;
883 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
884 spin_lock_init(&export->exp_uncommitted_replies_lock);
885 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
886 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
887 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
888 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
889 CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* publish the export in the handle table */
890 class_handle_hash(&export->exp_handle, &export_handle_ops);
891 export->exp_last_request_time = cfs_time_current_sec();
892 spin_lock_init(&export->exp_lock);
893 spin_lock_init(&export->exp_rpc_lock);
894 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
895 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
896 spin_lock_init(&export->exp_bl_list_lock);
897 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
899 export->exp_sp_peer = LUSTRE_SP_ANY;
900 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
901 export->exp_client_uuid = *cluuid;
902 obd_init_export(export);
/* under obd_dev_lock: refuse a stopping device and pin the uuid hash */
904 spin_lock(&obd->obd_dev_lock);
905 /* shouldn't happen, but might race */
906 if (obd->obd_stopping)
907 GOTO(exit_unlock, rc = -ENODEV);
909 hash = cfs_hash_getref(obd->obd_uuid_hash);
911 GOTO(exit_unlock, rc = -ENODEV);
912 spin_unlock(&obd->obd_dev_lock);
/* an export whose client uuid equals the device's own uuid is not added
 * to the uuid hash */
914 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
915 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
917 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
918 obd->obd_name, cluuid->uuid, rc);
919 GOTO(exit_err, rc = -EALREADY);
/* obd_stopping is checked again now that the lock is retaken, before
 * the export is linked onto the device lists */
923 spin_lock(&obd->obd_dev_lock);
924 if (obd->obd_stopping) {
925 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
926 GOTO(exit_unlock, rc = -ENODEV);
929 class_incref(obd, "export", export);
930 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
931 cfs_list_add_tail(&export->exp_obd_chain_timed,
932 &export->exp_obd->obd_exports_timed);
933 export->exp_obd->obd_num_exports++;
934 spin_unlock(&obd->obd_dev_lock);
935 cfs_hash_putref(hash);
/* cleanup: unlock, unpin the hash, unhash the handle and free the export
 * (NOTE(review): presumably under the exit_unlock/exit_err labels, which
 * are not visible in this extract) */
939 spin_unlock(&obd->obd_dev_lock);
942 cfs_hash_putref(hash);
943 class_handle_unhash(&export->exp_handle);
944 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
945 obd_destroy_export(export);
946 OBD_FREE_PTR(export);
949 EXPORT_SYMBOL(class_new_export);
/* Detach \a exp from its device: remove it from the handle table and the
 * uuid hash, move it to obd_unlinked_exports, take it off the timed
 * list, decrement obd_num_exports and drop one export reference. */
951 void class_unlink_export(struct obd_export *exp)
953 class_handle_unhash(&exp->exp_handle);
955 spin_lock(&exp->exp_obd->obd_dev_lock);
956 /* delete an uuid-export hashitem from hashtables */
957 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
958 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
959 &exp->exp_client_uuid,
960 &exp->exp_uuid_hash);
962 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
963 cfs_list_del_init(&exp->exp_obd_chain_timed);
964 exp->exp_obd->obd_num_exports--;
965 spin_unlock(&exp->exp_obd->obd_dev_lock);
966 class_export_put(exp);
968 EXPORT_SYMBOL(class_unlink_export);
970 /* Import management functions */
/* Final teardown of an import whose refcount is zero: release its
 * connection and every entry on imp_conn_list, drop the device reference
 * taken for the import and free it through OBD_FREE_RCU(). */
971 void class_import_destroy(struct obd_import *imp)
975 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
976 imp->imp_obd->obd_name);
978 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
980 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop and free the connection entries one at a time */
982 while (!cfs_list_empty(&imp->imp_conn_list)) {
983 struct obd_import_conn *imp_conn;
985 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
986 struct obd_import_conn, oic_item);
987 cfs_list_del_init(&imp_conn->oic_item);
988 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
989 OBD_FREE(imp_conn, sizeof(*imp_conn));
992 LASSERT(imp->imp_sec == NULL);
/* pairs with the class_incref(obd, "import", ...) in class_new_import() */
993 class_decref(imp->imp_obd, "import", imp);
994 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes an import reference. */
998 static void import_handle_addref(void *import)
1000 class_import_get(import);
/* Handle operations passed to class_handle_hash() for every import. */
1003 static struct portals_handle_ops import_handle_ops = {
1004 .hop_addref = import_handle_addref,
/* Take one reference on \a import and log the new count. */
1008 struct obd_import *class_import_get(struct obd_import *import)
1010 atomic_inc(&import->imp_refcount);
1011 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1012 atomic_read(&import->imp_refcount),
1013 import->imp_obd->obd_name);
1016 EXPORT_SYMBOL(class_import_get);
/* Drop one reference on \a imp.  On the final put the import is handed
 * to obd_zombie_import_add() rather than destroyed inline. */
1018 void class_import_put(struct obd_import *imp)
1022 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1023 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the count is logged before the decrement, hence the "- 1" */
1025 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1026 atomic_read(&imp->imp_refcount) - 1,
1027 imp->imp_obd->obd_name);
1029 if (atomic_dec_and_test(&imp->imp_refcount)) {
1030 CDEBUG(D_INFO, "final put import %p\n", imp);
1031 obd_zombie_import_add(imp);
1034 /* catch possible import put race */
1035 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1038 EXPORT_SYMBOL(class_import_put);
/* Initialize the adaptive-timeout state of an import: the net latency
 * estimate and one service estimate per portal (IMP_AT_MAX_PORTALS). */
1040 static void init_imp_at(struct imp_at *at) {
1042 at_init(&at->iat_net_latency, 0, 0);
1043 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1044 /* max service estimates are tracked on the server side, so
1045 don't use the AT history here, just use the last reported
1046 val. (But keep hist for proc histogram, worst_ever) */
1047 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Allocate and initialize an import for \a obd.  The import takes a
 * reference on the device, starts in LUSTRE_IMP_NEW with a refcount of
 * 2, and is published in the handle table. */
1052 struct obd_import *class_new_import(struct obd_device *obd)
1054 struct obd_import *imp;
1056 OBD_ALLOC(imp, sizeof(*imp));
1060 CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1061 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1062 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1063 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1064 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1065 CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
/* the replay cursor starts at the head of the (empty) committed list */
1066 imp->imp_replay_cursor = &imp->imp_committed_list;
1067 spin_lock_init(&imp->imp_lock);
1068 imp->imp_last_success_conn = 0;
1069 imp->imp_state = LUSTRE_IMP_NEW;
/* released by the class_decref(..., "import", ...) in
 * class_import_destroy() */
1070 imp->imp_obd = class_incref(obd, "import", imp);
1071 mutex_init(&imp->imp_sec_mutex);
1072 init_waitqueue_head(&imp->imp_recovery_waitq);
1074 atomic_set(&imp->imp_refcount, 2);
1075 atomic_set(&imp->imp_unregistering, 0);
1076 atomic_set(&imp->imp_inflight, 0);
1077 atomic_set(&imp->imp_replay_inflight, 0);
1078 atomic_set(&imp->imp_inval_count, 0);
1079 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1080 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1081 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1082 init_imp_at(&imp->imp_at);
1084 /* the default magic is V2, will be used in connect RPC, and
1085 * then adjusted according to the flags in request/reply. */
1086 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1090 EXPORT_SYMBOL(class_new_import);
/* Start tearing down \a import: remove it from the handle table, bump
 * imp_generation under imp_lock and drop one import reference. */
1092 void class_destroy_import(struct obd_import *import)
1094 LASSERT(import != NULL);
1095 LASSERT(import != LP_POISON);
1097 class_handle_unhash(&import->imp_handle);
1099 spin_lock(&import->imp_lock);
1100 import->imp_generation++;
1101 spin_unlock(&import->imp_lock);
1102 class_import_put(import);
1104 EXPORT_SYMBOL(class_destroy_import);
1106 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Record one more reference from \a lock to \a exp, under
 * exp_locks_list_guard.  On the first reference the lock is linked onto
 * exp_locks_list and \a exp becomes its l_exp_refs_target.  A lock that
 * already targets a different export triggers a console warning. */
1108 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1110 spin_lock(&exp->exp_locks_list_guard);
1112 LASSERT(lock->l_exp_refs_nr >= 0);
1114 if (lock->l_exp_refs_target != NULL &&
1115 lock->l_exp_refs_target != exp) {
1116 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1117 exp, lock, lock->l_exp_refs_target);
/* post-increment: the body runs only for the 0 -> 1 transition */
1119 if ((lock->l_exp_refs_nr ++) == 0) {
1120 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1121 lock->l_exp_refs_target = exp;
1123 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1124 lock, exp, lock->l_exp_refs_nr);
1125 spin_unlock(&exp->exp_locks_list_guard);
1127 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Drop one reference from \a lock to \a exp, under exp_locks_list_guard.
 * When the count reaches zero the lock is unlinked from the export's
 * list and its l_exp_refs_target is cleared.  A target that differs from
 * \a exp triggers a console warning. */
1129 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1131 spin_lock(&exp->exp_locks_list_guard);
1132 LASSERT(lock->l_exp_refs_nr > 0);
1133 if (lock->l_exp_refs_target != exp) {
1134 LCONSOLE_WARN("lock %p, "
1135 "mismatching export pointers: %p, %p\n",
1136 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the body runs only for the 1 -> 0 transition */
1138 if (-- lock->l_exp_refs_nr == 0) {
1139 cfs_list_del_init(&lock->l_exp_refs_link);
1140 lock->l_exp_refs_target = NULL;
1142 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1143 lock, exp, lock->l_exp_refs_nr);
1144 spin_unlock(&exp->exp_locks_list_guard);
1146 EXPORT_SYMBOL(__class_export_del_lock_ref);
1149 /* A connection defines an export context in which preallocation can
1150 be managed. This releases the export pointer reference, and returns
1151 the export handle, so the export refcount is 1 when this function returns. */
/* \a conn receives the handle cookie of the new export.  A failure of
 * class_new_export() is returned as its PTR_ERR() value. */
1153 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1154 struct obd_uuid *cluuid)
1156 struct obd_export *export;
1157 LASSERT(conn != NULL);
1158 LASSERT(obd != NULL);
1159 LASSERT(cluuid != NULL);
1162 export = class_new_export(obd, cluuid);
1164 RETURN(PTR_ERR(export));
/* hand out the cookie, then drop the pointer reference that
 * class_new_export() returned */
1166 conn->cookie = export->exp_handle.h_cookie;
1167 class_export_put(export);
1169 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1170 cluuid->uuid, conn->cookie);
1173 EXPORT_SYMBOL(class_connect);
1175 /* if export is involved in recovery then clean up related things */
/* The per-device recovery counters are adjusted under
 * obd_recovery_task_lock; the per-export flags are cleared under
 * exp_lock. */
1176 void class_export_recovery_cleanup(struct obd_export *exp)
1178 struct obd_device *obd = exp->exp_obd;
1180 spin_lock(&obd->obd_recovery_task_lock);
1181 if (exp->exp_delayed)
1182 obd->obd_delayed_clients--;
1183 if (obd->obd_recovering) {
1184 if (exp->exp_in_recovery) {
1185 spin_lock(&exp->exp_lock);
1186 exp->exp_in_recovery = 0;
1187 spin_unlock(&exp->exp_lock);
1188 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1189 atomic_dec(&obd->obd_connected_clients);
1192 /* if called during recovery then should update
1193 * obd_stale_clients counter,
1194 * lightweight exports are not counted */
1195 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1196 exp->exp_obd->obd_stale_clients++;
1198 spin_unlock(&obd->obd_recovery_task_lock);
1199 /** Cleanup req replay fields */
1200 if (exp->exp_req_replay_needed) {
1201 spin_lock(&exp->exp_lock);
1202 exp->exp_req_replay_needed = 0;
1203 spin_unlock(&exp->exp_lock);
1204 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1205 atomic_dec(&obd->obd_req_replay_clients);
1207 /** Cleanup lock replay data */
1208 if (exp->exp_lock_replay_needed) {
1209 spin_lock(&exp->exp_lock);
1210 exp->exp_lock_replay_needed = 0;
1211 spin_unlock(&exp->exp_lock);
1212 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1213 atomic_dec(&obd->obd_lock_replay_clients);
1217 /* This function removes 1-3 references from the export:
1218 * 1 - for the export pointer passed in
1219 * and, if a disconnect is really needed:
1220 * 2 - for removal from the hash
1221 * 3 - in class_unlink_export()
1222 * The export pointer passed to this function can be destroyed */
/* exp_disconnected is tested and set under exp_lock, so only the first
 * caller performs the unlink; later callers skip to the final put. */
1223 int class_disconnect(struct obd_export *export)
1225 int already_disconnected;
1228 if (export == NULL) {
1229 CWARN("attempting to free NULL export %p\n", export);
1233 spin_lock(&export->exp_lock);
1234 already_disconnected = export->exp_disconnected;
1235 export->exp_disconnected = 1;
1236 spin_unlock(&export->exp_lock);
1238 /* class_cleanup(), abort_recovery(), and class_fail_export()
1239 * all end up in here, and if any of them race we shouldn't
1240 * call extra class_export_puts(). */
1241 if (already_disconnected) {
1242 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1243 GOTO(no_disconn, already_disconnected);
1246 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1247 export->exp_handle.h_cookie);
/* remove the export from the nid hash if it is still hashed there */
1249 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1250 cfs_hash_del(export->exp_obd->obd_nid_hash,
1251 &export->exp_connection->c_peer.nid,
1252 &export->exp_nid_hash);
1254 class_export_recovery_cleanup(export);
1255 class_unlink_export(export);
/* drop the caller's reference (NOTE(review): presumably at the
 * no_disconn label, which is not visible in this extract) */
1257 class_export_put(export);
1260 EXPORT_SYMBOL(class_disconnect);
1262 /* Return non-zero for a fully connected export */
/* "Fully connected" means exp_conn_cnt > 0 and exp_failed not set, both
 * sampled under exp_lock. */
1263 int class_connected_export(struct obd_export *exp)
1268 spin_lock(&exp->exp_lock);
1269 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1270 spin_unlock(&exp->exp_lock);
1274 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export on \a list.  Each export gets \a flags stored
 * in exp_flags first.  An export whose client uuid equals its device's
 * uuid is only removed from the list, not disconnected. */
1276 static void class_disconnect_export_list(cfs_list_t *list,
1277 enum obd_option flags)
1280 struct obd_export *exp;
1283 /* It's possible that an export may disconnect itself, but
1284 * nothing else will be added to this list. */
1285 while (!cfs_list_empty(list)) {
1286 exp = cfs_list_entry(list->next, struct obd_export,
1288 /* needed so that CDEBUG can safely use exp after obd_disconnect */
1289 class_export_get(exp);
1291 spin_lock(&exp->exp_lock);
1292 exp->exp_flags = flags;
1293 spin_unlock(&exp->exp_lock);
1295 if (obd_uuid_equals(&exp->exp_client_uuid,
1296 &exp->exp_obd->obd_uuid)) {
1298 "exp %p export uuid == obd uuid, don't discon\n",
1300 /* Need to delete this now so we don't end up pointing
1301 * to work_list later when this export is cleaned up. */
1302 cfs_list_del_init(&exp->exp_obd_chain);
1303 class_export_put(exp);
1307 class_export_get(exp);
1308 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1309 "last request at "CFS_TIME_T"\n",
1310 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1311 exp, exp->exp_last_request_time);
1312 /* release one export reference anyway */
1313 rc = obd_disconnect(exp);
1315 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1316 obd_export_nid2str(exp), exp, rc);
1317 class_export_put(exp);
/*
 * class_disconnect_exports() - disconnect all exports of @obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private
 * work list while obd_dev_lock is held.  The work list is then handed to
 * class_disconnect_export_list() after the lock has been released, with
 * the flags computed by exp_flags_from_obd(obd).  Whether or not any
 * exports were found is reported at D_HA debug level.
 */
1322 void class_disconnect_exports(struct obd_device *obd)
1324 cfs_list_t work_list;
1327 /* Move all of the exports from obd_exports to a work list, en masse. */
1328 CFS_INIT_LIST_HEAD(&work_list);
1329 spin_lock(&obd->obd_dev_lock);
1330 cfs_list_splice_init(&obd->obd_exports, &work_list);
1331 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1332 spin_unlock(&obd->obd_dev_lock);
1334 if (!cfs_list_empty(&work_list)) {
1335 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1336 "disconnecting them\n", obd->obd_minor, obd);
1337 class_disconnect_export_list(&work_list,
1338 exp_flags_from_obd(obd));
1340 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1341 obd->obd_minor, obd);
1344 EXPORT_SYMBOL(class_disconnect_exports);
/*
 * class_disconnect_stale_exports() - fail and disconnect selected exports
 * of a device.
 *
 * @obd:         device whose obd_exports list is walked under
 *               obd_dev_lock.
 * @test_export: predicate invoked with the export's exp_lock held.
 *
 * For every export that reaches the exp_failed assignment, exp_failed is
 * set to 1 and the export is moved onto a private work list.  After
 * obd_dev_lock is dropped the work list is passed to
 * class_disconnect_export_list() with exp_flags_from_obd(obd) combined
 * with OBD_OPT_ABORT_RECOV.
 *
 * NOTE(review): the statements that leave the current iteration (for the
 * self-export, for ted_lr_idx == -1, and for exports that are already
 * failed or for which test_export() returns non-zero) are not visible
 * here; presumably each is a 'continue', which would mean a non-zero
 * test_export() result keeps the export -- confirm.  The declaration and
 * update of 'evicted' are likewise not visible.
 */
1346 /* Remove exports that have not completed recovery.
1348 void class_disconnect_stale_exports(struct obd_device *obd,
1349 int (*test_export)(struct obd_export *))
1351 cfs_list_t work_list;
1352 struct obd_export *exp, *n;
1356 CFS_INIT_LIST_HEAD(&work_list);
1357 spin_lock(&obd->obd_dev_lock);
1358 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1360 /* don't count self-export as client */
1361 if (obd_uuid_equals(&exp->exp_client_uuid,
1362 &exp->exp_obd->obd_uuid))
1365 /* don't evict clients which have no slot in last_rcvd
1366 * (e.g. lightweight connection) */
1367 if (exp->exp_target_data.ted_lr_idx == -1)
/* exp_failed is both tested and set while exp_lock is held. */
1370 spin_lock(&exp->exp_lock);
1371 if (exp->exp_failed || test_export(exp)) {
1372 spin_unlock(&exp->exp_lock);
1375 exp->exp_failed = 1;
1376 spin_unlock(&exp->exp_lock);
/* Only queue the export here; the disconnect itself is issued below,
 * once obd_dev_lock has been released. */
1378 cfs_list_move(&exp->exp_obd_chain, &work_list);
1380 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1381 obd->obd_name, exp->exp_client_uuid.uuid,
1382 exp->exp_connection == NULL ? "<unknown>" :
1383 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1384 print_export_data(exp, "EVICTING", 0);
1386 spin_unlock(&obd->obd_dev_lock);
1389 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1390 obd->obd_name, evicted);
1392 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1393 OBD_OPT_ABORT_RECOV);
1396 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * class_fail_export() - mark @exp as failed and disconnect it.
 *
 * exp_failed is tested and set under exp_lock.  A caller that finds it
 * already set only emits the "skipping" debug message.
 * NOTE(review): the early return for that case is not visible here --
 * confirm.
 *
 * Otherwise the debug log is dumped when obd_dump_on_timeout is set, two
 * references are taken on the export, obd_disconnect() is called, its
 * outcome is logged, and one reference is dropped again.  Nothing is
 * returned to the caller.
 */
1398 void class_fail_export(struct obd_export *exp)
1400 int rc, already_failed;
1402 spin_lock(&exp->exp_lock);
1403 already_failed = exp->exp_failed;
1404 exp->exp_failed = 1;
1405 spin_unlock(&exp->exp_lock);
1407 if (already_failed) {
1408 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1409 exp, exp->exp_client_uuid.uuid);
1413 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1414 exp, exp->exp_client_uuid.uuid);
1416 if (obd_dump_on_timeout)
1417 libcfs_debug_dumplog();
1419 /* need for safe call CDEBUG after obd_disconnect */
1420 class_export_get(exp);
1422 /* Most callers into obd_disconnect are removing their own reference
1423 * (request, for example) in addition to the one from the hash table.
1424 * We don't have such a reference here, so make one. */
1425 class_export_get(exp);
1426 rc = obd_disconnect(exp);
/* NOTE(review): the condition selecting between the CERROR and the CDEBUG
 * below is not visible here; presumably it tests rc -- confirm. */
1428 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1430 CDEBUG(D_HA, "disconnected export %p/%s\n",
1431 exp, exp->exp_client_uuid.uuid);
1432 class_export_put(exp);
1434 EXPORT_SYMBOL(class_fail_export);
/*
 * obd_export_nid2str() - printable peer NID of an export.
 *
 * When @exp has a connection, returns the string produced by
 * libcfs_nid2str() for that connection's peer NID.
 * NOTE(review): the value returned when exp_connection is NULL is not
 * visible here -- confirm.
 */
1436 char *obd_export_nid2str(struct obd_export *exp)
1438 if (exp->exp_connection != NULL)
1439 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1443 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * obd_export_evict_by_nid() - fail the export of @obd found under a NID.
 *
 * @obd: device whose obd_nid_hash is searched.
 * @nid: NID in string form; converted with libcfs_str2nid() to build the
 *       hash key.
 *
 * Returns exports_evicted.  If obd_stopping is set the function returns
 * immediately with 0.  Otherwise a reference on the NID hash is taken
 * under obd_dev_lock and dropped with cfs_hash_putref() before returning.
 * Each export found is asserted to match the requested NID and not to be
 * the self-export, then passed to class_fail_export() and released with
 * class_export_put().
 *
 * NOTE(review): the enclosing loop, the exit taken when the lookup
 * returns NULL, and the increment of exports_evicted are not visible
 * here -- confirm.  The class_export_put() presumably balances a
 * reference returned by cfs_hash_lookup().
 */
1445 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1447 cfs_hash_t *nid_hash;
1448 struct obd_export *doomed_exp = NULL;
1449 int exports_evicted = 0;
1451 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1453 spin_lock(&obd->obd_dev_lock);
1454 /* umount has run already, so evict thread should leave
1455 * its task to umount thread now */
1456 if (obd->obd_stopping) {
1457 spin_unlock(&obd->obd_dev_lock);
1458 return exports_evicted;
/* Pin the hash while still holding obd_dev_lock; it is used below after
 * the lock has been released. */
1460 nid_hash = obd->obd_nid_hash;
1461 cfs_hash_getref(nid_hash);
1462 spin_unlock(&obd->obd_dev_lock);
1465 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1466 if (doomed_exp == NULL)
1469 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1470 "nid %s found, wanted nid %s, requested nid %s\n",
1471 obd_export_nid2str(doomed_exp),
1472 libcfs_nid2str(nid_key), nid);
1473 LASSERTF(doomed_exp != obd->obd_self_export,
1474 "self-export is hashed by NID?\n");
1476 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1477 "request\n", obd->obd_name,
1478 obd_uuid2str(&doomed_exp->exp_client_uuid),
1479 obd_export_nid2str(doomed_exp));
1480 class_fail_export(doomed_exp);
1481 class_export_put(doomed_exp);
1484 cfs_hash_putref(nid_hash);
1486 if (!exports_evicted)
1487 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1488 obd->obd_name, nid);
1489 return exports_evicted;
1491 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * obd_export_evict_by_uuid() - fail the export of @obd with a given
 * client UUID.
 *
 * @obd:  device whose obd_uuid_hash is searched.
 * @uuid: client UUID in string form; converted with obd_str2uuid().
 *
 * Returns exports_evicted.  The function returns 0 without evicting
 * anything when obd_stopping is set or when @uuid equals the device's own
 * UUID ("can't evict myself").  A reference on the UUID hash is taken
 * under obd_dev_lock and dropped on every path that follows.
 *
 * NOTE(review): the 'else' separating the not-found CERROR from the
 * eviction path and the increment of exports_evicted are not visible
 * here -- confirm.  The class_export_put() presumably balances a
 * reference returned by cfs_hash_lookup().
 */
1493 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1495 cfs_hash_t *uuid_hash;
1496 struct obd_export *doomed_exp = NULL;
1497 struct obd_uuid doomed_uuid;
1498 int exports_evicted = 0;
1500 spin_lock(&obd->obd_dev_lock);
1501 if (obd->obd_stopping) {
1502 spin_unlock(&obd->obd_dev_lock);
1503 return exports_evicted;
/* Pin the hash while still holding obd_dev_lock; it is used below after
 * the lock has been released. */
1505 uuid_hash = obd->obd_uuid_hash;
1506 cfs_hash_getref(uuid_hash);
1507 spin_unlock(&obd->obd_dev_lock);
1509 obd_str2uuid(&doomed_uuid, uuid);
1510 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1511 CERROR("%s: can't evict myself\n", obd->obd_name);
1512 cfs_hash_putref(uuid_hash);
1513 return exports_evicted;
1516 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1518 if (doomed_exp == NULL) {
1519 CERROR("%s: can't disconnect %s: no exports found\n",
1520 obd->obd_name, uuid);
1522 CWARN("%s: evicting %s at adminstrative request\n",
1523 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1524 class_fail_export(doomed_exp);
1525 class_export_put(doomed_exp);
1528 cfs_hash_putref(uuid_hash);
1530 return exports_evicted;
1532 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Optional per-export dump callback, defined only when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is non-zero.  It starts out NULL and is
 * exported; print_export_data() invokes it for an export when it is
 * non-NULL and the caller passed a non-zero 'locks' argument.
 */
1534 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1535 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1536 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * print_export_data() - emit one D_HA debug line describing an export.
 *
 * @exp:    export to describe.
 * @status: caller-supplied tag printed as the second field.
 *
 * The exp_outstanding_replies list is walked under exp_lock, after which
 * the line is printed with: device name, status, export pointer, client
 * UUID, peer NID, reference count, RPC/callback/lock counters, the
 * disconnected/delayed/failed flags, nreplies, first_reply, a "..."
 * marker when nreplies > 3, and exp_last_committed.
 *
 * When LUSTRE_TRACKS_LOCK_EXP_REFS is enabled, 'locks' is non-zero and
 * class_export_dump_hook is set, the hook is called for the export.
 *
 * NOTE(review): the trailing parameter of the signature, the declaration
 * of nreplies and the body of the reply-list loop are not visible here --
 * confirm how nreplies and first_reply are computed.
 */
1539 static void print_export_data(struct obd_export *exp, const char *status,
1542 struct ptlrpc_reply_state *rs;
1543 struct ptlrpc_reply_state *first_reply = NULL;
1546 spin_lock(&exp->exp_lock);
1547 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1553 spin_unlock(&exp->exp_lock);
1555 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1556 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1557 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1558 atomic_read(&exp->exp_rpc_count),
1559 atomic_read(&exp->exp_cb_count),
1560 atomic_read(&exp->exp_locks_count),
1561 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1562 nreplies, first_reply, nreplies > 3 ? "..." : "",
1563 exp->exp_last_committed);
1564 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1565 if (locks && class_export_dump_hook != NULL)
1566 class_export_dump_hook(exp);
/*
 * dump_exports() - log every export associated with a device.
 *
 * @obd:   device whose export lists are dumped.
 * @locks: forwarded unchanged to print_export_data().
 *
 * Under obd_dev_lock, the obd_exports, obd_unlinked_exports and
 * obd_delayed_exports lists are printed with the tags "ACTIVE",
 * "UNLINKED" and "DELAYED".  Then, under obd_zombie_impexp_lock, every
 * entry of the global obd_zombie_exports list is printed as "ZOMBIE";
 * that last pass is not filtered by @obd.
 */
1570 void dump_exports(struct obd_device *obd, int locks)
1572 struct obd_export *exp;
1574 spin_lock(&obd->obd_dev_lock);
1575 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1576 print_export_data(exp, "ACTIVE", locks);
1577 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1578 print_export_data(exp, "UNLINKED", locks);
1579 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1580 print_export_data(exp, "DELAYED", locks);
1581 spin_unlock(&obd->obd_dev_lock);
1582 spin_lock(&obd_zombie_impexp_lock);
1583 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1584 print_export_data(exp, "ZOMBIE", locks);
1585 spin_unlock(&obd_zombie_impexp_lock);
1587 EXPORT_SYMBOL(dump_exports);
/*
 * obd_exports_barrier() - wait until @obd has no unlinked exports left.
 *
 * The caller must already have emptied obd_exports (asserted).  While
 * obd_unlinked_exports is non-empty the function drops obd_dev_lock,
 * sleeps uninterruptibly for 'waited' seconds, and re-takes the lock
 * before testing the list again.  Once 'waited' exceeds 5 and satisfies
 * IS_PO2(), a console warning is printed and all exports are dumped with
 * lock information.
 *
 * NOTE(review): the declaration and update of 'waited' are not visible
 * here, and IS_PO2() is presumably a power-of-two test -- confirm.
 */
1589 void obd_exports_barrier(struct obd_device *obd)
1592 LASSERT(cfs_list_empty(&obd->obd_exports));
1593 spin_lock(&obd->obd_dev_lock);
1594 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* Sleep without holding the spinlock. */
1595 spin_unlock(&obd->obd_dev_lock);
1596 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1597 cfs_time_seconds(waited));
1598 if (waited > 5 && IS_PO2(waited)) {
1599 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1600 "more than %d seconds. "
1601 "The obd refcount = %d. Is it stuck?\n",
1602 obd->obd_name, waited,
1603 atomic_read(&obd->obd_refcount));
1604 dump_exports(obd, 1);
1607 spin_lock(&obd->obd_dev_lock);
1609 spin_unlock(&obd->obd_dev_lock);
1611 EXPORT_SYMBOL(obd_exports_barrier);
1613 /* Total amount of zombies to be destroyed */
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1614 static int zombies_count = 0;
/*
 * obd_zombie_impexp_cull() - destroy queued zombie imports and exports.
 *
 * Each pass unlinks at most one entry from the head of
 * obd_zombie_imports and one from the head of obd_zombie_exports while
 * holding obd_zombie_impexp_lock.  The unlinked objects are destroyed
 * with class_import_destroy() and class_export_destroy() only after the
 * lock has been released.  Passes repeat until one finds neither an
 * import nor an export.
 *
 * NOTE(review): the per-pass reset of 'import' and 'export' and the
 * statement executed while the lock is briefly re-taken after each
 * destroy are not visible here; the latter presumably updates
 * zombies_count -- confirm.
 */
1617 * kill zombie imports and exports
1619 void obd_zombie_impexp_cull(void)
1621 struct obd_import *import;
1622 struct obd_export *export;
1626 spin_lock(&obd_zombie_impexp_lock);
1629 if (!cfs_list_empty(&obd_zombie_imports)) {
1630 import = cfs_list_entry(obd_zombie_imports.next,
1633 cfs_list_del_init(&import->imp_zombie_chain);
1637 if (!cfs_list_empty(&obd_zombie_exports)) {
1638 export = cfs_list_entry(obd_zombie_exports.next,
1641 cfs_list_del_init(&export->exp_obd_chain);
1644 spin_unlock(&obd_zombie_impexp_lock);
/* From here on the objects are private to this function, so they are
 * destroyed without obd_zombie_impexp_lock held. */
1646 if (import != NULL) {
1647 class_import_destroy(import);
1648 spin_lock(&obd_zombie_impexp_lock);
1650 spin_unlock(&obd_zombie_impexp_lock);
1653 if (export != NULL) {
1654 class_export_destroy(export);
1655 spin_lock(&obd_zombie_impexp_lock);
1657 spin_unlock(&obd_zombie_impexp_lock);
1661 } while (import != NULL || export != NULL);
/*
 * State shared between the zombie destroy thread and its controllers.
 *
 * obd_zombie_start - completed by obd_zombie_impexp_thread() when it
 *                    starts; obd_zombie_impexp_init() waits on it.
 * obd_zombie_stop  - completed by the thread when it exits;
 *                    obd_zombie_impexp_stop() waits on it.
 * obd_zombie_flags - bit field accessed with set_bit()/test_bit().
 * obd_zombie_waitq - wait queue used both by the thread (waiting for
 *                    work) and by obd_zombie_barrier() (waiting for idle).
 * obd_zombie_pid   - pid recorded by the thread, compared against
 *                    current_pid() in obd_zombie_barrier().
 *
 * NOTE(review): OBD_ZOMBIE_STOP is passed to set_bit()/test_bit() as a
 * bit number, so the value 0x0001 selects bit 1 rather than acting as a
 * mask -- harmless while it is the only flag, but confirm the intent.
 */
1665 static struct completion obd_zombie_start;
1666 static struct completion obd_zombie_stop;
1667 static unsigned long obd_zombie_flags;
1668 static wait_queue_head_t obd_zombie_waitq;
1669 static pid_t obd_zombie_pid;
1672 OBD_ZOMBIE_STOP = 0x0001,
/*
 * obd_zombie_impexp_check() - test whether the zombie thread may keep
 * sleeping.
 *
 * Under obd_zombie_impexp_lock, rc is set to non-zero only when
 * zombies_count is 0 and OBD_ZOMBIE_STOP is not set, i.e. when there is
 * nothing to do.  The thread's wait condition is the negation of this
 * result.  @arg is not used by the visible code.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible here; presumably rc is returned -- confirm.
 */
1676 * check for work for kill zombie import/export thread.
1678 static int obd_zombie_impexp_check(void *arg)
1682 spin_lock(&obd_zombie_impexp_lock);
1683 rc = (zombies_count == 0) &&
1684 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1685 spin_unlock(&obd_zombie_impexp_lock);
/*
 * obd_zombie_export_add() - queue an export for destruction.
 *
 * The export must currently be linked through exp_obd_chain (asserted);
 * it is unlinked under its device's obd_dev_lock, then added to
 * obd_zombie_exports under obd_zombie_impexp_lock, and finally the
 * waiters on the zombie wait queue are notified.
 *
 * NOTE(review): one statement executed under obd_zombie_impexp_lock
 * before the list insertion is not visible here; presumably it updates
 * zombies_count -- confirm.
 */
1691 * Add export to the obd_zombe thread and notify it.
1693 static void obd_zombie_export_add(struct obd_export *exp) {
1694 spin_lock(&exp->exp_obd->obd_dev_lock);
1695 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1696 cfs_list_del_init(&exp->exp_obd_chain);
1697 spin_unlock(&exp->exp_obd->obd_dev_lock);
1698 spin_lock(&obd_zombie_impexp_lock);
1700 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1701 spin_unlock(&obd_zombie_impexp_lock);
1703 obd_zombie_impexp_notify();
/*
 * obd_zombie_import_add() - queue an import for destruction.
 *
 * The import must already have imp_sec and imp_rq_pool cleared, and must
 * not be linked through imp_zombie_chain yet (all asserted).  It is added
 * to obd_zombie_imports under obd_zombie_impexp_lock, after which the
 * waiters on the zombie wait queue are notified.
 *
 * NOTE(review): one statement executed under the lock before the list
 * insertion is not visible here; presumably it updates zombies_count --
 * confirm.
 */
1707 * Add import to the obd_zombe thread and notify it.
1709 static void obd_zombie_import_add(struct obd_import *imp) {
1710 LASSERT(imp->imp_sec == NULL);
1711 LASSERT(imp->imp_rq_pool == NULL);
1712 spin_lock(&obd_zombie_impexp_lock);
1713 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1715 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1716 spin_unlock(&obd_zombie_impexp_lock);
1718 obd_zombie_impexp_notify();
/*
 * obd_zombie_impexp_notify() - wake every waiter on obd_zombie_waitq.
 *
 * wake_up_all() is used because the destroy thread and callers of
 * obd_zombie_barrier() sleep on the same wait queue; the text below
 * explains the hang that waking a single waiter could cause.
 */
1722 * notify import/export destroy thread about new zombie.
1724 static void obd_zombie_impexp_notify(void)
1727 * Make sure obd_zomebie_impexp_thread get this notification.
1728 * It is possible this signal only get by obd_zombie_barrier, and
1729 * barrier gulps this notification and sleeps away and hangs ensues
1731 wake_up_all(&obd_zombie_waitq);
/*
 * obd_zombie_is_idle() - wait condition used by obd_zombie_barrier().
 *
 * Asserts that OBD_ZOMBIE_STOP is not set, then samples
 * (zombies_count == 0) under obd_zombie_impexp_lock.
 * NOTE(review): the declaration of rc and the return statement are not
 * visible here; presumably rc is returned -- confirm.
 */
1735 * check whether obd_zombie is idle
1737 static int obd_zombie_is_idle(void)
1741 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1742 spin_lock(&obd_zombie_impexp_lock);
1743 rc = (zombies_count == 0);
1744 spin_unlock(&obd_zombie_impexp_lock);
/*
 * obd_zombie_barrier() - block until the zombie queues have drained.
 *
 * Sleeps on obd_zombie_waitq until obd_zombie_is_idle() holds, using a
 * zero-initialised l_wait_info.  When called from the destroy thread
 * itself (obd_zombie_pid matches current_pid()) the wait is skipped.
 * NOTE(review): the statement that skips the wait is not visible here;
 * presumably an early return -- confirm.
 */
1749 * wait when obd_zombie import/export queues become empty
1751 void obd_zombie_barrier(void)
1753 struct l_wait_info lwi = { 0 };
1755 if (obd_zombie_pid == current_pid())
1756 /* don't wait for myself */
1758 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1760 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * obd_zombie_impexp_thread() - main function of the zombie destroy thread.
 *
 * After signalling obd_zombie_start and recording its pid, the thread
 * loops until OBD_ZOMBIE_STOP is set: it sleeps until
 * obd_zombie_impexp_check() reports work (or a stop request), culls the
 * zombie lists, and then wakes the wait queue so that
 * obd_zombie_barrier() callers can re-check for idleness.  On exit it
 * signals obd_zombie_stop.  @unused is ignored.
 *
 * Note that obd_zombie_pid is written after complete(&obd_zombie_start),
 * so a caller released by that completion may briefly observe the old
 * pid value.
 */
1765 * destroy zombie export/import thread.
1767 static int obd_zombie_impexp_thread(void *unused)
1769 unshare_fs_struct();
1770 complete(&obd_zombie_start);
1772 obd_zombie_pid = current_pid();
1774 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1775 struct l_wait_info lwi = { 0 };
1777 l_wait_event(obd_zombie_waitq,
1778 !obd_zombie_impexp_check(NULL), &lwi);
1779 obd_zombie_impexp_cull();
1782 * Notify obd_zombie_barrier callers that queues
1785 wake_up(&obd_zombie_waitq);
1788 complete(&obd_zombie_stop);
1793 #else /* ! KERNEL */
/*
 * State for the build variant selected by the preceding #else.
 *
 * zombie_recur               - entry counter used by
 *                              obd_zombie_impexp_kill() so that only the
 *                              outermost entrant performs the cull.
 * obd_zombie_impexp_work_cb,
 * obd_zombie_impexp_idle_cb  - handles returned by the liblustre
 *                              registration calls in
 *                              obd_zombie_impexp_init() and handed back
 *                              to the deregistration calls in
 *                              obd_zombie_impexp_stop().
 */
1795 static atomic_t zombie_recur = ATOMIC_INIT(0);
1796 static void *obd_zombie_impexp_work_cb;
1797 static void *obd_zombie_impexp_idle_cb;
/*
 * obd_zombie_impexp_kill() - callback that culls zombies, guarded against
 * re-entry.
 *
 * zombie_recur is incremented on entry and decremented on exit; the cull
 * runs only when the increment yields 1, i.e. when no other invocation
 * is already in progress.  @arg is not used by the visible code.
 * NOTE(review): the return value is not visible here -- confirm.
 */
1799 int obd_zombie_impexp_kill(void *arg)
1803 if (atomic_inc_return(&zombie_recur) == 1) {
1804 obd_zombie_impexp_cull();
1807 atomic_dec(&zombie_recur);
/*
 * obd_zombie_impexp_init() - set up zombie import/export destruction.
 *
 * Initialises the two zombie lists, obd_zombie_impexp_lock, the
 * start/stop completions and the wait queue.  One variant then starts the
 * "obd_zombid" kernel thread, returning PTR_ERR(task) on failure and
 * otherwise waiting for the thread to signal obd_zombie_start.  The other
 * variant registers obd_zombie_impexp_kill() as a liblustre wait callback
 * and obd_zombie_impexp_check() as an idle callback, keeping the returned
 * handles.
 *
 * NOTE(review): the preprocessor conditionals selecting between the two
 * variants, the IS_ERR-style test guarding the error return, and the
 * final return statement are not visible here -- confirm.
 */
1814 * start destroy zombie import/export thread
1816 int obd_zombie_impexp_init(void)
1819 struct task_struct *task;
1822 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1823 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1824 spin_lock_init(&obd_zombie_impexp_lock);
1825 init_completion(&obd_zombie_start);
1826 init_completion(&obd_zombie_stop);
1827 init_waitqueue_head(&obd_zombie_waitq);
1831 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1833 RETURN(PTR_ERR(task));
/* Do not return before the new thread has signalled that it is running. */
1835 wait_for_completion(&obd_zombie_start);
1838 obd_zombie_impexp_work_cb =
1839 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1840 &obd_zombie_impexp_kill, NULL);
1842 obd_zombie_impexp_idle_cb =
1843 liblustre_register_idle_callback("obd_zombi_impexp_check",
1844 &obd_zombie_impexp_check, NULL);
/*
 * obd_zombie_impexp_stop() - shut down zombie import/export destruction.
 *
 * Sets OBD_ZOMBIE_STOP and wakes all waiters on the zombie wait queue.
 * One variant then waits for the destroy thread to signal
 * obd_zombie_stop; the other deregisters the liblustre wait and idle
 * callbacks that were registered at init time.
 *
 * NOTE(review): the preprocessor conditionals selecting between the two
 * variants are not visible here -- confirm which statements belong to
 * which build.
 */
1849 * stop destroy zombie import/export thread
1851 void obd_zombie_impexp_stop(void)
1853 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1854 obd_zombie_impexp_notify();
1856 wait_for_completion(&obd_zombie_stop);
1858 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1859 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1863 /***** Kernel-userspace comm helpers *******/
1865 /* Get length of entire message, including header */
/*
 * @payload_len: size of the payload in bytes.
 * Returns sizeof(struct kuc_hdr) + payload_len, converted to int.  No
 * range or overflow check is applied to @payload_len.
 */
1866 int kuc_len(int payload_len)
1868 return sizeof(struct kuc_hdr) + payload_len;
1870 EXPORT_SYMBOL(kuc_len);
/*
 * kuc_ptr() - locate the kuc header that precedes a payload.
 *
 * The header is taken to be the struct kuc_hdr immediately before @p, and
 * its kuc_magic field is asserted to equal KUC_MAGIC, so @p must point at
 * a payload laid out the way kuc_alloc() produces it.
 * NOTE(review): the return statement is not visible here; presumably the
 * computed header pointer is returned -- confirm.
 */
1872 /* Get a pointer to kuc header, given a ptr to the payload
1873 * @param p Pointer to payload area
1874 * @returns Pointer to kuc header
1876 struct kuc_hdr * kuc_ptr(void *p)
1878 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1879 LASSERT(lh->kuc_magic == KUC_MAGIC);
1882 EXPORT_SYMBOL(kuc_ptr);
// kuc_ispayload() - check whether the struct kuc_hdr slot immediately
// before @p carries KUC_MAGIC.
// Unlike kuc_ptr(), the magic is tested with a plain comparison instead
// of an assertion, so a mismatch is reported rather than fatal.
// NOTE(review): the return statements are not visible here.  The test
// also reads the memory just before @p, so @p presumably must not be the
// very start of an unrelated buffer -- confirm against callers.
1884 /* Test if payload is part of kuc message
1885 * @param p Pointer to payload area
1888 int kuc_ispayload(void *p)
1890 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1892 if (kh->kuc_magic == KUC_MAGIC)
1897 EXPORT_SYMBOL(kuc_ispayload);
// kuc_alloc() - allocate a kuc message and initialise its header.
// @payload_len: payload size in bytes; the total size is kuc_len(payload_len).
// @transport:   stored in the header's kuc_transport field.
// @type:        stored in the header's kuc_msgtype field.
// The header also records KUC_MAGIC and the total length (header plus
// payload) in kuc_msglen.  On success the returned pointer addresses the
// byte just past the header; on allocation failure ERR_PTR(-ENOMEM) is
// returned, so callers must test the result with IS_ERR-style checks
// rather than against NULL.
// NOTE(review): the declaration of lh, the allocation call and the
// failure test are not visible here -- confirm.
1899 /* Alloc space for a message, and fill in header
1900 * @return Pointer to payload area
1902 void *kuc_alloc(int payload_len, int transport, int type)
1905 int len = kuc_len(payload_len);
1909 return ERR_PTR(-ENOMEM);
1911 lh->kuc_magic = KUC_MAGIC;
1912 lh->kuc_transport = transport;
1913 lh->kuc_msgtype = type;
1914 lh->kuc_msglen = len;
1916 return (void *)(lh + 1);
1918 EXPORT_SYMBOL(kuc_alloc);
1920 /* Takes pointer to payload area */
/*
 * @p:           payload pointer; kuc_ptr() is used to step back to the
 *               header and asserts the header magic.
 * @payload_len: payload size; the size handed to OBD_FREE is
 *               kuc_len(payload_len), so this presumably has to be the
 *               same value that was given to kuc_alloc() -- confirm.
 */
1921 inline void kuc_free(void *p, int payload_len)
1923 struct kuc_hdr *lh = kuc_ptr(p);
1924 OBD_FREE(lh, kuc_len(payload_len));
1926 EXPORT_SYMBOL(kuc_free);