4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
45 #include <lustre_kernelcomm.h>
/* Taken around every walk or modification of the obd_types list in this
 * file (class_search_type(), class_register_type(),
 * class_unregister_type()). */
47 spinlock_t obd_types_lock;
/* Slab caches created in obd_init_caches() and torn down in
 * obd_cleanup_caches(); obdo_cachep is exported for other modules. */
49 static struct kmem_cache *obd_device_cachep;
50 struct kmem_cache *obdo_cachep;
51 EXPORT_SYMBOL(obdo_cachep);
52 static struct kmem_cache *import_cachep;
/* Zombie lists and their lock. NOTE(review): presumably filled by
 * obd_zombie_export_add()/obd_zombie_import_add(), which
 * class_export_put()/class_import_put() call on the final put -- confirm
 * against the zombie code further down the file. */
54 static struct list_head obd_zombie_imports;
55 static struct list_head obd_zombie_exports;
56 static spinlock_t obd_zombie_impexp_lock;
/* Forward declarations for helpers defined later in the file. */
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export *exp);
60 static void obd_zombie_import_add(struct obd_import *imp);
61 static void print_export_data(struct obd_export *exp,
62 const char *status, int locks);
/* Hook used by class_export_destroy() and class_import_destroy() to drop a
 * ptlrpc_connection reference. It is only declared and exported here;
 * NOTE(review): presumably assigned by the ptlrpc module -- confirm. */
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
68 * support functions: we could use inter-module communication, but this
69 * is more portable to other OS's
/*
 * Allocate a struct obd_device from obd_device_cachep (GFP_NOFS) and stamp
 * it with OBD_DEVICE_MAGIC, the value the LASSERTs throughout this file
 * check before trusting a device pointer.
 */
71 static struct obd_device *obd_device_alloc(void)
73 struct obd_device *obd;
75 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return a device obtained from obd_device_alloc() to obd_device_cachep.
 *
 * Asserts that the magic is intact, reports (CERROR) a namespace that is
 * still attached, and finalizes the obd_reference lu_ref before freeing.
 */
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace still attached at this point means cleanup was incomplete */
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 lu_ref_fini(&obd->obd_reference);
93 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by name.
 *
 * Walks the obd_types list under obd_types_lock and compares each entry's
 * typ_name with \a name using strcmp(). The lock is released both on the
 * match path and after an unsuccessful full scan. Callers in this file
 * treat a non-NULL result as "type is registered".
 */
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
112 EXPORT_SYMBOL(class_search_type);
/*
 * Find an obd type by name, trying to load its module if needed, and pin
 * the module that owns the type's dt_ops.
 *
 * When HAVE_MODULE_LOADING_SUPPORT is defined, some type names are mapped
 * to a different module name before request_module() is called; after a
 * successful load the type list is searched again.
 *
 * The reference taken here is dropped with class_put_type().
 */
114 struct obd_type *class_get_type(const char *name)
116 struct obd_type *type = class_search_type(name);
118 #ifdef HAVE_MODULE_LOADING_SUPPORT
120 const char *modname = name;
/* legacy and alias type names live in differently named modules */
122 if (strcmp(modname, "obdfilter") == 0)
125 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
126 modname = LUSTRE_OSP_NAME;
/* prefix match: any name starting with LUSTRE_MDS_NAME maps to the MDT
 * module */
128 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
129 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success */
131 if (!request_module("%s", modname)) {
132 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
/* look the type up under the original name, not the module name */
133 type = class_search_type(name);
135 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
141 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is ignored, so a module
 * that is already being unloaded would not be detected here. */
143 try_module_get(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under the type's obd_type_lock, drop the
 * module reference on the owner of the type's dt_ops.
 */
149 void class_put_type(struct obd_type *type)
152 spin_lock(&type->obd_type_lock);
154 module_put(type->typ_dt_ops->o_owner);
155 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of an obd type name. */
158 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 *
 * Allocates a struct obd_type together with private copies of \a dt_ops,
 * \a md_ops and \a name, optionally registers a procfs directory, runs
 * lu_device_type_init() on \a ldt and finally links the type into
 * obd_types under obd_types_lock.
 *
 * \param[in] dt_ops		data operations, copied by value
 * \param[in] md_ops		metadata operations, copied by value; optional
 * \param[in] enable_proc	NOTE(review): presumably gates the procfs
 *				registration below -- the test itself is not
 *				shown here, confirm
 * \param[in] vars		procfs variables for the type directory
 * \param[in] name		type name; must be shorter than CLASS_MAX_NAME
 * \param[in] ldt		lu device type to initialize
 *
 * The cleanup code at the end releases whatever was allocated so far.
 */
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161 bool enable_proc, struct lprocfs_vars *vars,
162 const char *name, struct lu_device_type *ldt)
164 struct obd_type *type;
169 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* refuse to register the same type name twice */
171 if (class_search_type(name)) {
172 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
177 OBD_ALLOC(type, sizeof(*type));
181 OBD_ALLOC_PTR(type->typ_dt_ops);
182 OBD_ALLOC_PTR(type->typ_md_ops);
183 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
185 if (type->typ_dt_ops == NULL ||
186 type->typ_md_ops == NULL ||
187 type->typ_name == NULL)
190 *(type->typ_dt_ops) = *dt_ops;
191 /* md_ops is optional */
193 *(type->typ_md_ops) = *md_ops;
/* strcpy() is bounded here: typ_name was allocated with
 * strlen(name) + 1 bytes just above */
194 strcpy(type->typ_name, name);
195 spin_lock_init(&type->obd_type_lock);
197 #ifdef CONFIG_PROC_FS
199 type->typ_procroot = lprocfs_register(type->typ_name,
202 if (IS_ERR(type->typ_procroot)) {
203 rc = PTR_ERR(type->typ_procroot);
/* clear the error pointer so the cleanup code does not mistake it for a
 * registered proc entry */
204 type->typ_procroot = NULL;
211 rc = lu_device_type_init(ldt);
/* publish the fully initialized type */
216 spin_lock(&obd_types_lock);
217 list_add(&type->typ_chain, &obd_types);
218 spin_unlock(&obd_types_lock);
/* failure cleanup: undo in reverse order whatever succeeded */
223 if (type->typ_name != NULL) {
224 #ifdef CONFIG_PROC_FS
225 if (type->typ_procroot != NULL)
226 remove_proc_subtree(type->typ_name, proc_lustre_root);
228 OBD_FREE(type->typ_name, strlen(name) + 1);
230 if (type->typ_md_ops != NULL)
231 OBD_FREE_PTR(type->typ_md_ops);
232 if (type->typ_dt_ops != NULL)
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE(type, sizeof(*type));
237 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name and free everything
 * class_register_type() allocated for it.
 *
 * If the type still has a non-zero typ_refcnt, only the ops tables are
 * freed and the type itself (with its name) is left in place.
 * Otherwise the procfs entries are removed, the lu device type is
 * finalized, the type is unlinked from obd_types under obd_types_lock and
 * all memory is released.
 */
239 int class_unregister_type(const char *name)
241 struct obd_type *type = class_search_type(name);
245 CERROR("unknown obd type\n");
249 if (type->typ_refcnt) {
250 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
251 /* This is a bad situation, let's make the best of it */
252 /* Remove ops, but leave the name for debugging */
/* NOTE(review): class_put_type() dereferences type->typ_dt_ops, so a
 * later put on this still-referenced type would touch freed memory
 * unless these pointers are reset -- verify. */
253 OBD_FREE_PTR(type->typ_dt_ops);
254 OBD_FREE_PTR(type->typ_md_ops);
258 /* we do not use type->typ_procroot as for compatibility purposes
259 * other modules can share names (i.e. lod can use lov entry). so
260 * we can't reference pointer as it can get invalided when another
261 * module removes the entry */
262 #ifdef CONFIG_PROC_FS
263 if (type->typ_procroot != NULL)
264 remove_proc_subtree(type->typ_name, proc_lustre_root);
265 if (type->typ_procsym != NULL)
266 lprocfs_remove(&type->typ_procsym);
269 lu_device_type_fini(type->typ_lu);
271 spin_lock(&obd_types_lock);
272 list_del(&type->typ_chain);
273 spin_unlock(&obd_types_lock);
/* strlen(name) equals strlen(type->typ_name): the type was found by an
 * exact strcmp() match on the name */
274 OBD_FREE(type->typ_name, strlen(name) + 1);
275 if (type->typ_dt_ops != NULL)
276 OBD_FREE_PTR(type->typ_dt_ops);
277 if (type->typ_md_ops != NULL)
278 OBD_FREE_PTR(type->typ_md_ops);
279 OBD_FREE(type, sizeof(*type));
281 } /* class_unregister_type */
282 EXPORT_SYMBOL(class_unregister_type);
285 * Create a new obd device.
287 * Find an empty slot in ::obd_devs[], create a new obd device in it.
289 * \param[in] type_name obd device type string.
290 * \param[in] name obd device name.
292 * \retval NULL if create fails, otherwise return the obd device
/*
 * Allocate a new obd device of type \a type_name called \a name and place
 * it in a free obd_devs[] slot.
 *
 * Failures visible in this function are reported as ERR_PTR() values:
 * -EINVAL (name too long), -ENODEV (unknown type), -ENOMEM (allocation
 * failed), -EEXIST (a device with this name already exists) and
 * -EOVERFLOW (no free slot). The error exits free the new device and drop
 * the type reference taken with class_get_type().
 */
295 struct obd_device *class_newdev(const char *type_name, const char *name)
297 struct obd_device *result = NULL;
298 struct obd_device *newdev;
299 struct obd_type *type = NULL;
301 int new_obd_minor = 0;
/* reject names that would not fit; this also bounds the strncpy() below */
304 if (strlen(name) >= MAX_OBD_NAME) {
305 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
306 RETURN(ERR_PTR(-EINVAL));
309 type = class_get_type(type_name);
311 CERROR("OBD: unknown type: %s\n", type_name);
312 RETURN(ERR_PTR(-ENODEV));
315 newdev = obd_device_alloc();
317 GOTO(out_type, result = ERR_PTR(-ENOMEM));
319 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* one pass over the table under the write lock both detects a duplicate
 * name and claims the first free slot */
321 write_lock(&obd_dev_lock);
322 for (i = 0; i < class_devno_max(); i++) {
323 struct obd_device *obd = class_num2obd(i);
325 if (obd && (strcmp(name, obd->obd_name) == 0)) {
326 CERROR("Device %s already exists at %d, won't add\n",
/* a slot claimed earlier in this same loop has to be given back before
 * reporting the duplicate */
329 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
330 "%p obd_magic %08x != %08x\n", result,
331 result->obd_magic, OBD_DEVICE_MAGIC);
332 LASSERTF(result->obd_minor == new_obd_minor,
333 "%p obd_minor %d != %d\n", result,
334 result->obd_minor, new_obd_minor);
336 obd_devs[result->obd_minor] = NULL;
337 result->obd_name[0]='\0';
339 result = ERR_PTR(-EEXIST);
/* first empty slot and nothing claimed yet: take it */
342 if (!result && !obd) {
344 result->obd_minor = i;
346 result->obd_type = type;
347 strncpy(result->obd_name, name,
348 sizeof(result->obd_name) - 1);
349 obd_devs[i] = result;
352 write_unlock(&obd_dev_lock);
/* the scan finished without finding a free slot */
354 if (result == NULL && i >= class_devno_max()) {
355 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
357 GOTO(out, result = ERR_PTR(-EOVERFLOW));
363 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
364 result->obd_name, result);
/* error exits: release the device, then the type reference */
368 obd_device_free(newdev);
370 class_put_type(type);
/*
 * Remove \a obd from obd_devs[], free it and drop the type reference it
 * held.
 *
 * The device must carry a valid magic, must be the one stored at
 * obd_devs[obd->obd_minor] and must have a type; all three are asserted.
 */
374 void class_release_dev(struct obd_device *obd)
/* remember the type now: \a obd is freed before the type is put */
376 struct obd_type *obd_type = obd->obd_type;
378 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
379 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
380 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
381 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
382 LASSERT(obd_type != NULL);
384 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
385 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
/* clear the slot under the write lock so no lookup can find the device
 * once it is freed */
387 write_lock(&obd_dev_lock);
388 obd_devs[obd->obd_minor] = NULL;
389 write_unlock(&obd_dev_lock);
390 obd_device_free(obd);
392 class_put_type(obd_type);
/*
 * Search obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_name equals \a name.
 *
 * A name match on a device that has not set obd_attached does not take
 * the early-unlock path. class_name2obd() treats a negative result as
 * "not found" and otherwise uses the result as an obd_devs[] index.
 */
395 int class_name2dev(const char *name)
402 read_lock(&obd_dev_lock);
403 for (i = 0; i < class_devno_max(); i++) {
404 struct obd_device *obd = class_num2obd(i);
406 if (obd && strcmp(name, obd->obd_name) == 0) {
407 /* Make sure we finished attaching before we give
408 out any references */
409 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410 if (obd->obd_attached) {
411 read_unlock(&obd_dev_lock);
417 read_unlock(&obd_dev_lock);
422 struct obd_device *class_name2obd(const char *name)
424 int dev = class_name2dev(name);
426 if (dev < 0 || dev > class_devno_max())
428 return class_num2obd(dev);
430 EXPORT_SYMBOL(class_name2obd);
/*
 * Search obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_uuid equals \a uuid (compared with obd_uuid_equals()).
 *
 * class_uuid2obd() uses the result as an obd_devs[] index.
 */
432 int class_uuid2dev(struct obd_uuid *uuid)
436 read_lock(&obd_dev_lock);
437 for (i = 0; i < class_devno_max(); i++) {
438 struct obd_device *obd = class_num2obd(i);
440 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
441 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
442 read_unlock(&obd_dev_lock);
446 read_unlock(&obd_dev_lock);
/*
 * Look up an obd device by uuid: resolve \a uuid to an index with
 * class_uuid2dev() and fetch the device with class_num2obd().
 */
451 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
453 int dev = class_uuid2dev(uuid);
456 return class_num2obd(dev);
458 EXPORT_SYMBOL(class_uuid2obd);
461 * Get obd device from ::obd_devs[]
463 * \param num [in] array index
465 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
466 * otherwise return the obd device there.
/*
 * Fetch the device for index \a num. For an index below class_devno_max()
 * the device's magic and obd_minor are asserted to be consistent with the
 * slot it was found in.
 *
 * NOTE(review): only the upper bound of \a num is checked here; callers
 * appear to be responsible for never passing a negative index -- confirm.
 */
468 struct obd_device *class_num2obd(int num)
470 struct obd_device *obd = NULL;
472 if (num < class_devno_max()) {
477 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
478 "%p obd_magic %08x != %08x\n",
479 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
480 LASSERTF(obd->obd_minor == num,
481 "%p obd_minor %0d != %0d\n",
482 obd, obd->obd_minor, num);
489 * Get obd devices count. Devices in any state are counted.
491 * \retval obd device count
493 int get_devices_count(void)
495 int index, max_index = class_devno_max(), dev_count = 0;
497 read_lock(&obd_dev_lock);
498 for (index = 0; index <= max_index; index++) {
499 struct obd_device *obd = class_num2obd(index);
503 read_unlock(&obd_dev_lock);
507 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per obd device (LCONSOLE at D_CONFIG): index,
 * status, type name, device name, uuid and current reference count.
 *
 * The status string is chosen from the device's lifecycle flags, tested in
 * the order obd_stopping, obd_set_up, obd_attached. The whole listing runs
 * under the obd_dev_lock read lock.
 */
509 void class_obd_list(void)
514 read_lock(&obd_dev_lock);
515 for (i = 0; i < class_devno_max(); i++) {
516 struct obd_device *obd = class_num2obd(i);
/* the most advanced lifecycle state wins */
520 if (obd->obd_stopping)
522 else if (obd->obd_set_up)
524 else if (obd->obd_attached)
528 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
529 i, status, obd->obd_type->typ_name,
530 obd->obd_name, obd->obd_uuid.uuid,
531 atomic_read(&obd->obd_refcount));
533 read_unlock(&obd_dev_lock);
537 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
538 specified, then only the client with that uuid is returned,
539 otherwise any client connected to the tgt is returned. */
/*
 * \param[in] tgt_uuid	uuid of the target the client must be connected to
 * \param[in] typ_name	type name to match; compared as a prefix of the
 *			device's type name (strncmp over strlen(typ_name))
 * \param[in] grp_uuid	if non-NULL, the device's own uuid must equal it
 *
 * The scan runs under the obd_dev_lock read lock, which is released before
 * leaving the function on both the match and the no-match path.
 */
540 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
541 const char * typ_name,
542 struct obd_uuid *grp_uuid)
546 read_lock(&obd_dev_lock);
547 for (i = 0; i < class_devno_max(); i++) {
548 struct obd_device *obd = class_num2obd(i);
552 if ((strncmp(obd->obd_type->typ_name, typ_name,
553 strlen(typ_name)) == 0)) {
/* the target must match, and so must the group when one was given */
554 if (obd_uuid_equals(tgt_uuid,
555 &obd->u.cli.cl_target_uuid) &&
556 ((grp_uuid)? obd_uuid_equals(grp_uuid,
557 &obd->obd_uuid) : 1)) {
558 read_unlock(&obd_dev_lock);
563 read_unlock(&obd_dev_lock);
567 EXPORT_SYMBOL(class_find_client_obd);
569 /* Iterate over the obd_device list looking for devices that have grp_uuid.
570 Start searching at *next, and if a device is found, the next index to look
571 at is saved in *next. If next is NULL, then the first matching device
572 will always be returned. */
/*
 * \param[in]     grp_uuid	uuid compared with each device's obd_uuid
 * \param[in,out] next		optional scan cursor; a supplied value is only
 *				used when it lies in [0, class_devno_max())
 *
 * The scan runs under the obd_dev_lock read lock.
 */
573 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
579 else if (*next >= 0 && *next < class_devno_max())
584 read_lock(&obd_dev_lock);
585 for (; i < class_devno_max(); i++) {
586 struct obd_device *obd = class_num2obd(i);
590 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
593 read_unlock(&obd_dev_lock);
597 read_unlock(&obd_dev_lock);
601 EXPORT_SYMBOL(class_devices_in_group);
604 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
605 * adjust sptlrpc settings accordingly.
/*
 * \param[in] fsname	filesystem name; matched against the first
 *			\a namelen bytes of each device name
 * \param[in] namelen	number of bytes of \a fsname to compare, must be > 0
 *
 * Devices that are not set up or are stopping, and devices whose type is
 * not one of mdc, osc, mdt or ost, are skipped. Each remaining device gets
 * an obd_set_info_async() call with KEY_SPTLRPC_CONF on its self export.
 */
607 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
609 struct obd_device *obd;
613 LASSERT(namelen > 0);
615 read_lock(&obd_dev_lock);
616 for (i = 0; i < class_devno_max(); i++) {
617 obd = class_num2obd(i);
619 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
622 /* only notify mdc, osc, mdt, ost */
623 type = obd->obd_type->typ_name;
624 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
625 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
626 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
627 strcmp(type, LUSTRE_OST_NAME) != 0)
/* the device name must start with the filesystem name */
630 if (strncmp(obd->obd_name, fsname, namelen))
/* hold a device reference so the read lock can be dropped for the
 * duration of the notification call */
633 class_incref(obd, __FUNCTION__, obd);
634 read_unlock(&obd_dev_lock);
635 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
636 sizeof(KEY_SPTLRPC_CONF),
637 KEY_SPTLRPC_CONF, 0, NULL, NULL);
639 class_decref(obd, __FUNCTION__, obd);
/* re-take the lock before continuing the scan */
640 read_lock(&obd_dev_lock);
642 read_unlock(&obd_dev_lock);
645 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches() (device, obdo,
 * import and capa caches). Also used by obd_init_caches() to unwind a
 * partially completed initialization.
 */
647 void obd_cleanup_caches(void)
650 if (obd_device_cachep) {
651 kmem_cache_destroy(obd_device_cachep);
/* reset so that a later obd_init_caches() passes its LASSERT */
652 obd_device_cachep = NULL;
655 kmem_cache_destroy(obdo_cachep);
659 kmem_cache_destroy(import_cachep);
660 import_cachep = NULL;
663 kmem_cache_destroy(capa_cachep);
/*
 * Create the slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache" and "capa_cache".
 *
 * Each cache pointer is asserted to be NULL beforehand. A failed creation
 * jumps to the common exit with rc = -ENOMEM, where obd_cleanup_caches()
 * releases whatever was already created.
 */
669 int obd_init_caches(void)
674 LASSERT(obd_device_cachep == NULL);
675 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
676 sizeof(struct obd_device),
678 if (!obd_device_cachep)
679 GOTO(out, rc = -ENOMEM);
681 LASSERT(obdo_cachep == NULL);
682 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
685 GOTO(out, rc = -ENOMEM);
687 LASSERT(import_cachep == NULL);
688 import_cachep = kmem_cache_create("ll_import_cache",
689 sizeof(struct obd_import),
692 GOTO(out, rc = -ENOMEM);
694 LASSERT(capa_cachep == NULL);
695 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
698 GOTO(out, rc = -ENOMEM);
/* unwind a partial initialization */
702 obd_cleanup_caches();
706 /* map connection to client */
/*
 * Resolve a connection handle to its export through
 * class_handle2object() on conn->cookie.
 *
 * Two special cases are logged before the lookup: a null handle, and a
 * cookie of -1, which means "assign a new connection".
 * NOTE(review): class_conn2obd() calls class_export_put() on the result,
 * so the lookup presumably returns a referenced export -- confirm.
 */
707 struct obd_export *class_conn2export(struct lustre_handle *conn)
709 struct obd_export *export;
713 CDEBUG(D_CACHE, "looking for null handle\n");
717 if (conn->cookie == -1) { /* this means assign a new connection */
718 CDEBUG(D_CACHE, "want a new connection\n");
722 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
723 export = class_handle2object(conn->cookie, NULL);
726 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): presumably returns exp->exp_obd, the field the other
 * helpers in this file read -- confirm.
 */
728 struct obd_device *class_exp2obd(struct obd_export *exp)
734 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve a connection handle to the obd device of its export.
 *
 * The export is looked up with class_conn2export(), its exp_obd is read
 * and the export is then released again with class_export_put(); no
 * reference is taken on the device itself.
 */
736 struct obd_device *class_conn2obd(struct lustre_handle *conn)
738 struct obd_export *export;
739 export = class_conn2export(conn);
741 struct obd_device *obd = export->exp_obd;
742 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the obd device that
 * \a exp belongs to.
 */
748 struct obd_import *class_exp2cliimp(struct obd_export *exp)
750 struct obd_device *obd = exp->exp_obd;
753 return obd->u.cli.cl_import;
755 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the obd device that the
 * connection handle \a conn resolves to via class_conn2obd().
 */
757 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
759 struct obd_device *obd = class_conn2obd(conn);
762 return obd->u.cli.cl_import;
765 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero.
 *
 * Drops the ptlrpc connection (if any), asserts that all per-export
 * request/reply lists are empty, calls obd_destroy_export(), releases the
 * "export" device reference taken in class_new_export() and frees the
 * export with OBD_FREE_RCU().
 */
766 static void class_export_destroy(struct obd_export *exp)
768 struct obd_device *obd = exp->exp_obd;
771 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
772 LASSERT(obd != NULL);
774 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
775 exp->exp_client_uuid.uuid, obd->obd_name);
777 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
778 if (exp->exp_connection)
779 ptlrpc_put_connection_superhack(exp->exp_connection);
781 LASSERT(list_empty(&exp->exp_outstanding_replies));
782 LASSERT(list_empty(&exp->exp_uncommitted_replies));
783 LASSERT(list_empty(&exp->exp_req_replay_queue));
784 LASSERT(list_empty(&exp->exp_hp_rpcs));
785 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", export) in class_new_export() */
786 class_decref(obd, "export", exp);
788 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes an export reference via
 * class_export_get(). */
792 static void export_handle_addref(void *export)
794 class_export_get(export);
/* Handle operations passed to class_handle_hash() for every export created
 * in class_new_export(). */
797 static struct portals_handle_ops export_handle_ops = {
798 .hop_addref = export_handle_addref,
/*
 * Take one reference on \a exp (atomic increment of exp_refcount) and log
 * the new count. Released with class_export_put().
 */
802 struct obd_export *class_export_get(struct obd_export *exp)
804 atomic_inc(&exp->exp_refcount);
805 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
806 atomic_read(&exp->exp_refcount));
809 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 *
 * When the last reference goes away the per-export procfs state is
 * released and the export is handed to obd_zombie_export_add(); it is not
 * destroyed directly here.
 */
811 void class_export_put(struct obd_export *exp)
813 LASSERT(exp != NULL);
/* catches both over-puts and use of a poisoned (freed) export */
814 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
815 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
816 atomic_read(&exp->exp_refcount) - 1);
818 if (atomic_dec_and_test(&exp->exp_refcount)) {
819 LASSERT(!list_empty(&exp->exp_obd_chain));
820 CDEBUG(D_IOCTL, "final put %p/%s\n",
821 exp, exp->exp_client_uuid.uuid);
823 /* release nid stat reference */
824 lprocfs_exp_cleanup(exp);
826 obd_zombie_export_add(exp);
829 EXPORT_SYMBOL(class_export_put);
831 /* Creates a new export, adds it to the hash table, and returns a
832 * pointer to it. The refcount is 2: one for the hash reference, and
833 * one for the pointer returned by this function. */
/*
 * \param[in] obd	device the export is created on
 * \param[in] cluuid	client uuid, copied into the export
 *
 * Returns ERR_PTR(-ENOMEM) if the export cannot be allocated. The other
 * failure codes set in this function are -ENODEV (the device is stopping
 * or its uuid hash reference could not be taken) and -EALREADY (an export
 * for \a cluuid is already in the uuid hash); those paths unhash the
 * handle, destroy and free the export.
 */
834 struct obd_export *class_new_export(struct obd_device *obd,
835 struct obd_uuid *cluuid)
837 struct obd_export *export;
838 cfs_hash_t *hash = NULL;
842 OBD_ALLOC_PTR(export);
844 return ERR_PTR(-ENOMEM);
846 export->exp_conn_cnt = 0;
847 export->exp_lock_hash = NULL;
848 export->exp_flock_hash = NULL;
/* the export starts with two references */
849 atomic_set(&export->exp_refcount, 2);
850 atomic_set(&export->exp_rpc_count, 0);
851 atomic_set(&export->exp_cb_count, 0);
852 atomic_set(&export->exp_locks_count, 0);
853 #if LUSTRE_TRACKS_LOCK_EXP_REFS
854 INIT_LIST_HEAD(&export->exp_locks_list);
855 spin_lock_init(&export->exp_locks_list_guard);
857 atomic_set(&export->exp_replay_count, 0);
858 export->exp_obd = obd;
859 INIT_LIST_HEAD(&export->exp_outstanding_replies);
860 spin_lock_init(&export->exp_uncommitted_replies_lock);
861 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
862 INIT_LIST_HEAD(&export->exp_req_replay_queue);
863 INIT_LIST_HEAD(&export->exp_handle.h_link);
864 INIT_LIST_HEAD(&export->exp_hp_rpcs);
865 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* register the handle; class_connect() hands out its h_cookie */
866 class_handle_hash(&export->exp_handle, &export_handle_ops);
867 export->exp_last_request_time = cfs_time_current_sec();
868 spin_lock_init(&export->exp_lock);
869 spin_lock_init(&export->exp_rpc_lock);
870 INIT_HLIST_NODE(&export->exp_uuid_hash);
871 INIT_HLIST_NODE(&export->exp_nid_hash);
872 spin_lock_init(&export->exp_bl_list_lock);
873 INIT_LIST_HEAD(&export->exp_bl_list);
875 export->exp_sp_peer = LUSTRE_SP_ANY;
876 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
877 export->exp_client_uuid = *cluuid;
878 obd_init_export(export);
880 spin_lock(&obd->obd_dev_lock);
881 /* shouldn't happen, but might race */
882 if (obd->obd_stopping)
883 GOTO(exit_unlock, rc = -ENODEV);
/* pin the uuid hash so it can be used after obd_dev_lock is dropped */
885 hash = cfs_hash_getref(obd->obd_uuid_hash);
887 GOTO(exit_unlock, rc = -ENODEV);
888 spin_unlock(&obd->obd_dev_lock);
/* an export whose client uuid equals the device's own uuid is not added
 * to the uuid hash */
890 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
891 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
893 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
894 obd->obd_name, cluuid->uuid, rc);
895 GOTO(exit_err, rc = -EALREADY);
899 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* re-check under the lock: the device may have started stopping while
 * the lock was dropped for the hash insertion */
900 spin_lock(&obd->obd_dev_lock);
901 if (obd->obd_stopping) {
902 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
903 GOTO(exit_unlock, rc = -ENODEV);
/* released by class_decref(obd, "export", exp) in class_export_destroy() */
906 class_incref(obd, "export", export);
907 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
908 list_add_tail(&export->exp_obd_chain_timed,
909 &export->exp_obd->obd_exports_timed);
910 export->exp_obd->obd_num_exports++;
911 spin_unlock(&obd->obd_dev_lock);
912 cfs_hash_putref(hash);
/* error exits */
916 spin_unlock(&obd->obd_dev_lock);
919 cfs_hash_putref(hash);
920 class_handle_unhash(&export->exp_handle);
921 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
922 obd_destroy_export(export);
923 OBD_FREE_PTR(export);
926 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its device: unhash its handle, remove it from the
 * uuid hash if present, move it from the device's export list to
 * obd_unlinked_exports, take it off the timed list and decrement
 * obd_num_exports. One export reference is dropped at the end.
 */
928 void class_unlink_export(struct obd_export *exp)
/* after this the export can no longer be found through its cookie */
930 class_handle_unhash(&exp->exp_handle);
932 spin_lock(&exp->exp_obd->obd_dev_lock);
933 /* delete an uuid-export hashitem from hashtables */
934 if (!hlist_unhashed(&exp->exp_uuid_hash))
935 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
936 &exp->exp_client_uuid,
937 &exp->exp_uuid_hash);
939 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
940 list_del_init(&exp->exp_obd_chain_timed);
941 exp->exp_obd->obd_num_exports--;
942 spin_unlock(&exp->exp_obd->obd_dev_lock);
943 class_export_put(exp);
946 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero.
 *
 * Drops the import's connection and every connection on imp_conn_list
 * (freeing the list entries), asserts that no security context is left,
 * releases the "import" device reference taken in class_new_import() and
 * frees the import with OBD_FREE_RCU().
 */
947 static void class_import_destroy(struct obd_import *imp)
951 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
952 imp->imp_obd->obd_name);
954 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
956 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop entries off the head until the list is empty */
958 while (!list_empty(&imp->imp_conn_list)) {
959 struct obd_import_conn *imp_conn;
961 imp_conn = list_entry(imp->imp_conn_list.next,
962 struct obd_import_conn, oic_item);
963 list_del_init(&imp_conn->oic_item);
964 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
965 OBD_FREE(imp_conn, sizeof(*imp_conn));
968 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", imp) in class_new_import() */
969 class_decref(imp->imp_obd, "import", imp);
970 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes an import reference via
 * class_import_get(). */
974 static void import_handle_addref(void *import)
976 class_import_get(import);
/* Handle operations passed to class_handle_hash() for every import created
 * in class_new_import(). */
979 static struct portals_handle_ops import_handle_ops = {
980 .hop_addref = import_handle_addref,
/*
 * Take one reference on \a import (atomic increment of imp_refcount) and
 * log the new count. Released with class_import_put().
 */
984 struct obd_import *class_import_get(struct obd_import *import)
986 atomic_inc(&import->imp_refcount);
987 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
988 atomic_read(&import->imp_refcount),
989 import->imp_obd->obd_name);
992 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp. When the last reference goes away the
 * import is handed to obd_zombie_import_add(); it is not destroyed
 * directly here.
 */
994 void class_import_put(struct obd_import *imp)
/* an import that still has references must not be on a zombie list */
998 LASSERT(list_empty(&imp->imp_zombie_chain));
999 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1001 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1002 atomic_read(&imp->imp_refcount) - 1,
1003 imp->imp_obd->obd_name);
1005 if (atomic_dec_and_test(&imp->imp_refcount)) {
1006 CDEBUG(D_INFO, "final put import %p\n", imp);
1007 obd_zombie_import_add(imp);
1010 /* catch possible import put race */
1011 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1014 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the network latency
 * estimate starts at 0, and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 */
1016 static void init_imp_at(struct imp_at *at) {
1018 at_init(&at->iat_net_latency, 0, 0);
1019 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1020 /* max service estimates are tracked on the server side, so
1021 don't use the AT history here, just use the last reported
1022 val. (But keep hist for proc histogram, worst_ever) */
1023 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize a new import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds an
 * "import" reference on \a obd (dropped in class_import_destroy()), is
 * registered in the handle hash and has its adaptive timeouts initialized.
 */
1028 struct obd_import *class_new_import(struct obd_device *obd)
1030 struct obd_import *imp;
1032 OBD_ALLOC(imp, sizeof(*imp));
1036 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1037 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1038 INIT_LIST_HEAD(&imp->imp_replay_list);
1039 INIT_LIST_HEAD(&imp->imp_sending_list);
1040 INIT_LIST_HEAD(&imp->imp_delayed_list);
1041 INIT_LIST_HEAD(&imp->imp_committed_list);
/* the replay cursor starts at the head of the (empty) committed list */
1042 imp->imp_replay_cursor = &imp->imp_committed_list;
1043 spin_lock_init(&imp->imp_lock);
1044 imp->imp_last_success_conn = 0;
1045 imp->imp_state = LUSTRE_IMP_NEW;
1046 imp->imp_obd = class_incref(obd, "import", imp);
1047 mutex_init(&imp->imp_sec_mutex);
1048 init_waitqueue_head(&imp->imp_recovery_waitq);
/* class_destroy_import() drops one of these two references */
1050 atomic_set(&imp->imp_refcount, 2);
1051 atomic_set(&imp->imp_unregistering, 0);
1052 atomic_set(&imp->imp_inflight, 0);
1053 atomic_set(&imp->imp_replay_inflight, 0);
1054 atomic_set(&imp->imp_inval_count, 0);
1055 INIT_LIST_HEAD(&imp->imp_conn_list);
1056 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1057 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1058 init_imp_at(&imp->imp_at);
1060 /* the default magic is V2, will be used in connect RPC, and
1061 * then adjusted according to the flags in request/reply. */
1062 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1066 EXPORT_SYMBOL(class_new_import);
/*
 * Begin the destruction of \a import: remove it from the handle hash, bump
 * imp_generation under imp_lock and drop one reference. The import itself
 * goes away once the last reference is put.
 */
1068 void class_destroy_import(struct obd_import *import)
1070 LASSERT(import != NULL);
1071 LASSERT(import != LP_POISON);
1073 class_handle_unhash(&import->imp_handle);
1075 spin_lock(&import->imp_lock);
1076 import->imp_generation++;
1077 spin_unlock(&import->imp_lock);
1078 class_import_put(import);
1080 EXPORT_SYMBOL(class_destroy_import);
1082 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug bookkeeping (built only when LUSTRE_TRACKS_LOCK_EXP_REFS is set):
 * record that \a lock holds one more reference on \a exp.
 *
 * The first reference links the lock into exp->exp_locks_list and records
 * \a exp as the lock's target; a lock already targeting a different
 * export triggers a console warning. Everything runs under
 * exp->exp_locks_list_guard.
 */
1084 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1086 spin_lock(&exp->exp_locks_list_guard);
1088 LASSERT(lock->l_exp_refs_nr >= 0);
1090 if (lock->l_exp_refs_target != NULL &&
1091 lock->l_exp_refs_target != exp) {
1092 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1093 exp, lock, lock->l_exp_refs_target);
/* post-increment: true only for the very first reference */
1095 if ((lock->l_exp_refs_nr ++) == 0) {
1096 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1097 lock->l_exp_refs_target = exp;
1099 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1100 lock, exp, lock->l_exp_refs_nr);
1101 spin_unlock(&exp->exp_locks_list_guard);
/*
 * Debug bookkeeping counterpart of __class_export_add_lock_ref(): record
 * that \a lock dropped one reference on \a exp.
 *
 * When the count reaches zero the lock is unlinked from the export's lock
 * list and its target is cleared; a target that differs from \a exp
 * triggers a console warning. Everything runs under
 * exp->exp_locks_list_guard.
 */
1104 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1106 spin_lock(&exp->exp_locks_list_guard);
1107 LASSERT(lock->l_exp_refs_nr > 0);
1108 if (lock->l_exp_refs_target != exp) {
1109 LCONSOLE_WARN("lock %p, "
1110 "mismatching export pointers: %p, %p\n",
1111 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: true only when the last reference is dropped */
1113 if (-- lock->l_exp_refs_nr == 0) {
1114 list_del_init(&lock->l_exp_refs_link);
1115 lock->l_exp_refs_target = NULL;
1117 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1118 lock, exp, lock->l_exp_refs_nr);
1119 spin_unlock(&exp->exp_locks_list_guard);
1123 /* A connection defines an export context in which preallocation can
1124 be managed. This releases the export pointer reference, and returns
1125 the export handle, so the export refcount is 1 when this function returns. */
/*
 * \param[out] conn	receives the cookie of the new export's handle
 * \param[in]  obd	device to connect to
 * \param[in]  cluuid	uuid of the connecting client
 *
 * All three arguments must be non-NULL (asserted). A failure of
 * class_new_export() is returned as PTR_ERR() of its result.
 */
1127 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1128 struct obd_uuid *cluuid)
1130 struct obd_export *export;
1131 LASSERT(conn != NULL);
1132 LASSERT(obd != NULL);
1133 LASSERT(cluuid != NULL);
1136 export = class_new_export(obd, cluuid);
1138 RETURN(PTR_ERR(export));
1140 conn->cookie = export->exp_handle.h_cookie;
/* drop the pointer reference from class_new_export(); the caller keeps
 * only the cookie */
1141 class_export_put(export);
1143 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1144 cluuid->uuid, conn->cookie);
1147 EXPORT_SYMBOL(class_connect);
1149 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting of \a exp when it is being disconnected.
 *
 * Under obd_recovery_task_lock, while the device is recovering, the
 * export's exp_in_recovery flag is cleared (decrementing
 * obd_connected_clients) and obd_stale_clients is bumped for exports
 * without OBD_CONNECT_LIGHTWEIGHT. Afterwards, under exp_lock, the
 * request-replay and lock-replay flags are cleared and the matching
 * device counters are decremented.
 */
1150 static void class_export_recovery_cleanup(struct obd_export *exp)
1152 struct obd_device *obd = exp->exp_obd;
1154 spin_lock(&obd->obd_recovery_task_lock);
1155 if (obd->obd_recovering) {
1156 if (exp->exp_in_recovery) {
/* exp_lock nests inside obd_recovery_task_lock here */
1157 spin_lock(&exp->exp_lock);
1158 exp->exp_in_recovery = 0;
1159 spin_unlock(&exp->exp_lock);
1160 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1161 atomic_dec(&obd->obd_connected_clients);
1164 /* if called during recovery then should update
1165 * obd_stale_clients counter,
1166 * lightweight exports are not counted */
1167 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1168 exp->exp_obd->obd_stale_clients++;
1170 spin_unlock(&obd->obd_recovery_task_lock);
1172 spin_lock(&exp->exp_lock);
1173 /** Cleanup req replay fields */
1174 if (exp->exp_req_replay_needed) {
1175 exp->exp_req_replay_needed = 0;
/* the counter must still be non-zero before it is decremented */
1177 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1178 atomic_dec(&obd->obd_req_replay_clients);
1181 /** Cleanup lock replay data */
1182 if (exp->exp_lock_replay_needed) {
1183 exp->exp_lock_replay_needed = 0;
1185 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1186 atomic_dec(&obd->obd_lock_replay_clients);
1188 spin_unlock(&exp->exp_lock);
1191 /* This function removes 1-3 references from the export:
1192 * 1 - for the export pointer passed in
1193 * and, if a disconnect is really needed,
1194 * 2 - for removing it from the hash
1195 * 3 - in class_unlink_export
1196 * The export pointer passed to this function can be destroyed. */
/*
 * Disconnect \a export from its device.
 *
 * The exp_disconnected flag is tested and set under exp_lock so that only
 * the first caller does the work: remove the export from the nid hash,
 * run the recovery cleanup and unlink it from the device. Every caller
 * drops one export reference at the end. A NULL \a export is only warned
 * about.
 */
1197 int class_disconnect(struct obd_export *export)
1199 int already_disconnected;
1202 if (export == NULL) {
1203 CWARN("attempting to free NULL export %p\n", export);
/* atomically read the old value of the flag and set it */
1207 spin_lock(&export->exp_lock);
1208 already_disconnected = export->exp_disconnected;
1209 export->exp_disconnected = 1;
1210 spin_unlock(&export->exp_lock);
1212 /* class_cleanup(), abort_recovery(), and class_fail_export()
1213 * all end up in here, and if any of them race we shouldn't
1214 * call extra class_export_puts(). */
1215 if (already_disconnected) {
1216 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1217 GOTO(no_disconn, already_disconnected);
1220 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1221 export->exp_handle.h_cookie);
1223 if (!hlist_unhashed(&export->exp_nid_hash))
1224 cfs_hash_del(export->exp_obd->obd_nid_hash,
1225 &export->exp_connection->c_peer.nid,
1226 &export->exp_nid_hash);
1228 class_export_recovery_cleanup(export);
1229 class_unlink_export(export);
/* reached by first-time and repeated disconnects alike */
1231 class_export_put(export);
1234 EXPORT_SYMBOL(class_disconnect);
1236 /* Return non-zero for a fully connected export */
/*
 * An export counts as connected when, sampled under exp_lock, its
 * exp_conn_cnt is positive and exp_failed is not set.
 */
1237 int class_connected_export(struct obd_export *exp)
1242 spin_lock(&exp->exp_lock);
1243 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1244 spin_unlock(&exp->exp_lock);
1248 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list.
 *
 * \param[in] list	private work list of exports
 * \param[in] flags	stored into exp_flags of each export before it is
 *			disconnected
 *
 * The self export (client uuid equal to the device uuid) is only removed
 * from the list, not disconnected. All other exports go through
 * obd_disconnect().
 */
1250 static void class_disconnect_export_list(struct list_head *list,
1251 enum obd_option flags)
1254 struct obd_export *exp;
1257 /* It's possible that an export may disconnect itself, but
1258 * nothing else will be added to this list. */
1259 while (!list_empty(list)) {
1260 exp = list_entry(list->next, struct obd_export,
1262 /* need for safe call CDEBUG after obd_disconnect */
1263 class_export_get(exp);
1265 spin_lock(&exp->exp_lock);
1266 exp->exp_flags = flags;
1267 spin_unlock(&exp->exp_lock);
1269 if (obd_uuid_equals(&exp->exp_client_uuid,
1270 &exp->exp_obd->obd_uuid)) {
1272 "exp %p export uuid == obd uuid, don't discon\n",
1274 /* Need to delete this now so we don't end up pointing
1275 * to work_list later when this export is cleaned up. */
1276 list_del_init(&exp->exp_obd_chain);
1277 class_export_put(exp);
/* second reference: this is the one obd_disconnect() consumes */
1281 class_export_get(exp);
1282 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1283 "last request at "CFS_TIME_T"\n",
1284 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1285 exp, exp->exp_last_request_time);
1286 /* release one export reference anyway */
1287 rc = obd_disconnect(exp);
1289 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1290 obd_export_nid2str(exp), exp, rc);
/* drop the reference taken at the top of the loop */
1291 class_export_put(exp);
/*
 * Disconnect all exports of 'obd'.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private work
 * list while holding obd->obd_dev_lock, leaving the device lists empty; the
 * work list is then processed by class_disconnect_export_list() without the
 * device lock held, using the flags returned by exp_flags_from_obd(obd).
 *
 * NOTE(review): the line separating the "has exports" and "has no exports"
 * branches is not visible in this listing -- confirm against the full source.
 */
1296 void class_disconnect_exports(struct obd_device *obd)
1298 struct list_head work_list;
1301 /* Move all of the exports from obd_exports to a work list, en masse. */
1302 INIT_LIST_HEAD(&work_list);
1303 spin_lock(&obd->obd_dev_lock);
1304 list_splice_init(&obd->obd_exports, &work_list);
1305 list_splice_init(&obd->obd_delayed_exports, &work_list);
1306 spin_unlock(&obd->obd_dev_lock);
1308 if (!list_empty(&work_list)) {
1309 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1310 "disconnecting them\n", obd->obd_minor, obd);
1311 class_disconnect_export_list(&work_list,
1312 exp_flags_from_obd(obd));
1314 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1315 obd->obd_minor, obd);
1318 EXPORT_SYMBOL(class_disconnect_exports);
/*
 * Select exports of 'obd' for eviction and disconnect them.
 *
 * obd:         device whose obd_exports list is scanned
 * test_export: predicate called with exp->exp_lock held; an export for which
 *              it returns non-zero is not marked failed by this scan
 *
 * The scan runs under obd->obd_dev_lock.  Exports that are selected get
 * exp_failed set and are moved to a private work list, which is handed to
 * class_disconnect_export_list() after the device lock has been released,
 * with OBD_OPT_ABORT_RECOV added to the device flags.
 *
 * NOTE(review): the 'evicted' counter (declaration and increment) and the
 * statements that skip an export are not visible in this listing -- confirm
 * against the full source.
 */
1320 /* Remove exports that have not completed recovery.
1322 void class_disconnect_stale_exports(struct obd_device *obd,
1323 int (*test_export)(struct obd_export *))
1325 struct list_head work_list;
1326 struct obd_export *exp, *n;
1330 INIT_LIST_HEAD(&work_list);
1331 spin_lock(&obd->obd_dev_lock);
1332 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1334 /* don't count self-export as client */
1335 if (obd_uuid_equals(&exp->exp_client_uuid,
1336 &exp->exp_obd->obd_uuid))
1339 /* don't evict clients which have no slot in last_rcvd
1340 * (e.g. lightweight connection) */
1341 if (exp->exp_target_data.ted_lr_idx == -1)
/* exp_failed is tested and set inside one exp_lock critical section. */
1344 spin_lock(&exp->exp_lock);
1345 if (exp->exp_failed || test_export(exp)) {
1346 spin_unlock(&exp->exp_lock);
1349 exp->exp_failed = 1;
1350 spin_unlock(&exp->exp_lock);
/* The _safe iterator is required because the entry leaves obd_exports here. */
1352 list_move(&exp->exp_obd_chain, &work_list);
1354 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1355 obd->obd_name, exp->exp_client_uuid.uuid,
1356 exp->exp_connection == NULL ? "<unknown>" :
1357 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1358 print_export_data(exp, "EVICTING", 0);
1360 spin_unlock(&obd->obd_dev_lock);
1363 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1364 obd->obd_name, evicted);
1366 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1367 OBD_OPT_ABORT_RECOV);
1370 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark 'exp' as failed and disconnect it.
 *
 * exp_failed is read and set inside one exp_lock critical section, so the
 * previous value tells whether some other path already failed this export;
 * in that case only a debug message is logged ("skipping").
 *
 * NOTE(review): the early return after the "skipping" message and the test
 * on 'rc' that chooses between the CERROR and the CDEBUG are not visible in
 * this listing -- confirm against the full source.
 */
1372 void class_fail_export(struct obd_export *exp)
1374 int rc, already_failed;
1376 spin_lock(&exp->exp_lock);
1377 already_failed = exp->exp_failed;
1378 exp->exp_failed = 1;
1379 spin_unlock(&exp->exp_lock);
1381 if (already_failed) {
1382 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1383 exp, exp->exp_client_uuid.uuid);
1387 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1388 exp, exp->exp_client_uuid.uuid);
/* Optionally dump the debug log before tearing the export down. */
1390 if (obd_dump_on_timeout)
1391 libcfs_debug_dumplog();
1393 /* need for safe call CDEBUG after obd_disconnect */
1394 class_export_get(exp);
1396 /* Most callers into obd_disconnect are removing their own reference
1397 * (request, for example) in addition to the one from the hash table.
1398 * We don't have such a reference here, so make one. */
1399 class_export_get(exp);
1400 rc = obd_disconnect(exp);
1402 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1404 CDEBUG(D_HA, "disconnected export %p/%s\n",
1405 exp, exp->exp_client_uuid.uuid);
/* Drop the reference taken above for the post-disconnect logging. */
1406 class_export_put(exp);
1408 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable form of the peer NID of 'exp'.
 *
 * When the export has a connection, the string comes from libcfs_nid2str()
 * applied to the connection's peer NID.
 *
 * NOTE(review): the value returned when exp_connection is NULL is not
 * visible in this listing -- confirm against the full source.
 */
1410 char *obd_export_nid2str(struct obd_export *exp)
1412 if (exp->exp_connection != NULL)
1413 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1417 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict the export(s) of 'obd' connected from the NID given as a string.
 *
 * obd: device whose NID hash is searched
 * nid: textual NID, converted with libcfs_str2nid()
 *
 * Returns the value of exports_evicted; it is 0 when the device is already
 * stopping or nothing was found.
 *
 * NOTE(review): the loop around the lookup and the increment of
 * exports_evicted are not visible in this listing -- confirm against the
 * full source.
 */
1419 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1421 cfs_hash_t *nid_hash;
1422 struct obd_export *doomed_exp = NULL;
1423 int exports_evicted = 0;
1425 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1427 spin_lock(&obd->obd_dev_lock);
1428 /* umount has run already, so evict thread should leave
1429 * its task to umount thread now */
1430 if (obd->obd_stopping) {
1431 spin_unlock(&obd->obd_dev_lock);
1432 return exports_evicted;
/* Pin the hash while still under obd_dev_lock; released by the putref below. */
1434 nid_hash = obd->obd_nid_hash;
1435 cfs_hash_getref(nid_hash);
1436 spin_unlock(&obd->obd_dev_lock);
1439 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1440 if (doomed_exp == NULL)
1443 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1444 "nid %s found, wanted nid %s, requested nid %s\n",
1445 obd_export_nid2str(doomed_exp),
1446 libcfs_nid2str(nid_key), nid);
1447 LASSERTF(doomed_exp != obd->obd_self_export,
1448 "self-export is hashed by NID?\n");
1450 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1451 "request\n", obd->obd_name,
1452 obd_uuid2str(&doomed_exp->exp_client_uuid),
1453 obd_export_nid2str(doomed_exp));
/* NOTE(review): the put presumably drops the reference returned by the lookup -- confirm. */
1454 class_fail_export(doomed_exp);
1455 class_export_put(doomed_exp);
1458 cfs_hash_putref(nid_hash);
1460 if (!exports_evicted)
1461 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1462 obd->obd_name, nid);
1463 return exports_evicted;
1465 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of 'obd' whose client UUID matches 'uuid'.
 *
 * obd:  device whose UUID hash is searched
 * uuid: textual client UUID, converted with obd_str2uuid()
 *
 * A request naming the device's own UUID is refused with an error message.
 * Returns the value of exports_evicted; it is 0 when the device is stopping,
 * when the UUID is the device's own, or when no export matches.
 *
 * NOTE(review): the branch separator after the "no exports found" message
 * and the increment of exports_evicted are not visible in this listing --
 * confirm against the full source.
 */
1467 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1469 cfs_hash_t *uuid_hash;
1470 struct obd_export *doomed_exp = NULL;
1471 struct obd_uuid doomed_uuid;
1472 int exports_evicted = 0;
1474 spin_lock(&obd->obd_dev_lock);
1475 if (obd->obd_stopping) {
1476 spin_unlock(&obd->obd_dev_lock);
1477 return exports_evicted;
/* Pin the hash while still under obd_dev_lock; every exit path below puts it. */
1479 uuid_hash = obd->obd_uuid_hash;
1480 cfs_hash_getref(uuid_hash);
1481 spin_unlock(&obd->obd_dev_lock);
1483 obd_str2uuid(&doomed_uuid, uuid);
1484 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1485 CERROR("%s: can't evict myself\n", obd->obd_name);
1486 cfs_hash_putref(uuid_hash);
1487 return exports_evicted;
1490 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1492 if (doomed_exp == NULL) {
1493 CERROR("%s: can't disconnect %s: no exports found\n",
1494 obd->obd_name, uuid);
1496 CWARN("%s: evicting %s at adminstrative request\n",
1497 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
/* NOTE(review): the put presumably drops the reference returned by the lookup -- confirm. */
1498 class_fail_export(doomed_exp);
1499 class_export_put(doomed_exp);
1502 cfs_hash_putref(uuid_hash);
1504 return exports_evicted;
/*
 * Optional per-export dump callback, compiled in only when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set.  It starts out NULL and is invoked by
 * print_export_data() when its 'locks' argument is non-zero.
 * NOTE(review): the matching #endif is not visible in this listing.
 */
1507 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1508 void (*class_export_dump_hook)(struct obd_export*) = NULL;
/*
 * Emit one D_HA debug line describing 'exp'.
 *
 * exp:    export to describe
 * status: label printed after the device name (callers in this file pass
 *         "ACTIVE", "UNLINKED", "DELAYED", "ZOMBIE" or "EVICTING")
 *
 * The line carries the client UUID, the peer NID, the reference, RPC,
 * callback and lock counters, the disconnected/delayed/failed flags,
 * information about outstanding replies and exp_last_committed.
 *
 * NOTE(review): the third parameter (used below as 'locks'), the declaration
 * of 'nreplies' and the body of the loop over exp_outstanding_replies are not
 * visible in this listing -- confirm against the full source.
 */
1511 static void print_export_data(struct obd_export *exp, const char *status,
1514 struct ptlrpc_reply_state *rs;
1515 struct ptlrpc_reply_state *first_reply = NULL;
/* The outstanding-reply list is walked under exp_lock. */
1518 spin_lock(&exp->exp_lock);
1519 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1525 spin_unlock(&exp->exp_lock);
1527 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1528 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1529 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1530 atomic_read(&exp->exp_rpc_count),
1531 atomic_read(&exp->exp_cb_count),
1532 atomic_read(&exp->exp_locks_count),
1533 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1534 nreplies, first_reply, nreplies > 3 ? "..." : "",
1535 exp->exp_last_committed);
/* The extra dump runs only when requested and when a hook has been installed. */
1536 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1537 if (locks && class_export_dump_hook != NULL)
1538 class_export_dump_hook(exp);
1542 void dump_exports(struct obd_device *obd, int locks)
1544 struct obd_export *exp;
1546 spin_lock(&obd->obd_dev_lock);
1547 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1548 print_export_data(exp, "ACTIVE", locks);
1549 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1550 print_export_data(exp, "UNLINKED", locks);
1551 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1552 print_export_data(exp, "DELAYED", locks);
1553 spin_unlock(&obd->obd_dev_lock);
1554 spin_lock(&obd_zombie_impexp_lock);
1555 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1556 print_export_data(exp, "ZOMBIE", locks);
1557 spin_unlock(&obd_zombie_impexp_lock);
/*
 * Wait until obd->obd_unlinked_exports has drained.
 *
 * The caller must already have emptied obd->obd_exports (asserted).  The
 * list is polled under obd_dev_lock; between polls the lock is dropped and
 * the task sleeps uninterruptibly for 'waited' seconds.  Once 'waited'
 * exceeds 5 and is a power of two, a console warning is printed and all
 * exports are dumped.
 *
 * NOTE(review): the declaration and update of 'waited' are not visible in
 * this listing -- confirm against the full source.
 */
1560 void obd_exports_barrier(struct obd_device *obd)
1563 LASSERT(list_empty(&obd->obd_exports));
1564 spin_lock(&obd->obd_dev_lock);
1565 while (!list_empty(&obd->obd_unlinked_exports)) {
/* Never sleep with the spinlock held. */
1566 spin_unlock(&obd->obd_dev_lock);
1567 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1568 cfs_time_seconds(waited));
1569 if (waited > 5 && IS_PO2(waited)) {
1570 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1571 "more than %d seconds. "
1572 "The obd refcount = %d. Is it stuck?\n",
1573 obd->obd_name, waited,
1574 atomic_read(&obd->obd_refcount));
1575 dump_exports(obd, 1);
/* Re-take the lock before the loop condition is evaluated again. */
1578 spin_lock(&obd->obd_dev_lock);
1580 spin_unlock(&obd->obd_dev_lock);
1582 EXPORT_SYMBOL(obd_exports_barrier);
/*
 * Number of zombie imports and exports still waiting to be destroyed.
 * Read under obd_zombie_impexp_lock; starts at zero like every object with
 * static storage duration.
 */
static int zombies_count;
/*
 * Each pass takes at most one import and one export off the heads of the
 * zombie lists while holding obd_zombie_impexp_lock, releases the lock, and
 * destroys whatever was taken outside the lock.  Passes repeat until a pass
 * finds neither an import nor an export.
 *
 * NOTE(review): the loop opener, the resetting of 'import'/'export' and the
 * statements executed while the lock is re-taken after each destroy
 * (presumably the zombies_count bookkeeping) are not visible in this
 * listing -- confirm against the full source.
 */
1588 * kill zombie imports and exports
1590 void obd_zombie_impexp_cull(void)
1592 struct obd_import *import;
1593 struct obd_export *export;
1597 spin_lock(&obd_zombie_impexp_lock);
1600 if (!list_empty(&obd_zombie_imports)) {
1601 import = list_entry(obd_zombie_imports.next,
1604 list_del_init(&import->imp_zombie_chain);
1608 if (!list_empty(&obd_zombie_exports)) {
1609 export = list_entry(obd_zombie_exports.next,
1612 list_del_init(&export->exp_obd_chain);
1615 spin_unlock(&obd_zombie_impexp_lock);
/* Destruction happens with the zombie lock released. */
1617 if (import != NULL) {
1618 class_import_destroy(import);
1619 spin_lock(&obd_zombie_impexp_lock);
1621 spin_unlock(&obd_zombie_impexp_lock);
1624 if (export != NULL) {
1625 class_export_destroy(export);
1626 spin_lock(&obd_zombie_impexp_lock);
1628 spin_unlock(&obd_zombie_impexp_lock);
1632 } while (import != NULL || export != NULL);
/* Completed by the zombie thread once it is running; obd_zombie_impexp_init() waits on it. */
1636 static struct completion obd_zombie_start;
/* Completed by the zombie thread after its main loop ends; obd_zombie_impexp_stop() waits on it. */
1637 static struct completion obd_zombie_stop;
/* Control bits for the zombie thread, accessed with set_bit()/test_bit(). */
1638 static unsigned long obd_zombie_flags;
/* Wait queue shared by the zombie thread and by obd_zombie_barrier() callers. */
1639 static wait_queue_head_t obd_zombie_waitq;
/* PID stored by the zombie thread; lets obd_zombie_barrier() avoid waiting on itself. */
1640 static pid_t obd_zombie_pid;
/*
 * NOTE(review): OBD_ZOMBIE_STOP is passed to set_bit()/test_bit() as a bit
 * number, so the value 0x0001 selects bit 1 of obd_zombie_flags rather than
 * acting as a mask.  Setter and testers use the same constant, so they agree.
 */
1643 OBD_ZOMBIE_STOP = 0x0001,
/*
 * The value computed below is non-zero only when there is nothing to do:
 * no zombies are pending AND the stop bit is not set.  The zombie thread
 * sleeps until the negation of this value becomes true.
 *
 * 'arg' is not referenced by the visible lines.
 * NOTE(review): the declaration of 'rc' and the return statement are not
 * visible in this listing -- confirm against the full source.
 */
1647 * check for work for kill zombie import/export thread.
1649 static int obd_zombie_impexp_check(void *arg)
1653 spin_lock(&obd_zombie_impexp_lock);
1654 rc = (zombies_count == 0) &&
1655 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1656 spin_unlock(&obd_zombie_impexp_lock);
/*
 * The export is first unlinked from whichever device list it is on, under
 * the device's obd_dev_lock (it must be on one: asserted).  It is then
 * queued on obd_zombie_exports under obd_zombie_impexp_lock, reusing the
 * same exp_obd_chain linkage, and the zombie wait queue is woken.
 *
 * NOTE(review): one statement between taking obd_zombie_impexp_lock and the
 * list_add() is not visible in this listing (presumably the zombies_count
 * update) -- confirm against the full source.
 */
1662 * Add export to the obd_zombe thread and notify it.
1664 static void obd_zombie_export_add(struct obd_export *exp) {
1665 spin_lock(&exp->exp_obd->obd_dev_lock);
1666 LASSERT(!list_empty(&exp->exp_obd_chain));
1667 list_del_init(&exp->exp_obd_chain);
1668 spin_unlock(&exp->exp_obd->obd_dev_lock);
1669 spin_lock(&obd_zombie_impexp_lock);
1671 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1672 spin_unlock(&obd_zombie_impexp_lock);
1674 obd_zombie_impexp_notify();
/*
 * Preconditions asserted below: the import no longer has a security context
 * (imp_sec) or a request pool (imp_rq_pool), and it is not already on the
 * zombie list.  The import is queued on obd_zombie_imports under
 * obd_zombie_impexp_lock and the zombie wait queue is then woken.
 *
 * NOTE(review): one statement between the LASSERT and the list_add() is not
 * visible in this listing (presumably the zombies_count update) -- confirm
 * against the full source.
 */
1678 * Add import to the obd_zombe thread and notify it.
1680 static void obd_zombie_import_add(struct obd_import *imp) {
1681 LASSERT(imp->imp_sec == NULL);
1682 LASSERT(imp->imp_rq_pool == NULL);
1683 spin_lock(&obd_zombie_impexp_lock);
1684 LASSERT(list_empty(&imp->imp_zombie_chain));
1686 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1687 spin_unlock(&obd_zombie_impexp_lock);
1689 obd_zombie_impexp_notify();
1693 * notify import/export destroy thread about new zombie.
1695 static void obd_zombie_impexp_notify(void)
1698 * Make sure obd_zomebie_impexp_thread get this notification.
1699 * It is possible this signal only get by obd_zombie_barrier, and
1700 * barrier gulps this notification and sleeps away and hangs ensues
1702 wake_up_all(&obd_zombie_waitq);
/*
 * Idle means zombies_count is zero, read under obd_zombie_impexp_lock.
 * Calling this after a stop has been requested is a bug (asserted).
 *
 * NOTE(review): the declaration of 'rc' and the return statement are not
 * visible in this listing -- confirm against the full source.
 */
1706 * check whether obd_zombie is idle
1708 static int obd_zombie_is_idle(void)
1712 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1713 spin_lock(&obd_zombie_impexp_lock);
1714 rc = (zombies_count == 0);
1715 spin_unlock(&obd_zombie_impexp_lock);
/*
 * Sleeps on obd_zombie_waitq until obd_zombie_is_idle() reports true.  The
 * zero-initialised l_wait_info requests no timeout or signal handling.
 * When the caller is the zombie thread itself (PID comparison) the wait is
 * not performed.
 *
 * NOTE(review): the statement executed in the "don't wait for myself" case
 * is not visible in this listing -- confirm against the full source.
 */
1720 * wait when obd_zombie import/export queues become empty
1722 void obd_zombie_barrier(void)
1724 struct l_wait_info lwi = { 0 };
1726 if (obd_zombie_pid == current_pid())
1727 /* don't wait for myself */
1729 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1731 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Thread body.  After announcing start-up it loops until OBD_ZOMBIE_STOP is
 * set: sleep until there is work or a stop request, destroy all queued
 * zombies, then wake a sleeper on obd_zombie_waitq so that
 * obd_zombie_barrier() callers can re-check for idleness.  On exit it
 * completes obd_zombie_stop.
 *
 * 'unused' is not referenced by the visible lines.
 * NOTE(review): obd_zombie_pid is assigned after obd_zombie_start has been
 * completed, so obd_zombie_impexp_init() may return before the PID is
 * recorded.  The return statement is not visible in this listing.
 */
1735 * destroy zombie export/import thread.
1737 static int obd_zombie_impexp_thread(void *unused)
1739 unshare_fs_struct();
1740 complete(&obd_zombie_start);
1742 obd_zombie_pid = current_pid();
1744 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1745 struct l_wait_info lwi = { 0 };
/* Wake-up condition: zombies are pending or a stop was requested. */
1747 l_wait_event(obd_zombie_waitq,
1748 !obd_zombie_impexp_check(NULL), &lwi);
1749 obd_zombie_impexp_cull();
1752 * Notify obd_zombie_barrier callers that queues
1755 wake_up(&obd_zombie_waitq);
1758 complete(&obd_zombie_stop);
/*
 * Initialises the zombie lists, their lock, both completions and the wait
 * queue, then starts the "obd_zombid" kernel thread and waits until the
 * thread has signalled obd_zombie_start.
 *
 * NOTE(review): the condition guarding RETURN(PTR_ERR(task)) (presumably an
 * IS_ERR() test on 'task'), one statement before kthread_run() and the
 * success return value are not visible in this listing -- confirm against
 * the full source.
 */
1765 * start destroy zombie import/export thread
1767 int obd_zombie_impexp_init(void)
1769 struct task_struct *task;
1771 INIT_LIST_HEAD(&obd_zombie_imports);
1773 INIT_LIST_HEAD(&obd_zombie_exports);
1774 spin_lock_init(&obd_zombie_impexp_lock);
1775 init_completion(&obd_zombie_start);
1776 init_completion(&obd_zombie_stop);
1777 init_waitqueue_head(&obd_zombie_waitq);
1780 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1782 RETURN(PTR_ERR(task));
1784 wait_for_completion(&obd_zombie_start);
1788 * stop destroy zombie import/export thread
1790 void obd_zombie_impexp_stop(void)
1792 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1793 obd_zombie_impexp_notify();
1794 wait_for_completion(&obd_zombie_stop);
1797 /***** Kernel-userspace comm helpers *******/
1799 /* Get length of entire message, including header */
1800 int kuc_len(int payload_len)
1802 return sizeof(struct kuc_hdr) + payload_len;
1804 EXPORT_SYMBOL(kuc_len);
/*
 * The header is taken to sit immediately in front of the payload: 'p' is
 * reinterpreted as a header pointer and stepped back by one header.  The
 * result is asserted to carry KUC_MAGIC, so passing a pointer that is not a
 * kuc payload trips the assertion.
 *
 * NOTE(review): the return statement is not visible in this listing --
 * confirm against the full source.
 */
1806 /* Get a pointer to kuc header, given a ptr to the payload
1807 * @param p Pointer to payload area
1808 * @returns Pointer to kuc header
1810 struct kuc_hdr * kuc_ptr(void *p)
1812 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1813 LASSERT(lh->kuc_magic == KUC_MAGIC);
1816 EXPORT_SYMBOL(kuc_ptr);
/*
 * Non-asserting counterpart of kuc_ptr(): the memory one header-size before
 * 'p' is read and its kuc_magic field is compared with KUC_MAGIC.  That
 * memory is dereferenced unconditionally, so it must be readable.
 *
 * NOTE(review): the values returned for the match and no-match cases are not
 * visible in this listing -- confirm against the full source.
 */
1818 /* Test if payload is part of kuc message
1819 * @param p Pointer to payload area
1822 int kuc_ispayload(void *p)
1824 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1826 if (kh->kuc_magic == KUC_MAGIC)
1831 EXPORT_SYMBOL(kuc_ispayload);
/*
 * payload_len: size of the payload area in bytes
 * transport:   stored in the header's kuc_transport field
 * type:        stored in the header's kuc_msgtype field
 *
 * The header's kuc_msglen is set to the full message length (header plus
 * payload, as computed by kuc_len()).  The returned pointer addresses the
 * first byte after the header.  Failure is reported as ERR_PTR(-ENOMEM),
 * not NULL, so callers need an IS_ERR-style check.
 *
 * NOTE(review): the declaration of 'lh', the allocation call and the
 * condition guarding the ERR_PTR return are not visible in this listing --
 * confirm against the full source.
 */
1833 /* Alloc space for a message, and fill in header
1834 * @return Pointer to payload area
1836 void *kuc_alloc(int payload_len, int transport, int type)
1839 int len = kuc_len(payload_len);
1843 return ERR_PTR(-ENOMEM);
1845 lh->kuc_magic = KUC_MAGIC;
1846 lh->kuc_transport = transport;
1847 lh->kuc_msgtype = type;
1848 lh->kuc_msglen = len;
1850 return (void *)(lh + 1);
1852 EXPORT_SYMBOL(kuc_alloc);
/**
 * Release a kuc message given a pointer to its payload area.
 *
 * @param p            Pointer to payload area; kuc_ptr() asserts that a
 *                     valid kuc header precedes it
 * @param payload_len  Payload size in bytes; the amount freed is
 *                     kuc_len(payload_len), so this presumably has to match
 *                     the size the message was allocated with
 */
inline void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh;

	/* Step back from the payload to the start of the allocation. */
	lh = kuc_ptr(p);
	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
/*
 * Per-waiter record used by obd_get_request_slot(), which places it on its
 * own stack and queues it on cli->cl_loi_read_list.
 *
 * NOTE(review): obd_get_request_slot() also uses a member named
 * 'orsw_signaled' whose declaration is not visible in this listing.
 */
1862 struct obd_request_slot_waiter {
/* Linkage on cl_loi_read_list; re-initialised (empty) once a slot is granted. */
1863 struct list_head orsw_entry;
/* Queue the waiting thread sleeps on; woken when a slot is granted. */
1864 wait_queue_head_t orsw_waitq;
/*
 * Wait condition for obd_get_request_slot(): the waiter owns a slot once its
 * entry is no longer linked on the waiting list.  Granting code unlinks the
 * entry with list_del_init(), which is what makes list_empty() on the entry
 * itself true.  The test is done under cl_loi_list_lock.
 *
 * NOTE(review): the declaration of 'avail' and the return statement are not
 * visible in this listing -- confirm against the full source.
 */
1868 static bool obd_request_slot_avail(struct client_obd *cli,
1869 struct obd_request_slot_waiter *orsw)
1873 spin_lock(&cli->cl_loi_list_lock);
1874 avail = !!list_empty(&orsw->orsw_entry);
1875 spin_unlock(&cli->cl_loi_list_lock);
/*
 * Fast path: when fewer than cl_max_rpcs_in_flight slots are in use, the
 * in-flight counter is bumped under cl_loi_list_lock and no waiting occurs.
 * Slow path: an on-stack waiter is appended to cl_loi_read_list and the
 * caller sleeps, interruptibly, until a slot is handed over.
 *
 * NOTE(review): the declaration of 'rc', the remainder of the wait
 * condition, the condition guarding the post-wait clean-up and all return
 * statements are not visible in this listing -- confirm against the full
 * source.
 */
1881 * For network flow control, the RPC sponsor needs to acquire a credit
1882 * before sending the RPC. The credits count for a connection is defined
1883 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1884 * the subsequent RPC sponsors need to wait until others released their
1885 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1887 int obd_get_request_slot(struct client_obd *cli)
1889 struct obd_request_slot_waiter orsw;
1890 struct l_wait_info lwi;
1893 spin_lock(&cli->cl_loi_list_lock);
1894 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1895 cli->cl_r_in_flight++;
1896 spin_unlock(&cli->cl_loi_list_lock);
/* No free slot: queue at the tail so waiters are served in arrival order. */
1900 init_waitqueue_head(&orsw.orsw_waitq);
1901 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1902 orsw.orsw_signaled = false;
1903 spin_unlock(&cli->cl_loi_list_lock);
1905 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1906 rc = l_wait_event(orsw.orsw_waitq,
1907 obd_request_slot_avail(cli, &orsw) ||
1911 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1912 * freed but other (such as obd_put_request_slot) is using it. */
1913 spin_lock(&cli->cl_loi_list_lock);
/*
 * If the entry is already unlinked, a slot was granted and counted on this
 * waiter's behalf, so the count is given back; otherwise the waiter is still
 * queued and is simply removed.
 */
1915 if (!orsw.orsw_signaled) {
1916 if (list_empty(&orsw.orsw_entry))
1917 cli->cl_r_in_flight--;
1919 list_del(&orsw.orsw_entry);
1923 if (orsw.orsw_signaled) {
1924 LASSERT(list_empty(&orsw.orsw_entry));
1928 spin_unlock(&cli->cl_loi_list_lock);
1932 EXPORT_SYMBOL(obd_get_request_slot);
1934 void obd_put_request_slot(struct client_obd *cli)
1936 struct obd_request_slot_waiter *orsw;
1938 spin_lock(&cli->cl_loi_list_lock);
1939 cli->cl_r_in_flight--;
1941 /* If there is free slot, wakeup the first waiter. */
1942 if (!list_empty(&cli->cl_loi_read_list) &&
1943 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1944 orsw = list_entry(cli->cl_loi_read_list.next,
1945 struct obd_request_slot_waiter, orsw_entry);
1946 list_del_init(&orsw->orsw_entry);
1947 cli->cl_r_in_flight++;
1948 wake_up(&orsw->orsw_waitq);
1950 spin_unlock(&cli->cl_loi_list_lock);
1952 EXPORT_SYMBOL(obd_put_request_slot);
1954 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1956 return cli->cl_max_rpcs_in_flight;
1958 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Change the limit on concurrently outstanding RPCs for 'cli'.
 *
 * cli: client to update
 * max: new limit; values below 1 or above OBD_MAX_RIF_MAX are rejected
 *
 * The new limit is stored under cl_loi_list_lock.  For each unit of 'diff',
 * as long as waiters are queued on cl_loi_read_list, the first waiter is
 * unlinked, counted as in flight and woken -- the same hand-over performed
 * by obd_put_request_slot().
 *
 * NOTE(review): the declarations of 'old', 'diff' and 'i', the computation
 * of 'diff', the statement leaving the loop when no waiter is queued and the
 * return values are not visible in this listing -- confirm against the full
 * source.
 */
1960 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1962 struct obd_request_slot_waiter *orsw;
1967 if (max > OBD_MAX_RIF_MAX || max < 1)
1970 spin_lock(&cli->cl_loi_list_lock);
1971 old = cli->cl_max_rpcs_in_flight;
1972 cli->cl_max_rpcs_in_flight = max;
1975 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1976 for (i = 0; i < diff; i++) {
1977 if (list_empty(&cli->cl_loi_read_list))
1980 orsw = list_entry(cli->cl_loi_read_list.next,
1981 struct obd_request_slot_waiter, orsw_entry);
1982 list_del_init(&orsw->orsw_entry);
1983 cli->cl_r_in_flight++;
1984 wake_up(&orsw->orsw_waitq);
1986 spin_unlock(&cli->cl_loi_list_lock);
1990 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);