4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
/* Spinlock taken in this file around every walk or update of obd_types. */
46 spinlock_t obd_types_lock;
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(); obd_device_cachep backs obd_device_alloc(). */
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
/* Zombie import and export lists with their lock.
 * NOTE(review): the code that fills and drains them is outside this view. */
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of static helpers whose definitions are not in view.
 * class_export_put() and class_import_put() call the two *_add() helpers on
 * the final reference drop. */
57 static void obd_zombie_impexp_notify(void);
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks);
/* Function pointer called by class_export_destroy() and
 * class_import_destroy() to drop a ptlrpc connection.
 * NOTE(review): where it gets assigned is not visible here. */
63 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
64 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
/*
 * support functions: we could use inter-module communication, but this
 * is more portable to other OS's
 */
/*
 * Allocate an obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC, the value the LASSERTF checks elsewhere in this file
 * compare against.
 * NOTE(review): this listing omits short lines; presumably the magic is only
 * written when the allocation succeeded and the pointer is returned - confirm.
 */
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts the magic first, logs an error if obd_namespace is still set,
 * finalises the obd_reference lu_ref and then frees the object.
 */
81 static void obd_device_free(struct obd_device *obd)
84 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A namespace that is still attached here was not cleaned up by its owner. */
86 if (obd->obd_namespace != NULL) {
87 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88 obd, obd->obd_namespace, obd->obd_force);
91 lu_ref_fini(&obd->obd_reference);
92 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by exact name.
 * Walks obd_types under obd_types_lock and compares typ_name with strcmp();
 * the lock is released both on a match and after an unsuccessful walk.
 * NOTE(review): the return statements are not visible; presumably the matching
 * type is returned, or NULL when none matches - confirm.
 */
95 struct obd_type *class_search_type(const char *name)
97 struct list_head *tmp;
98 struct obd_type *type;
100 spin_lock(&obd_types_lock);
101 list_for_each(tmp, &obd_types) {
102 type = list_entry(tmp, struct obd_type, typ_chain);
103 if (strcmp(type->typ_name, name) == 0) {
104 spin_unlock(&obd_types_lock);
108 spin_unlock(&obd_types_lock);
111 EXPORT_SYMBOL(class_search_type);
/*
 * Find an obd type by name and pin the module that provides it.
 * When built with HAVE_MODULE_LOADING_SUPPORT the code can ask the kernel to
 * load a module and then searches again. The module reference is taken under
 * type->obd_type_lock.
 * NOTE(review): the conditions guarding the module load and the final return
 * are not visible in this listing.
 */
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
117 #ifdef HAVE_MODULE_LOADING_SUPPORT
119 const char *modname = name;
/* Some type names are served by a module with a different name. */
121 if (strcmp(modname, "obdfilter") == 0)
124 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125 modname = LUSTRE_OSP_NAME;
/* Prefix match: any name starting with LUSTRE_MDS_NAME maps to the MDT. */
127 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128 modname = LUSTRE_MDT_NAME;
/* request_module() returning 0 is treated as success; search again. */
130 if (!request_module("%s", modname)) {
131 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132 type = class_search_type(name);
134 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
140 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not checked here. */
142 try_module_get(type->typ_dt_ops->o_owner);
143 spin_unlock(&type->obd_type_lock);
147 EXPORT_SYMBOL(class_get_type);
/*
 * Release a type obtained with class_get_type(): drops the module reference
 * of type->typ_dt_ops->o_owner while holding type->obd_type_lock.
 * NOTE(review): other statements inside the locked region (for example a
 * refcount update) are not visible in this listing.
 */
149 void class_put_type(struct obd_type *type)
152 spin_lock(&type->obd_type_lock);
154 module_put(type->typ_dt_ops->o_owner);
155 spin_unlock(&type->obd_type_lock);
157 EXPORT_SYMBOL(class_put_type);
/* Upper bound asserted on the length of a type name at registration. */
159 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 * Allocates an obd_type plus private copies of the dt and md operation tables
 * and of the name, registers a proc entry, initialises the lu device type and
 * finally links the type into obd_types under obd_types_lock.
 * The tail of the function is the failure path that undoes the allocations.
 * NOTE(review): several guards and the return statements are not visible in
 * this listing, including the conditions around enable_proc and ldt.
 */
161 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
162 bool enable_proc, struct lprocfs_seq_vars *vars,
163 const char *name, struct lu_device_type *ldt)
165 struct obd_type *type;
170 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* Duplicate registrations are detected by name. */
172 if (class_search_type(name)) {
173 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
178 OBD_ALLOC(type, sizeof(*type));
182 OBD_ALLOC_PTR(type->typ_dt_ops);
183 OBD_ALLOC_PTR(type->typ_md_ops);
184 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* All three allocations are checked together. */
186 if (type->typ_dt_ops == NULL ||
187 type->typ_md_ops == NULL ||
188 type->typ_name == NULL)
/* The operation tables are copied by value, not referenced. */
191 *(type->typ_dt_ops) = *dt_ops;
192 /* md_ops is optional */
194 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized strlen(name) + 1 above, so strcpy() fits. */
195 strcpy(type->typ_name, name);
196 spin_lock_init(&type->obd_type_lock);
200 type->typ_procroot = lprocfs_seq_register(type->typ_name,
/* On failure keep the error code and clear the pointer so the cleanup
 * below does not try to remove a proc entry. */
203 if (IS_ERR(type->typ_procroot)) {
204 rc = PTR_ERR(type->typ_procroot);
205 type->typ_procroot = NULL;
212 rc = lu_device_type_init(ldt);
/* Publish the fully initialised type. */
217 spin_lock(&obd_types_lock);
218 list_add(&type->typ_chain, &obd_types);
219 spin_unlock(&obd_types_lock);
/* Failure path: release whatever was set up, name and proc entry first. */
224 if (type->typ_name != NULL) {
226 if (type->typ_procroot != NULL) {
227 #ifndef HAVE_ONLY_PROCFS_SEQ
228 lprocfs_try_remove_proc_entry(type->typ_name,
231 remove_proc_subtree(type->typ_name, proc_lustre_root);
235 OBD_FREE(type->typ_name, strlen(name) + 1);
237 if (type->typ_md_ops != NULL)
238 OBD_FREE_PTR(type->typ_md_ops);
239 if (type->typ_dt_ops != NULL)
240 OBD_FREE_PTR(type->typ_dt_ops);
241 OBD_FREE(type, sizeof(*type));
244 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister an obd type by name.
 * A type that still has references only loses its operation tables. Otherwise
 * the proc entries are removed, the lu device type is finalised, the type is
 * unlinked from obd_types under obd_types_lock and all its memory is freed.
 * NOTE(review): the return statements and some guards are not visible in this
 * listing.
 */
246 int class_unregister_type(const char *name)
248 struct obd_type *type = class_search_type(name);
252 CERROR("unknown obd type\n");
256 if (type->typ_refcnt) {
257 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
258 /* This is a bad situation, let's make the best of it */
259 /* Remove ops, but leave the name for debugging */
260 OBD_FREE_PTR(type->typ_dt_ops);
261 OBD_FREE_PTR(type->typ_md_ops);
265 /* we do not use type->typ_procroot as for compatibility purposes
266 * other modules can share names (i.e. lod can use lov entry). so
267 * we can't reference pointer as it can get invalided when another
268 * module removes the entry */
270 if (type->typ_procroot != NULL) {
271 #ifndef HAVE_ONLY_PROCFS_SEQ
272 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
274 remove_proc_subtree(type->typ_name, proc_lustre_root);
278 if (type->typ_procsym != NULL)
279 lprocfs_remove(&type->typ_procsym);
282 lu_device_type_fini(type->typ_lu);
/* Unlink first, then free the name, both operation tables and the type. */
284 spin_lock(&obd_types_lock);
285 list_del(&type->typ_chain);
286 spin_unlock(&obd_types_lock);
287 OBD_FREE(type->typ_name, strlen(name) + 1);
288 if (type->typ_dt_ops != NULL)
289 OBD_FREE_PTR(type->typ_dt_ops);
290 if (type->typ_md_ops != NULL)
291 OBD_FREE_PTR(type->typ_md_ops);
292 OBD_FREE(type, sizeof(*type));
294 } /* class_unregister_type */
295 EXPORT_SYMBOL(class_unregister_type);
/**
 * Create a new obd device.
 *
 * Find an empty slot in ::obd_devs[], create a new obd device in it.
 *
 * \param[in] type_name	obd device type string.
 * \param[in] name	obd device name.
 *
 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
 */
/*
 * Create an obd device of type type_name called name and install it in a free
 * slot of obd_devs[].
 * Failures visible here are reported as ERR_PTR values: -EINVAL for a name of
 * MAX_OBD_NAME bytes or more, -ENODEV for an unknown type, -ENOMEM when the
 * device cannot be allocated, -EEXIST for a duplicate name and -EOVERFLOW when
 * no slot is free.
 * NOTE(review): several short lines (the declaration of i, some assignments,
 * labels and the final return) are not visible in this listing.
 */
308 struct obd_device *class_newdev(const char *type_name, const char *name)
310 struct obd_device *result = NULL;
311 struct obd_device *newdev;
312 struct obd_type *type = NULL;
314 int new_obd_minor = 0;
317 if (strlen(name) >= MAX_OBD_NAME) {
318 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
319 RETURN(ERR_PTR(-EINVAL));
/* Takes a type reference; the class_put_type() near the end drops it. */
322 type = class_get_type(type_name);
324 CERROR("OBD: unknown type: %s\n", type_name);
325 RETURN(ERR_PTR(-ENODEV));
328 newdev = obd_device_alloc();
330 GOTO(out_type, result = ERR_PTR(-ENOMEM));
332 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* One pass over the table under the write lock both rejects duplicate
 * names and claims the first empty slot. */
334 write_lock(&obd_dev_lock);
335 for (i = 0; i < class_devno_max(); i++) {
336 struct obd_device *obd = class_num2obd(i);
338 if (obd && (strcmp(name, obd->obd_name) == 0)) {
339 CERROR("Device %s already exists at %d, won't add\n",
/* A slot claimed earlier in this same pass is given back before the
 * duplicate is reported. */
342 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
343 "%p obd_magic %08x != %08x\n", result,
344 result->obd_magic, OBD_DEVICE_MAGIC);
345 LASSERTF(result->obd_minor == new_obd_minor,
346 "%p obd_minor %d != %d\n", result,
347 result->obd_minor, new_obd_minor);
349 obd_devs[result->obd_minor] = NULL;
350 result->obd_name[0]='\0';
352 result = ERR_PTR(-EEXIST);
/* First empty slot seen while nothing has been claimed yet. */
355 if (!result && !obd) {
357 result->obd_minor = i;
359 result->obd_type = type;
/* NOTE(review): strncpy() with sizeof - 1 does not write the final byte;
 * termination relies on that byte already being zero - confirm. */
360 strncpy(result->obd_name, name,
361 sizeof(result->obd_name) - 1);
362 obd_devs[i] = result;
365 write_unlock(&obd_dev_lock);
/* The loop ran off the end without claiming a slot. */
367 if (result == NULL && i >= class_devno_max()) {
368 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
370 GOTO(out, result = ERR_PTR(-EOVERFLOW));
376 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
377 result->obd_name, result);
/* Error unwinding: free the unused device, then drop the type reference. */
381 obd_device_free(newdev);
383 class_put_type(type);
/*
 * Remove an obd device from obd_devs[] and free it.
 * Asserts the magic, that the table slot for obd_minor really holds this
 * device, and that it has a type. The slot is cleared under the obd_dev_lock
 * write lock, the device is freed and the type reference is dropped.
 */
387 void class_release_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type is put. */
389 struct obd_type *obd_type = obd->obd_type;
391 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
392 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
393 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
394 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
395 LASSERT(obd_type != NULL);
397 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
398 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
400 write_lock(&obd_dev_lock);
401 obd_devs[obd->obd_minor] = NULL;
402 write_unlock(&obd_dev_lock);
403 obd_device_free(obd);
405 class_put_type(obd_type);
/*
 * Translate a device name into its index in obd_devs[].
 * Scans the table under the obd_dev_lock read lock; a name match is only
 * accepted once the device has obd_attached set.
 * NOTE(review): the return statements are not visible; class_name2obd()
 * treats a negative result as not found.
 */
408 int class_name2dev(const char *name)
415 read_lock(&obd_dev_lock);
416 for (i = 0; i < class_devno_max(); i++) {
417 struct obd_device *obd = class_num2obd(i);
419 if (obd && strcmp(name, obd->obd_name) == 0) {
420 /* Make sure we finished attaching before we give
421 out any references */
422 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
423 if (obd->obd_attached) {
424 read_unlock(&obd_dev_lock);
430 read_unlock(&obd_dev_lock);
434 EXPORT_SYMBOL(class_name2dev);
436 struct obd_device *class_name2obd(const char *name)
438 int dev = class_name2dev(name);
440 if (dev < 0 || dev > class_devno_max())
442 return class_num2obd(dev);
444 EXPORT_SYMBOL(class_name2obd);
/*
 * Translate a device uuid into its index in obd_devs[].
 * Scans the table under the obd_dev_lock read lock and compares uuids with
 * obd_uuid_equals(); unlike class_name2dev() no obd_attached check is visible.
 * NOTE(review): the return statements are not visible in this listing.
 */
446 int class_uuid2dev(struct obd_uuid *uuid)
450 read_lock(&obd_dev_lock);
451 for (i = 0; i < class_devno_max(); i++) {
452 struct obd_device *obd = class_num2obd(i);
454 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
455 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
456 read_unlock(&obd_dev_lock);
460 read_unlock(&obd_dev_lock);
464 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by uuid: resolve the index with class_uuid2dev() and
 * fetch the device with class_num2obd().
 * NOTE(review): lines between the lookup and the return are missing from this
 * listing; presumably a failed lookup is rejected there - confirm.
 */
466 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
468 int dev = class_uuid2dev(uuid);
471 return class_num2obd(dev);
473 EXPORT_SYMBOL(class_uuid2obd);
476 * Get obd device from ::obd_devs[]
478 * \param num [in] array index
480 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
481 * otherwise return the obd device there.
483 struct obd_device *class_num2obd(int num)
485 struct obd_device *obd = NULL;
487 if (num < class_devno_max()) {
492 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
493 "%p obd_magic %08x != %08x\n",
494 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
495 LASSERTF(obd->obd_minor == num,
496 "%p obd_minor %0d != %0d\n",
497 obd, obd->obd_minor, num);
502 EXPORT_SYMBOL(class_num2obd);
505 * Get obd devices count. Device in any
507 * \retval obd device count
509 int get_devices_count(void)
511 int index, max_index = class_devno_max(), dev_count = 0;
513 read_lock(&obd_dev_lock);
514 for (index = 0; index <= max_index; index++) {
515 struct obd_device *obd = class_num2obd(index);
519 read_unlock(&obd_dev_lock);
523 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per obd device: index, status, type name, device
 * name, uuid and reference count. Runs under the obd_dev_lock read lock.
 * The status is chosen in priority order from obd_stopping, obd_set_up and
 * obd_attached.
 * NOTE(review): the status strings and the handling of empty slots are not
 * visible in this listing.
 */
525 void class_obd_list(void)
530 read_lock(&obd_dev_lock);
531 for (i = 0; i < class_devno_max(); i++) {
532 struct obd_device *obd = class_num2obd(i);
536 if (obd->obd_stopping)
538 else if (obd->obd_set_up)
540 else if (obd->obd_attached)
544 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
545 i, status, obd->obd_type->typ_name,
546 obd->obd_name, obd->obd_uuid.uuid,
547 atomic_read(&obd->obd_refcount));
549 read_unlock(&obd_dev_lock);
553 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
 * specified, then only the client with that uuid is returned,
 * otherwise any client connected to the tgt is returned.
 *
 * The device type is matched as a prefix: only strlen(typ_name) characters
 * are compared. The scan runs under the obd_dev_lock read lock.
 * NOTE(review): the return statements and the handling of empty slots are
 * not visible in this listing. */
556 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
557 const char * typ_name,
558 struct obd_uuid *grp_uuid)
562 read_lock(&obd_dev_lock);
563 for (i = 0; i < class_devno_max(); i++) {
564 struct obd_device *obd = class_num2obd(i);
568 if ((strncmp(obd->obd_type->typ_name, typ_name,
569 strlen(typ_name)) == 0)) {
/* The target uuid must match; grp_uuid is only checked when given. */
570 if (obd_uuid_equals(tgt_uuid,
571 &obd->u.cli.cl_target_uuid) &&
572 ((grp_uuid)? obd_uuid_equals(grp_uuid,
573 &obd->obd_uuid) : 1)) {
574 read_unlock(&obd_dev_lock);
579 read_unlock(&obd_dev_lock);
583 EXPORT_SYMBOL(class_find_client_obd);
585 /* Iterate the obd_device list looking devices have grp_uuid. Start
 * searching at *next, and if a device is found, the next index to look
 * at is saved in *next. If next is NULL, then the first matching device
 * will always be returned.
 *
 * A start index from *next is only used when it lies in
 * [0, class_devno_max()). The scan runs under the obd_dev_lock read lock.
 * NOTE(review): the handling of an out-of-range *next, the update of *next
 * and the return statements are not visible in this listing. */
589 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
595 else if (*next >= 0 && *next < class_devno_max())
600 read_lock(&obd_dev_lock);
601 for (; i < class_devno_max(); i++) {
602 struct obd_device *obd = class_num2obd(i);
606 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
609 read_unlock(&obd_dev_lock);
613 read_unlock(&obd_dev_lock);
617 EXPORT_SYMBOL(class_devices_in_group);
/**
 * Notify that the sptlrpc log for \a fsname has changed, so that every
 * relevant OBD adjusts its sptlrpc settings accordingly.
 */
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every set-up,
 * non-stopping mdc, osc, mdt or ost device whose name starts with the first
 * namelen bytes of fsname.
 * NOTE(review): how the per-device results in rc2 are combined into the
 * return value is not visible in this listing.
 */
623 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
625 struct obd_device *obd;
629 LASSERT(namelen > 0);
631 read_lock(&obd_dev_lock);
632 for (i = 0; i < class_devno_max(); i++) {
633 obd = class_num2obd(i);
635 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
638 /* only notify mdc, osc, mdt, ost */
639 type = obd->obd_type->typ_name;
640 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
641 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
642 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
643 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Prefix comparison of the device name against fsname. */
646 if (strncmp(obd->obd_name, fsname, namelen))
/* Take a device reference, then drop the table lock for the duration of
 * the call; the lock is taken again before the scan continues. */
649 class_incref(obd, __FUNCTION__, obd);
650 read_unlock(&obd_dev_lock);
651 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
652 sizeof(KEY_SPTLRPC_CONF),
653 KEY_SPTLRPC_CONF, 0, NULL, NULL);
655 class_decref(obd, __FUNCTION__, obd);
656 read_lock(&obd_dev_lock);
658 read_unlock(&obd_dev_lock);
661 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches(): obd_device_cachep,
 * obdo_cachep, import_cachep and capa_cachep. Visible pointers are reset to
 * NULL after destruction, which the LASSERTs in obd_init_caches() rely on.
 * NOTE(review): the guards and NULL resets for obdo_cachep and capa_cachep are
 * not visible in this listing; presumably they mirror the first cache.
 */
663 void obd_cleanup_caches(void)
666 if (obd_device_cachep) {
667 kmem_cache_destroy(obd_device_cachep);
668 obd_device_cachep = NULL;
671 kmem_cache_destroy(obdo_cachep);
675 kmem_cache_destroy(import_cachep);
676 import_cachep = NULL;
679 kmem_cache_destroy(capa_cachep);
/*
 * Create the four slab caches used by this layer, in order: ll_obd_dev_cache,
 * ll_obdo_cache, ll_import_cache and capa_cache. Each cache pointer is
 * asserted to be NULL before creation. A failed creation sets rc to -ENOMEM
 * via GOTO(out, ...), and obd_cleanup_caches() is called at the end of the
 * function.
 * NOTE(review): the remaining kmem_cache_create() arguments, the out label and
 * the success return are not visible in this listing.
 */
685 int obd_init_caches(void)
690 LASSERT(obd_device_cachep == NULL);
691 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
692 sizeof(struct obd_device),
694 if (!obd_device_cachep)
695 GOTO(out, rc = -ENOMEM);
697 LASSERT(obdo_cachep == NULL);
698 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
701 GOTO(out, rc = -ENOMEM);
703 LASSERT(import_cachep == NULL);
704 import_cachep = kmem_cache_create("ll_import_cache",
705 sizeof(struct obd_import),
708 GOTO(out, rc = -ENOMEM);
710 LASSERT(capa_cachep == NULL);
711 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
714 GOTO(out, rc = -ENOMEM);
/* Undo any partially completed setup. */
718 obd_cleanup_caches();
722 /* map connection to client */
/*
 * Resolve a connection handle to its export through class_handle2object().
 * Two special cases are logged before the lookup: a null handle and the
 * cookie value -1, which means a new connection is wanted.
 * NOTE(review): the returns for the special cases are not visible here.
 * class_conn2obd() puts the export after this call, so the lookup presumably
 * returns a referenced export - confirm.
 */
723 struct obd_export *class_conn2export(struct lustre_handle *conn)
725 struct obd_export *export;
729 CDEBUG(D_CACHE, "looking for null handle\n");
733 if (conn->cookie == -1) { /* this means assign a new connection */
734 CDEBUG(D_CACHE, "want a new connection\n");
738 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
739 export = class_handle2object(conn->cookie, NULL);
742 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to its obd device.
 * NOTE(review): the body is not visible in this listing; presumably it returns
 * exp->exp_obd, the field class_conn2obd() reads - confirm.
 */
744 struct obd_device *class_exp2obd(struct obd_export *exp)
750 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve a connection handle to the obd device behind its export.
 * The export found by class_conn2export() is only needed to read exp_obd and
 * is put again immediately, so the caller receives no export reference.
 * NOTE(review): the check for a failed lookup and the return statements are
 * not visible in this listing.
 */
752 struct obd_device *class_conn2obd(struct lustre_handle *conn)
754 struct obd_export *export;
755 export = class_conn2export(conn);
757 struct obd_device *obd = export->exp_obd;
758 class_export_put(export);
763 EXPORT_SYMBOL(class_conn2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind an export.
 * NOTE(review): lines between the declaration and the return are missing from
 * this listing; presumably a NULL device is rejected there - confirm.
 */
765 struct obd_import *class_exp2cliimp(struct obd_export *exp)
767 struct obd_device *obd = exp->exp_obd;
770 return obd->u.cli.cl_import;
772 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device a connection
 * handle resolves to via class_conn2obd().
 * NOTE(review): lines between the lookup and the return are missing from this
 * listing; presumably a NULL device is rejected there - confirm.
 */
774 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
776 struct obd_device *obd = class_conn2obd(conn);
779 return obd->u.cli.cl_import;
781 EXPORT_SYMBOL(class_conn2cliimp);
783 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero.
 * Drops the ptlrpc connection if there is one, asserts that the reply, replay
 * and high-priority RPC lists are empty, calls obd_destroy_export(), releases
 * the device reference tagged with the string export, and frees the export
 * with OBD_FREE_RCU.
 */
784 static void class_export_destroy(struct obd_export *exp)
786 struct obd_device *obd = exp->exp_obd;
789 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
790 LASSERT(obd != NULL);
792 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
793 exp->exp_client_uuid.uuid, obd->obd_name);
795 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
796 if (exp->exp_connection)
797 ptlrpc_put_connection_superhack(exp->exp_connection);
799 LASSERT(list_empty(&exp->exp_outstanding_replies));
800 LASSERT(list_empty(&exp->exp_uncommitted_replies));
801 LASSERT(list_empty(&exp->exp_req_replay_queue));
802 LASSERT(list_empty(&exp->exp_hp_rpcs));
803 obd_destroy_export(exp);
/* Pairs with the class_incref(obd, export tag) in class_new_export(). */
804 class_decref(obd, "export", exp);
806 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table addref callback for exports: takes one export reference.
 * Installed as hop_addref in export_handle_ops. */
810 static void export_handle_addref(void *export)
812 class_export_get(export);
/* Handle operations passed to class_handle_hash() for every new export.
 * NOTE(review): any further members are not visible in this listing. */
815 static struct portals_handle_ops export_handle_ops = {
816 .hop_addref = export_handle_addref,
/*
 * Take a reference on an export: increments exp_refcount and logs the new
 * value.
 * NOTE(review): the return statement is not visible; presumably exp is
 * returned - confirm.
 */
820 struct obd_export *class_export_get(struct obd_export *exp)
822 atomic_inc(&exp->exp_refcount);
823 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
824 atomic_read(&exp->exp_refcount));
827 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on an export.
 * On the final put the nid statistics reference is released and the export is
 * handed to obd_zombie_export_add() rather than being destroyed inline.
 */
829 void class_export_put(struct obd_export *exp)
831 LASSERT(exp != NULL);
/* The count must be positive and must not look like poisoned memory. */
832 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
833 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
834 atomic_read(&exp->exp_refcount) - 1);
836 if (atomic_dec_and_test(&exp->exp_refcount)) {
837 LASSERT(!list_empty(&exp->exp_obd_chain));
838 CDEBUG(D_IOCTL, "final put %p/%s\n",
839 exp, exp->exp_client_uuid.uuid);
841 /* release nid stat reference */
842 lprocfs_exp_cleanup(exp);
844 obd_zombie_export_add(exp);
847 EXPORT_SYMBOL(class_export_put);
849 /* Creates a new export, adds it to the hash table, and returns a
 * pointer to it. The refcount is 2: one for the hash reference, and
 * one for the pointer returned by this function.
 *
 * Failures visible here: ERR_PTR(-ENOMEM) when the allocation fails, and rc
 * set to -ENODEV when the device is stopping or has no uuid hash, or to
 * -EALREADY when the client uuid is already hashed.
 * NOTE(review): some guards, the labels and the final returns are not
 * visible in this listing. */
852 struct obd_export *class_new_export(struct obd_device *obd,
853 struct obd_uuid *cluuid)
855 struct obd_export *export;
856 cfs_hash_t *hash = NULL;
860 OBD_ALLOC_PTR(export);
862 return ERR_PTR(-ENOMEM);
864 export->exp_conn_cnt = 0;
865 export->exp_lock_hash = NULL;
866 export->exp_flock_hash = NULL;
867 atomic_set(&export->exp_refcount, 2);
868 atomic_set(&export->exp_rpc_count, 0);
869 atomic_set(&export->exp_cb_count, 0);
870 atomic_set(&export->exp_locks_count, 0);
871 #if LUSTRE_TRACKS_LOCK_EXP_REFS
872 INIT_LIST_HEAD(&export->exp_locks_list);
873 spin_lock_init(&export->exp_locks_list_guard);
875 atomic_set(&export->exp_replay_count, 0);
876 export->exp_obd = obd;
877 INIT_LIST_HEAD(&export->exp_outstanding_replies);
878 spin_lock_init(&export->exp_uncommitted_replies_lock);
879 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
880 INIT_LIST_HEAD(&export->exp_req_replay_queue);
881 INIT_LIST_HEAD(&export->exp_handle.h_link);
882 INIT_LIST_HEAD(&export->exp_hp_rpcs);
883 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* Register the handle; class_connect() later hands out its h_cookie. */
884 class_handle_hash(&export->exp_handle, &export_handle_ops);
885 export->exp_last_request_time = cfs_time_current_sec();
886 spin_lock_init(&export->exp_lock);
887 spin_lock_init(&export->exp_rpc_lock);
888 INIT_HLIST_NODE(&export->exp_uuid_hash);
889 INIT_HLIST_NODE(&export->exp_nid_hash);
890 spin_lock_init(&export->exp_bl_list_lock);
891 INIT_LIST_HEAD(&export->exp_bl_list);
893 export->exp_sp_peer = LUSTRE_SP_ANY;
894 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
895 export->exp_client_uuid = *cluuid;
896 obd_init_export(export);
/* First check under the device lock: refuse a stopping device and pin
 * the uuid hash so it can be used after the lock is dropped. */
898 spin_lock(&obd->obd_dev_lock);
899 /* shouldn't happen, but might race */
900 if (obd->obd_stopping)
901 GOTO(exit_unlock, rc = -ENODEV);
903 hash = cfs_hash_getref(obd->obd_uuid_hash);
905 GOTO(exit_unlock, rc = -ENODEV);
906 spin_unlock(&obd->obd_dev_lock);
/* An export carrying the uuid of the device itself is not hashed. */
908 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
909 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
911 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
912 obd->obd_name, cluuid->uuid, rc);
913 GOTO(exit_err, rc = -EALREADY);
917 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* Second check: the device may have started stopping while the lock was
 * released, in which case the hash insertion is undone. */
918 spin_lock(&obd->obd_dev_lock);
919 if (obd->obd_stopping) {
920 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
921 GOTO(exit_unlock, rc = -ENODEV);
/* Success: pin the device and link the export into its lists. */
924 class_incref(obd, "export", export);
925 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
926 list_add_tail(&export->exp_obd_chain_timed,
927 &export->exp_obd->obd_exports_timed);
928 export->exp_obd->obd_num_exports++;
929 spin_unlock(&obd->obd_dev_lock);
930 cfs_hash_putref(hash);
/* Error unwinding: unlock, drop the hash reference, unhash the handle and
 * free the export. */
934 spin_unlock(&obd->obd_dev_lock);
937 cfs_hash_putref(hash);
938 class_handle_unhash(&export->exp_handle);
939 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
940 obd_destroy_export(export);
941 OBD_FREE_PTR(export);
944 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its device.
 * Unhashes the handle and then, under the device obd_dev_lock, removes the
 * export from the uuid hash if present, moves it to obd_unlinked_exports,
 * takes it off the timed list and decrements obd_num_exports. One export
 * reference is dropped at the end.
 */
946 void class_unlink_export(struct obd_export *exp)
948 class_handle_unhash(&exp->exp_handle);
950 spin_lock(&exp->exp_obd->obd_dev_lock);
951 /* delete an uuid-export hashitem from hashtables */
952 if (!hlist_unhashed(&exp->exp_uuid_hash))
953 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
954 &exp->exp_client_uuid,
955 &exp->exp_uuid_hash);
957 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
958 list_del_init(&exp->exp_obd_chain_timed);
959 exp->exp_obd->obd_num_exports--;
960 spin_unlock(&exp->exp_obd->obd_dev_lock);
961 class_export_put(exp);
963 EXPORT_SYMBOL(class_unlink_export);
965 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero.
 * Drops the main connection, drains imp_conn_list while dropping and freeing
 * each entry, asserts that no security context is attached, releases the
 * device reference tagged with the string import, and frees the import with
 * OBD_FREE_RCU.
 */
966 static void class_import_destroy(struct obd_import *imp)
970 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
971 imp->imp_obd->obd_name);
973 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
975 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Pop entries from the head until the list is empty. */
977 while (!list_empty(&imp->imp_conn_list)) {
978 struct obd_import_conn *imp_conn;
980 imp_conn = list_entry(imp->imp_conn_list.next,
981 struct obd_import_conn, oic_item);
982 list_del_init(&imp_conn->oic_item);
983 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
984 OBD_FREE(imp_conn, sizeof(*imp_conn));
987 LASSERT(imp->imp_sec == NULL);
/* Pairs with the class_incref(obd, import tag) in class_new_import(). */
988 class_decref(imp->imp_obd, "import", imp);
989 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle-table addref callback for imports: takes one import reference.
 * Installed as hop_addref in import_handle_ops. */
993 static void import_handle_addref(void *import)
995 class_import_get(import);
/* Handle operations passed to class_handle_hash() for every new import.
 * NOTE(review): any further members are not visible in this listing. */
998 static struct portals_handle_ops import_handle_ops = {
999 .hop_addref = import_handle_addref,
/*
 * Take a reference on an import: increments imp_refcount and logs the new
 * value together with the device name.
 * NOTE(review): the return statement is not visible; class_new_import() is
 * not the caller, so confirm that import is returned.
 */
1003 struct obd_import *class_import_get(struct obd_import *import)
1005 atomic_inc(&import->imp_refcount);
1006 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1007 atomic_read(&import->imp_refcount),
1008 import->imp_obd->obd_name);
1011 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on an import.
 * The import must not already be on a zombie list. On the final put it is
 * handed to obd_zombie_import_add() rather than being destroyed inline.
 */
1013 void class_import_put(struct obd_import *imp)
1017 LASSERT(list_empty(&imp->imp_zombie_chain));
/* The count must be positive and must not look like poisoned memory. */
1018 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1020 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1021 atomic_read(&imp->imp_refcount) - 1,
1022 imp->imp_obd->obd_name);
1024 if (atomic_dec_and_test(&imp->imp_refcount)) {
1025 CDEBUG(D_INFO, "final put import %p\n", imp);
1026 obd_zombie_import_add(imp);
/* NOTE(review): this reads imp after the final put may have queued it for
 * destruction; confirm the import cannot be freed before this check. */
1029 /* catch possible import put race */
1030 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1033 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is not
 * visible in this listing.
 */
1035 static void init_imp_at(struct imp_at *at) {
1037 at_init(&at->iat_net_latency, 0, 0);
1038 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1039 /* max service estimates are tracked on the server side, so
1040 don't use the AT history here, just use the last reported
1041 val. (But keep hist for proc histogram, worst_ever) */
1042 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise an import for a device.
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds a
 * device reference tagged with the string import, is registered in the handle
 * table and defaults to message magic LUSTRE_MSG_MAGIC_V2.
 * NOTE(review): the allocation failure check and the return statement are not
 * visible in this listing.
 */
1047 struct obd_import *class_new_import(struct obd_device *obd)
1049 struct obd_import *imp;
1051 OBD_ALLOC(imp, sizeof(*imp));
1055 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1056 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1057 INIT_LIST_HEAD(&imp->imp_replay_list);
1058 INIT_LIST_HEAD(&imp->imp_sending_list);
1059 INIT_LIST_HEAD(&imp->imp_delayed_list);
1060 INIT_LIST_HEAD(&imp->imp_committed_list);
/* The replay cursor starts at the head of the still empty committed list. */
1061 imp->imp_replay_cursor = &imp->imp_committed_list;
1062 spin_lock_init(&imp->imp_lock);
1063 imp->imp_last_success_conn = 0;
1064 imp->imp_state = LUSTRE_IMP_NEW;
/* Released again by class_import_destroy(). */
1065 imp->imp_obd = class_incref(obd, "import", imp);
1066 mutex_init(&imp->imp_sec_mutex);
1067 init_waitqueue_head(&imp->imp_recovery_waitq);
1069 atomic_set(&imp->imp_refcount, 2);
1070 atomic_set(&imp->imp_unregistering, 0);
1071 atomic_set(&imp->imp_inflight, 0);
1072 atomic_set(&imp->imp_replay_inflight, 0);
1073 atomic_set(&imp->imp_inval_count, 0);
1074 INIT_LIST_HEAD(&imp->imp_conn_list);
1075 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1076 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1077 init_imp_at(&imp->imp_at);
1079 /* the default magic is V2, will be used in connect RPC, and
1080 * then adjusted according to the flags in request/reply. */
1081 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1085 EXPORT_SYMBOL(class_new_import);
/*
 * Begin destroying an import: remove it from the handle table, bump
 * imp_generation under imp_lock and drop one reference. The import itself is
 * only torn down once the last reference is gone.
 */
1087 void class_destroy_import(struct obd_import *import)
1089 LASSERT(import != NULL);
1090 LASSERT(import != LP_POISON);
1092 class_handle_unhash(&import->imp_handle);
1094 spin_lock(&import->imp_lock);
1095 import->imp_generation++;
1096 spin_unlock(&import->imp_lock);
1097 class_import_put(import);
1099 EXPORT_SYMBOL(class_destroy_import);
/* Lock-to-export reference tracking, compiled only when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set. */
1101 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Record one more reference from lock to export exp, under
 * exp_locks_list_guard. The first reference links the lock into
 * exp_locks_list and records exp as its target; a lock that already targets
 * a different export only triggers a console warning.
 */
1103 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1105 spin_lock(&exp->exp_locks_list_guard);
1107 LASSERT(lock->l_exp_refs_nr >= 0);
1109 if (lock->l_exp_refs_target != NULL &&
1110 lock->l_exp_refs_target != exp) {
1111 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1112 exp, lock, lock->l_exp_refs_target);
/* Post-increment: the branch is taken when the count was zero before. */
1114 if ((lock->l_exp_refs_nr ++) == 0) {
1115 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1116 lock->l_exp_refs_target = exp;
1118 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1119 lock, exp, lock->l_exp_refs_nr);
1120 spin_unlock(&exp->exp_locks_list_guard);
1122 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Drop one reference from lock to export exp, under exp_locks_list_guard.
 * Dropping the last reference unlinks the lock from the export list and
 * clears its target; a target mismatch only triggers a console warning.
 */
1124 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1126 spin_lock(&exp->exp_locks_list_guard);
1127 LASSERT(lock->l_exp_refs_nr > 0);
1128 if (lock->l_exp_refs_target != exp) {
1129 LCONSOLE_WARN("lock %p, "
1130 "mismatching export pointers: %p, %p\n",
1131 lock, lock->l_exp_refs_target, exp);
/* Pre-decrement: the branch is taken when the count reaches zero. */
1133 if (-- lock->l_exp_refs_nr == 0) {
1134 list_del_init(&lock->l_exp_refs_link);
1135 lock->l_exp_refs_target = NULL;
1137 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1138 lock, exp, lock->l_exp_refs_nr);
1139 spin_unlock(&exp->exp_locks_list_guard);
1141 EXPORT_SYMBOL(__class_export_del_lock_ref);
/* A connection defines an export context in which preallocation can
   be managed. This releases the export pointer reference, and returns
   the export handle, so the export refcount is 1 when this function
   returns. */
/*
 * Create an export for client cluuid on device obd and return its handle
 * cookie in conn. The pointer reference obtained from class_new_export() is
 * dropped again before returning, so the caller only holds the cookie.
 * A failed class_new_export() is reported through PTR_ERR().
 * NOTE(review): the error check and the success return are not visible in
 * this listing.
 */
1148 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1149 struct obd_uuid *cluuid)
1151 struct obd_export *export;
1152 LASSERT(conn != NULL);
1153 LASSERT(obd != NULL);
1154 LASSERT(cluuid != NULL);
1157 export = class_new_export(obd, cluuid);
1159 RETURN(PTR_ERR(export));
/* Copy the cookie out before the pointer reference is released. */
1161 conn->cookie = export->exp_handle.h_cookie;
1162 class_export_put(export);
1164 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1165 cluuid->uuid, conn->cookie);
1168 EXPORT_SYMBOL(class_connect);
1170 /* if export is involved in recovery then clean up related things */
/*
 * Two phases. Under obd_recovery_task_lock, while the device is recovering,
 * the export is taken out of recovery, the connected-client counter is
 * decremented and the stale-client counter is incremented for exports
 * without OBD_CONNECT_LIGHTWEIGHT. Then, under exp_lock, pending
 * request-replay and lock-replay flags are cleared and their device counters
 * decremented.
 * NOTE(review): closing braces are missing from this listing, so the exact
 * nesting of the stale-client update is not visible.
 */
1171 static void class_export_recovery_cleanup(struct obd_export *exp)
1173 struct obd_device *obd = exp->exp_obd;
1175 spin_lock(&obd->obd_recovery_task_lock);
1176 if (obd->obd_recovering) {
1177 if (exp->exp_in_recovery) {
1178 spin_lock(&exp->exp_lock);
1179 exp->exp_in_recovery = 0;
1180 spin_unlock(&exp->exp_lock);
1181 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1182 atomic_dec(&obd->obd_connected_clients);
1185 /* if called during recovery then should update
1186 * obd_stale_clients counter,
1187 * lightweight exports are not counted */
1188 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1189 exp->exp_obd->obd_stale_clients++;
1191 spin_unlock(&obd->obd_recovery_task_lock);
1193 spin_lock(&exp->exp_lock);
1194 /** Cleanup req replay fields */
1195 if (exp->exp_req_replay_needed) {
1196 exp->exp_req_replay_needed = 0;
1198 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1199 atomic_dec(&obd->obd_req_replay_clients);
1202 /** Cleanup lock replay data */
1203 if (exp->exp_lock_replay_needed) {
1204 exp->exp_lock_replay_needed = 0;
1206 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1207 atomic_dec(&obd->obd_lock_replay_clients);
1209 spin_unlock(&exp->exp_lock);
1212 /* This function removes 1-3 references from the export:
 * 1 - for export pointer passed
 * and if disconnect really need
 * 2 - removing from hash
 * 3 - in class_unlink_export
 * The export pointer passed to this function can be destroyed.
 *
 * exp_disconnected is tested and set under exp_lock, so only the first
 * caller performs the unlink work; later callers skip straight to the
 * final put.
 * NOTE(review): the return statements and the no_disconn label are not
 * visible in this listing. */
1218 int class_disconnect(struct obd_export *export)
1220 int already_disconnected;
1223 if (export == NULL) {
1224 CWARN("attempting to free NULL export %p\n", export);
1228 spin_lock(&export->exp_lock);
1229 already_disconnected = export->exp_disconnected;
1230 export->exp_disconnected = 1;
1231 spin_unlock(&export->exp_lock);
1233 /* class_cleanup(), abort_recovery(), and class_fail_export()
1234 * all end up in here, and if any of them race we shouldn't
1235 * call extra class_export_puts(). */
1236 if (already_disconnected) {
1237 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1238 GOTO(no_disconn, already_disconnected);
1241 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1242 export->exp_handle.h_cookie);
/* Remove the export from the nid hash if it was ever added. */
1244 if (!hlist_unhashed(&export->exp_nid_hash))
1245 cfs_hash_del(export->exp_obd->obd_nid_hash,
1246 &export->exp_connection->c_peer.nid,
1247 &export->exp_nid_hash);
1249 class_export_recovery_cleanup(export);
1250 class_unlink_export(export);
/* Drops the reference that came with the pointer passed in. */
1252 class_export_put(export);
1255 EXPORT_SYMBOL(class_disconnect);
1257 /* Return non-zero for a fully connected export */
/*
 * Under exp_lock an export counts as connected when exp_conn_cnt is positive
 * and exp_failed is not set.
 * NOTE(review): any check before the lock is taken and the return statement
 * are not visible in this listing.
 */
1258 int class_connected_export(struct obd_export *exp)
1263 spin_lock(&exp->exp_lock);
1264 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1265 spin_unlock(&exp->exp_lock);
1269 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on list.
 * Each export is pinned for the duration of its iteration and gets exp_flags
 * set to flags. An export whose client uuid equals the device uuid is only
 * removed from the list; all others get one more reference, which
 * obd_disconnect() consumes.
 * NOTE(review): the list member passed to list_entry() and the continue
 * after the self-export branch are not visible in this listing.
 */
1271 static void class_disconnect_export_list(struct list_head *list,
1272 enum obd_option flags)
1275 struct obd_export *exp;
1278 /* It's possible that an export may disconnect itself, but
1279 * nothing else will be added to this list. */
1280 while (!list_empty(list)) {
1281 exp = list_entry(list->next, struct obd_export,
1283 /* need for safe call CDEBUG after obd_disconnect */
1284 class_export_get(exp);
1286 spin_lock(&exp->exp_lock);
1287 exp->exp_flags = flags;
1288 spin_unlock(&exp->exp_lock);
1290 if (obd_uuid_equals(&exp->exp_client_uuid,
1291 &exp->exp_obd->obd_uuid)) {
1293 "exp %p export uuid == obd uuid, don't discon\n",
1295 /* Need to delete this now so we don't end up pointing
1296 * to work_list later when this export is cleaned up. */
1297 list_del_init(&exp->exp_obd_chain);
1298 class_export_put(exp);
1302 class_export_get(exp);
1303 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1304 "last request at "CFS_TIME_T"\n",
1305 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1306 exp, exp->exp_last_request_time);
1307 /* release one export reference anyway */
1308 rc = obd_disconnect(exp);
1310 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1311 obd_export_nid2str(exp), exp, rc);
/* Drops the reference taken at the top of the iteration. */
1312 class_export_put(exp);
1317 void class_disconnect_exports(struct obd_device *obd)
1319 struct list_head work_list;
1322 /* Move all of the exports from obd_exports to a work list, en masse. */
1323 INIT_LIST_HEAD(&work_list);
1324 spin_lock(&obd->obd_dev_lock);
1325 list_splice_init(&obd->obd_exports, &work_list);
1326 list_splice_init(&obd->obd_delayed_exports, &work_list);
1327 spin_unlock(&obd->obd_dev_lock);
1329 if (!list_empty(&work_list)) {
1330 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1331 "disconnecting them\n", obd->obd_minor, obd);
1332 class_disconnect_export_list(&work_list,
1333 exp_flags_from_obd(obd));
1335 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1336 obd->obd_minor, obd);
1339 EXPORT_SYMBOL(class_disconnect_exports);
1341 /* Remove exports that have not completed recovery.
1343 void class_disconnect_stale_exports(struct obd_device *obd,
1344 int (*test_export)(struct obd_export *))
1346 struct list_head work_list;
1347 struct obd_export *exp, *n;
1351 INIT_LIST_HEAD(&work_list);
1352 spin_lock(&obd->obd_dev_lock);
1353 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1355 /* don't count self-export as client */
1356 if (obd_uuid_equals(&exp->exp_client_uuid,
1357 &exp->exp_obd->obd_uuid))
1360 /* don't evict clients which have no slot in last_rcvd
1361 * (e.g. lightweight connection) */
1362 if (exp->exp_target_data.ted_lr_idx == -1)
1365 spin_lock(&exp->exp_lock);
1366 if (exp->exp_failed || test_export(exp)) {
1367 spin_unlock(&exp->exp_lock);
1370 exp->exp_failed = 1;
1371 spin_unlock(&exp->exp_lock);
1373 list_move(&exp->exp_obd_chain, &work_list);
1375 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1376 obd->obd_name, exp->exp_client_uuid.uuid,
1377 exp->exp_connection == NULL ? "<unknown>" :
1378 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1379 print_export_data(exp, "EVICTING", 0);
1381 spin_unlock(&obd->obd_dev_lock);
1384 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1385 obd->obd_name, evicted);
1387 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1388 OBD_OPT_ABORT_RECOV);
1391 EXPORT_SYMBOL(class_disconnect_stale_exports);
1393 void class_fail_export(struct obd_export *exp)
1395 int rc, already_failed;
1397 spin_lock(&exp->exp_lock);
1398 already_failed = exp->exp_failed;
1399 exp->exp_failed = 1;
1400 spin_unlock(&exp->exp_lock);
1402 if (already_failed) {
1403 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1404 exp, exp->exp_client_uuid.uuid);
1408 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1409 exp, exp->exp_client_uuid.uuid);
1411 if (obd_dump_on_timeout)
1412 libcfs_debug_dumplog();
1414 /* need for safe call CDEBUG after obd_disconnect */
1415 class_export_get(exp);
1417 /* Most callers into obd_disconnect are removing their own reference
1418 * (request, for example) in addition to the one from the hash table.
1419 * We don't have such a reference here, so make one. */
1420 class_export_get(exp);
1421 rc = obd_disconnect(exp);
1423 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1425 CDEBUG(D_HA, "disconnected export %p/%s\n",
1426 exp, exp->exp_client_uuid.uuid);
1427 class_export_put(exp);
1429 EXPORT_SYMBOL(class_fail_export);
1431 char *obd_export_nid2str(struct obd_export *exp)
1433 if (exp->exp_connection != NULL)
1434 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1438 EXPORT_SYMBOL(obd_export_nid2str);
1440 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1442 cfs_hash_t *nid_hash;
1443 struct obd_export *doomed_exp = NULL;
1444 int exports_evicted = 0;
1446 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1448 spin_lock(&obd->obd_dev_lock);
1449 /* umount has run already, so evict thread should leave
1450 * its task to umount thread now */
1451 if (obd->obd_stopping) {
1452 spin_unlock(&obd->obd_dev_lock);
1453 return exports_evicted;
1455 nid_hash = obd->obd_nid_hash;
1456 cfs_hash_getref(nid_hash);
1457 spin_unlock(&obd->obd_dev_lock);
1460 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1461 if (doomed_exp == NULL)
1464 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1465 "nid %s found, wanted nid %s, requested nid %s\n",
1466 obd_export_nid2str(doomed_exp),
1467 libcfs_nid2str(nid_key), nid);
1468 LASSERTF(doomed_exp != obd->obd_self_export,
1469 "self-export is hashed by NID?\n");
1471 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1472 "request\n", obd->obd_name,
1473 obd_uuid2str(&doomed_exp->exp_client_uuid),
1474 obd_export_nid2str(doomed_exp));
1475 class_fail_export(doomed_exp);
1476 class_export_put(doomed_exp);
1479 cfs_hash_putref(nid_hash);
1481 if (!exports_evicted)
1482 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1483 obd->obd_name, nid);
1484 return exports_evicted;
1486 EXPORT_SYMBOL(obd_export_evict_by_nid);
1488 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1490 cfs_hash_t *uuid_hash;
1491 struct obd_export *doomed_exp = NULL;
1492 struct obd_uuid doomed_uuid;
1493 int exports_evicted = 0;
1495 spin_lock(&obd->obd_dev_lock);
1496 if (obd->obd_stopping) {
1497 spin_unlock(&obd->obd_dev_lock);
1498 return exports_evicted;
1500 uuid_hash = obd->obd_uuid_hash;
1501 cfs_hash_getref(uuid_hash);
1502 spin_unlock(&obd->obd_dev_lock);
1504 obd_str2uuid(&doomed_uuid, uuid);
1505 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1506 CERROR("%s: can't evict myself\n", obd->obd_name);
1507 cfs_hash_putref(uuid_hash);
1508 return exports_evicted;
1511 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1513 if (doomed_exp == NULL) {
1514 CERROR("%s: can't disconnect %s: no exports found\n",
1515 obd->obd_name, uuid);
1517 CWARN("%s: evicting %s at adminstrative request\n",
1518 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1519 class_fail_export(doomed_exp);
1520 class_export_put(doomed_exp);
1523 cfs_hash_putref(uuid_hash);
1525 return exports_evicted;
1527 EXPORT_SYMBOL(obd_export_evict_by_uuid);
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback; print_export_data() invokes it, when set, for callers
 * that ask for lock information. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1534 static void print_export_data(struct obd_export *exp, const char *status,
1537 struct ptlrpc_reply_state *rs;
1538 struct ptlrpc_reply_state *first_reply = NULL;
1541 spin_lock(&exp->exp_lock);
1542 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1548 spin_unlock(&exp->exp_lock);
1550 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1551 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1552 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1553 atomic_read(&exp->exp_rpc_count),
1554 atomic_read(&exp->exp_cb_count),
1555 atomic_read(&exp->exp_locks_count),
1556 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1557 nreplies, first_reply, nreplies > 3 ? "..." : "",
1558 exp->exp_last_committed);
1559 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1560 if (locks && class_export_dump_hook != NULL)
1561 class_export_dump_hook(exp);
1565 void dump_exports(struct obd_device *obd, int locks)
1567 struct obd_export *exp;
1569 spin_lock(&obd->obd_dev_lock);
1570 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1571 print_export_data(exp, "ACTIVE", locks);
1572 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1573 print_export_data(exp, "UNLINKED", locks);
1574 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1575 print_export_data(exp, "DELAYED", locks);
1576 spin_unlock(&obd->obd_dev_lock);
1577 spin_lock(&obd_zombie_impexp_lock);
1578 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1579 print_export_data(exp, "ZOMBIE", locks);
1580 spin_unlock(&obd_zombie_impexp_lock);
1582 EXPORT_SYMBOL(dump_exports);
1584 void obd_exports_barrier(struct obd_device *obd)
1587 LASSERT(list_empty(&obd->obd_exports));
1588 spin_lock(&obd->obd_dev_lock);
1589 while (!list_empty(&obd->obd_unlinked_exports)) {
1590 spin_unlock(&obd->obd_dev_lock);
1591 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1592 cfs_time_seconds(waited));
1593 if (waited > 5 && IS_PO2(waited)) {
1594 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1595 "more than %d seconds. "
1596 "The obd refcount = %d. Is it stuck?\n",
1597 obd->obd_name, waited,
1598 atomic_read(&obd->obd_refcount));
1599 dump_exports(obd, 1);
1602 spin_lock(&obd->obd_dev_lock);
1604 spin_unlock(&obd->obd_dev_lock);
1606 EXPORT_SYMBOL(obd_exports_barrier);
/* Total amount of zombies to be destroyed.  Read by the zombie thread's
 * wake-up predicates while holding obd_zombie_impexp_lock. */
static int zombies_count = 0;
1612 * kill zombie imports and exports
1614 void obd_zombie_impexp_cull(void)
1616 struct obd_import *import;
1617 struct obd_export *export;
1621 spin_lock(&obd_zombie_impexp_lock);
1624 if (!list_empty(&obd_zombie_imports)) {
1625 import = list_entry(obd_zombie_imports.next,
1628 list_del_init(&import->imp_zombie_chain);
1632 if (!list_empty(&obd_zombie_exports)) {
1633 export = list_entry(obd_zombie_exports.next,
1636 list_del_init(&export->exp_obd_chain);
1639 spin_unlock(&obd_zombie_impexp_lock);
1641 if (import != NULL) {
1642 class_import_destroy(import);
1643 spin_lock(&obd_zombie_impexp_lock);
1645 spin_unlock(&obd_zombie_impexp_lock);
1648 if (export != NULL) {
1649 class_export_destroy(export);
1650 spin_lock(&obd_zombie_impexp_lock);
1652 spin_unlock(&obd_zombie_impexp_lock);
1656 } while (import != NULL || export != NULL);
1660 static struct completion obd_zombie_start;
1661 static struct completion obd_zombie_stop;
1662 static unsigned long obd_zombie_flags;
1663 static wait_queue_head_t obd_zombie_waitq;
1664 static pid_t obd_zombie_pid;
1667 OBD_ZOMBIE_STOP = 0x0001,
1671 * check for work for kill zombie import/export thread.
1673 static int obd_zombie_impexp_check(void *arg)
1677 spin_lock(&obd_zombie_impexp_lock);
1678 rc = (zombies_count == 0) &&
1679 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1680 spin_unlock(&obd_zombie_impexp_lock);
1686 * Add export to the obd_zombe thread and notify it.
1688 static void obd_zombie_export_add(struct obd_export *exp) {
1689 spin_lock(&exp->exp_obd->obd_dev_lock);
1690 LASSERT(!list_empty(&exp->exp_obd_chain));
1691 list_del_init(&exp->exp_obd_chain);
1692 spin_unlock(&exp->exp_obd->obd_dev_lock);
1693 spin_lock(&obd_zombie_impexp_lock);
1695 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1696 spin_unlock(&obd_zombie_impexp_lock);
1698 obd_zombie_impexp_notify();
1702 * Add import to the obd_zombe thread and notify it.
1704 static void obd_zombie_import_add(struct obd_import *imp) {
1705 LASSERT(imp->imp_sec == NULL);
1706 LASSERT(imp->imp_rq_pool == NULL);
1707 spin_lock(&obd_zombie_impexp_lock);
1708 LASSERT(list_empty(&imp->imp_zombie_chain));
1710 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1711 spin_unlock(&obd_zombie_impexp_lock);
1713 obd_zombie_impexp_notify();
1717 * notify import/export destroy thread about new zombie.
1719 static void obd_zombie_impexp_notify(void)
1722 * Make sure obd_zomebie_impexp_thread get this notification.
1723 * It is possible this signal only get by obd_zombie_barrier, and
1724 * barrier gulps this notification and sleeps away and hangs ensues
1726 wake_up_all(&obd_zombie_waitq);
1730 * check whether obd_zombie is idle
1732 static int obd_zombie_is_idle(void)
1736 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1737 spin_lock(&obd_zombie_impexp_lock);
1738 rc = (zombies_count == 0);
1739 spin_unlock(&obd_zombie_impexp_lock);
1744 * wait when obd_zombie import/export queues become empty
1746 void obd_zombie_barrier(void)
1748 struct l_wait_info lwi = { 0 };
1750 if (obd_zombie_pid == current_pid())
1751 /* don't wait for myself */
1753 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1755 EXPORT_SYMBOL(obd_zombie_barrier);
1759 * destroy zombie export/import thread.
1761 static int obd_zombie_impexp_thread(void *unused)
1763 unshare_fs_struct();
1764 complete(&obd_zombie_start);
1766 obd_zombie_pid = current_pid();
1768 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1769 struct l_wait_info lwi = { 0 };
1771 l_wait_event(obd_zombie_waitq,
1772 !obd_zombie_impexp_check(NULL), &lwi);
1773 obd_zombie_impexp_cull();
1776 * Notify obd_zombie_barrier callers that queues
1779 wake_up(&obd_zombie_waitq);
1782 complete(&obd_zombie_stop);
1789 * start destroy zombie import/export thread
1791 int obd_zombie_impexp_init(void)
1793 struct task_struct *task;
1795 INIT_LIST_HEAD(&obd_zombie_imports);
1797 INIT_LIST_HEAD(&obd_zombie_exports);
1798 spin_lock_init(&obd_zombie_impexp_lock);
1799 init_completion(&obd_zombie_start);
1800 init_completion(&obd_zombie_stop);
1801 init_waitqueue_head(&obd_zombie_waitq);
1804 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1806 RETURN(PTR_ERR(task));
1808 wait_for_completion(&obd_zombie_start);
1812 * stop destroy zombie import/export thread
1814 void obd_zombie_impexp_stop(void)
1816 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1817 obd_zombie_impexp_notify();
1818 wait_for_completion(&obd_zombie_stop);
1821 /***** Kernel-userspace comm helpers *******/
1823 /* Get length of entire message, including header */
1824 int kuc_len(int payload_len)
1826 return sizeof(struct kuc_hdr) + payload_len;
1828 EXPORT_SYMBOL(kuc_len);
1830 /* Get a pointer to kuc header, given a ptr to the payload
1831 * @param p Pointer to payload area
1832 * @returns Pointer to kuc header
1834 struct kuc_hdr * kuc_ptr(void *p)
1836 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1837 LASSERT(lh->kuc_magic == KUC_MAGIC);
1840 EXPORT_SYMBOL(kuc_ptr);
1842 /* Test if payload is part of kuc message
1843 * @param p Pointer to payload area
1846 int kuc_ispayload(void *p)
1848 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1850 if (kh->kuc_magic == KUC_MAGIC)
1855 EXPORT_SYMBOL(kuc_ispayload);
1857 /* Alloc space for a message, and fill in header
1858 * @return Pointer to payload area
1860 void *kuc_alloc(int payload_len, int transport, int type)
1863 int len = kuc_len(payload_len);
1867 return ERR_PTR(-ENOMEM);
1869 lh->kuc_magic = KUC_MAGIC;
1870 lh->kuc_transport = transport;
1871 lh->kuc_msgtype = type;
1872 lh->kuc_msglen = len;
1874 return (void *)(lh + 1);
1876 EXPORT_SYMBOL(kuc_alloc);
/* Takes pointer to payload area
 * @param p Pointer to payload area of a kuc message
 * @param payload_len The payload length the message was allocated with
 *
 * Not declared inline: the function is exported and therefore needs an
 * ordinary external definition.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1886 struct obd_request_slot_waiter {
1887 struct list_head orsw_entry;
1888 wait_queue_head_t orsw_waitq;
1892 static bool obd_request_slot_avail(struct client_obd *cli,
1893 struct obd_request_slot_waiter *orsw)
1897 spin_lock(&cli->cl_loi_list_lock);
1898 avail = !!list_empty(&orsw->orsw_entry);
1899 spin_unlock(&cli->cl_loi_list_lock);
1905 * For network flow control, the RPC sponsor needs to acquire a credit
1906 * before sending the RPC. The credits count for a connection is defined
1907 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1908 * the subsequent RPC sponsors need to wait until others released their
1909 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1911 int obd_get_request_slot(struct client_obd *cli)
1913 struct obd_request_slot_waiter orsw;
1914 struct l_wait_info lwi;
1917 spin_lock(&cli->cl_loi_list_lock);
1918 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1919 cli->cl_r_in_flight++;
1920 spin_unlock(&cli->cl_loi_list_lock);
1924 init_waitqueue_head(&orsw.orsw_waitq);
1925 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1926 orsw.orsw_signaled = false;
1927 spin_unlock(&cli->cl_loi_list_lock);
1929 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1930 rc = l_wait_event(orsw.orsw_waitq,
1931 obd_request_slot_avail(cli, &orsw) ||
1935 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1936 * freed but other (such as obd_put_request_slot) is using it. */
1937 spin_lock(&cli->cl_loi_list_lock);
1939 if (!orsw.orsw_signaled) {
1940 if (list_empty(&orsw.orsw_entry))
1941 cli->cl_r_in_flight--;
1943 list_del(&orsw.orsw_entry);
1947 if (orsw.orsw_signaled) {
1948 LASSERT(list_empty(&orsw.orsw_entry));
1952 spin_unlock(&cli->cl_loi_list_lock);
1956 EXPORT_SYMBOL(obd_get_request_slot);
1958 void obd_put_request_slot(struct client_obd *cli)
1960 struct obd_request_slot_waiter *orsw;
1962 spin_lock(&cli->cl_loi_list_lock);
1963 cli->cl_r_in_flight--;
1965 /* If there is free slot, wakeup the first waiter. */
1966 if (!list_empty(&cli->cl_loi_read_list) &&
1967 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1968 orsw = list_entry(cli->cl_loi_read_list.next,
1969 struct obd_request_slot_waiter, orsw_entry);
1970 list_del_init(&orsw->orsw_entry);
1971 cli->cl_r_in_flight++;
1972 wake_up(&orsw->orsw_waitq);
1974 spin_unlock(&cli->cl_loi_list_lock);
1976 EXPORT_SYMBOL(obd_put_request_slot);
1978 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1980 return cli->cl_max_rpcs_in_flight;
1982 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1984 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1986 struct obd_request_slot_waiter *orsw;
1991 if (max > OBD_MAX_RIF_MAX || max < 1)
1994 spin_lock(&cli->cl_loi_list_lock);
1995 old = cli->cl_max_rpcs_in_flight;
1996 cli->cl_max_rpcs_in_flight = max;
1999 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2000 for (i = 0; i < diff; i++) {
2001 if (list_empty(&cli->cl_loi_read_list))
2004 orsw = list_entry(cli->cl_loi_read_list.next,
2005 struct obd_request_slot_waiter, orsw_entry);
2006 list_del_init(&orsw->orsw_entry);
2007 cli->cl_r_in_flight++;
2008 wake_up(&orsw->orsw_waitq);
2010 spin_unlock(&cli->cl_loi_list_lock);
2014 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);