4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
/* Taken around every walk and update of the obd_types list in this file. */
46 spinlock_t obd_types_lock;
/* Slab caches; created by obd_init_caches(), destroyed by obd_cleanup_caches(). */
48 struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 struct kmem_cache *import_cachep;
/* Zombie import/export lists and their lock.
 * NOTE(review): presumably filled by obd_zombie_{import,export}_add() for
 * deferred destruction -- confirm against their definitions. */
53 struct list_head obd_zombie_imports;
54 struct list_head obd_zombie_exports;
55 spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of helpers used before their definitions. */
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60 const char *status, int locks);
/* Hook used by class_export_destroy() and class_import_destroy() to drop
 * ptlrpc connection references; this file only calls it, never sets it. */
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
66 * support functions: we could use inter-module communication, but this
67 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep (GFP_NOFS) and stamp it with
 * OBD_DEVICE_MAGIC so later sanity checks can recognise it.
 */
69 static struct obd_device *obd_device_alloc(void)
71 struct obd_device *obd;
73 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts that obd_magic is intact, logs an error if obd_namespace is still
 * set, and finalises obd_reference before the slab free.
 */
80 static void obd_device_free(struct obd_device *obd)
83 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85 if (obd->obd_namespace != NULL) {
86 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87 obd, obd->obd_namespace, obd->obd_force);
90 lu_ref_fini(&obd->obd_reference);
91 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look for a registered obd type whose typ_name equals \a name.
 * The obd_types list is walked under obd_types_lock; the lock is released
 * both on a match and after an unsuccessful walk, so the caller receives the
 * result without the lock held.
 */
94 struct obd_type *class_search_type(const char *name)
96 struct list_head *tmp;
97 struct obd_type *type;
99 spin_lock(&obd_types_lock);
100 list_for_each(tmp, &obd_types) {
101 type = list_entry(tmp, struct obd_type, typ_chain);
102 if (strcmp(type->typ_name, name) == 0) {
103 spin_unlock(&obd_types_lock);
107 spin_unlock(&obd_types_lock);
110 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd type called \a name and pin its module.
 * With HAVE_MODULE_LOADING_SUPPORT the type name is mapped to a module name
 * (LWP -> OSP, names starting with the MDS name -> MDT), the module is
 * requested, and the search is repeated. The module reference is then taken
 * via typ_dt_ops->o_owner under obd_type_lock.
 */
112 struct obd_type *class_get_type(const char *name)
114 struct obd_type *type = class_search_type(name);
116 #ifdef HAVE_MODULE_LOADING_SUPPORT
118 const char *modname = name;
120 if (strcmp(modname, "obdfilter") == 0)
123 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124 modname = LUSTRE_OSP_NAME;
126 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success; search again by the type name. */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not used here; a failed
 * get would go unnoticed -- confirm whether that is intended. */
141 try_module_get(type->typ_dt_ops->o_owner);
142 spin_unlock(&type->obd_type_lock);
146 EXPORT_SYMBOL(class_get_type);
/*
 * Counterpart of class_get_type(): drops the module reference held through
 * typ_dt_ops->o_owner, under the type's obd_type_lock.
 */
148 void class_put_type(struct obd_type *type)
151 spin_lock(&type->obd_type_lock);
153 module_put(type->typ_dt_ops->o_owner);
154 spin_unlock(&type->obd_type_lock);
156 EXPORT_SYMBOL(class_put_type);
/* Upper bound (exclusive) on the length of a type name accepted below. */
158 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 *
 * Allocates an obd_type, copies \a dt_ops (and \a md_ops, which is optional)
 * and \a name into it, registers a proc root via lprocfs_seq_register(),
 * initialises \a ldt through lu_device_type_init(), and finally links the
 * type onto obd_types under obd_types_lock. The tail of the function is the
 * failure path: it removes the proc entry and frees everything allocated here.
 */
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161 bool enable_proc, struct lprocfs_seq_vars *vars,
162 const char *name, struct lu_device_type *ldt)
164 struct obd_type *type;
/* Names must be shorter than CLASS_MAX_NAME. */
169 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
171 if (class_search_type(name)) {
172 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
177 OBD_ALLOC(type, sizeof(*type));
181 OBD_ALLOC_PTR(type->typ_dt_ops);
182 OBD_ALLOC_PTR(type->typ_md_ops);
183 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* All three allocations are checked together. */
185 if (type->typ_dt_ops == NULL ||
186 type->typ_md_ops == NULL ||
187 type->typ_name == NULL)
190 *(type->typ_dt_ops) = *dt_ops;
191 /* md_ops is optional */
193 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized strlen(name) + 1 above, so strcpy() fits exactly. */
194 strcpy(type->typ_name, name);
195 spin_lock_init(&type->obd_type_lock);
199 type->typ_procroot = lprocfs_seq_register(type->typ_name,
202 if (IS_ERR(type->typ_procroot)) {
203 rc = PTR_ERR(type->typ_procroot);
204 type->typ_procroot = NULL;
211 rc = lu_device_type_init(ldt);
/* Publish the fully initialised type. */
216 spin_lock(&obd_types_lock);
217 list_add(&type->typ_chain, &obd_types);
218 spin_unlock(&obd_types_lock);
223 if (type->typ_name != NULL) {
225 if (type->typ_procroot != NULL) {
226 #ifndef HAVE_ONLY_PROCFS_SEQ
227 lprocfs_try_remove_proc_entry(type->typ_name,
230 remove_proc_subtree(type->typ_name, proc_lustre_root);
/* Free with the same size that was used for the allocation. */
234 OBD_FREE(type->typ_name, strlen(name) + 1);
236 if (type->typ_md_ops != NULL)
237 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_dt_ops != NULL)
239 OBD_FREE_PTR(type->typ_dt_ops);
240 OBD_FREE(type, sizeof(*type));
243 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * If the type still has references, only its ops tables are freed and the
 * name is kept for debugging. Otherwise the proc entries are removed by name,
 * the lu device type is finalised, the type is unlinked from obd_types under
 * obd_types_lock, and all of its memory is freed.
 */
245 int class_unregister_type(const char *name)
247 struct obd_type *type = class_search_type(name);
251 CERROR("unknown obd type\n");
255 if (type->typ_refcnt) {
256 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
257 /* This is a bad situation, let's make the best of it */
258 /* Remove ops, but leave the name for debugging */
259 OBD_FREE_PTR(type->typ_dt_ops);
260 OBD_FREE_PTR(type->typ_md_ops);
264 /* We do not use type->typ_procroot because, for compatibility,
265 * other modules can share names (i.e. lod can use the lov entry), so
266 * we can't reference the pointer as it can get invalidated when another
267 * module removes the entry. */
269 if (type->typ_procroot != NULL) {
270 #ifndef HAVE_ONLY_PROCFS_SEQ
271 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
273 remove_proc_subtree(type->typ_name, proc_lustre_root);
277 if (type->typ_procsym != NULL)
278 lprocfs_remove(&type->typ_procsym);
281 lu_device_type_fini(type->typ_lu);
283 spin_lock(&obd_types_lock);
284 list_del(&type->typ_chain);
285 spin_unlock(&obd_types_lock);
/* The size is derived from the caller's name, which matched typ_name above. */
286 OBD_FREE(type->typ_name, strlen(name) + 1);
287 if (type->typ_dt_ops != NULL)
288 OBD_FREE_PTR(type->typ_dt_ops);
289 if (type->typ_md_ops != NULL)
290 OBD_FREE_PTR(type->typ_md_ops);
291 OBD_FREE(type, sizeof(*type));
293 } /* class_unregister_type */
294 EXPORT_SYMBOL(class_unregister_type);
297 * Create a new obd device.
299 * Find an empty slot in ::obd_devs[], create a new obd device in it.
301 * \param[in] type_name obd device type string.
302 * \param[in] name obd device name.
304 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
/*
 * Allocate an obd device of type \a type_name, give it \a name, and install
 * it in the first free slot of obd_devs[].
 *
 * Failures are reported as ERR_PTR values: -EINVAL for an over-long name,
 * -ENODEV for an unknown type, -ENOMEM when allocation fails, -EEXIST when a
 * device with the same name is already present, and -EOVERFLOW when every
 * slot is taken. On the failure exits the new device is freed and the type
 * reference is dropped.
 */
307 struct obd_device *class_newdev(const char *type_name, const char *name)
309 struct obd_device *result = NULL;
310 struct obd_device *newdev;
311 struct obd_type *type = NULL;
313 int new_obd_minor = 0;
316 if (strlen(name) >= MAX_OBD_NAME) {
317 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
318 RETURN(ERR_PTR(-EINVAL));
321 type = class_get_type(type_name);
323 CERROR("OBD: unknown type: %s\n", type_name);
324 RETURN(ERR_PTR(-ENODEV));
327 newdev = obd_device_alloc();
329 GOTO(out_type, result = ERR_PTR(-ENOMEM));
331 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* Scan every slot under the write lock: a duplicate name aborts the attach
 * (undoing a slot already claimed in this pass), otherwise the first empty
 * slot is claimed. */
333 write_lock(&obd_dev_lock);
334 for (i = 0; i < class_devno_max(); i++) {
335 struct obd_device *obd = class_num2obd(i);
337 if (obd && (strcmp(name, obd->obd_name) == 0)) {
338 CERROR("Device %s already exists at %d, won't add\n",
341 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
342 "%p obd_magic %08x != %08x\n", result,
343 result->obd_magic, OBD_DEVICE_MAGIC);
344 LASSERTF(result->obd_minor == new_obd_minor,
345 "%p obd_minor %d != %d\n", result,
346 result->obd_minor, new_obd_minor);
348 obd_devs[result->obd_minor] = NULL;
349 result->obd_name[0]='\0';
351 result = ERR_PTR(-EEXIST);
354 if (!result && !obd) {
356 result->obd_minor = i;
358 result->obd_type = type;
/* Copies at most sizeof(obd_name) - 1 bytes.
 * NOTE(review): NUL termination relies on the buffer being zero-filled
 * by the allocator -- confirm. */
359 strncpy(result->obd_name, name,
360 sizeof(result->obd_name) - 1);
361 obd_devs[i] = result;
364 write_unlock(&obd_dev_lock);
/* No slot was claimed and the scan reached the end: the table is full. */
366 if (result == NULL && i >= class_devno_max()) {
367 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
369 GOTO(out, result = ERR_PTR(-EOVERFLOW));
375 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
376 result->obd_name, result);
380 obd_device_free(newdev);
382 class_put_type(type);
/*
 * Remove \a obd from obd_devs[] and free it.
 * Asserts that the device has a valid magic, sits in its own minor slot and
 * has a type; the slot is cleared under the obd_dev_lock write lock and the
 * type reference is dropped after the device is freed.
 */
386 void class_release_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type reference is put. */
388 struct obd_type *obd_type = obd->obd_type;
390 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
391 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
392 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
393 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
394 LASSERT(obd_type != NULL);
396 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
397 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
399 write_lock(&obd_dev_lock);
400 obd_devs[obd->obd_minor] = NULL;
401 write_unlock(&obd_dev_lock);
402 obd_device_free(obd);
404 class_put_type(obd_type);
/*
 * Search obd_devs[] for a device named \a name and map it to its index.
 * The scan runs under the obd_dev_lock read lock; a name match is only
 * honoured once the device has obd_attached set.
 */
407 int class_name2dev(const char *name)
414 read_lock(&obd_dev_lock);
415 for (i = 0; i < class_devno_max(); i++) {
416 struct obd_device *obd = class_num2obd(i);
418 if (obd && strcmp(name, obd->obd_name) == 0) {
419 /* Make sure we finished attaching before we give
420 out any references */
421 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
422 if (obd->obd_attached) {
423 read_unlock(&obd_dev_lock);
429 read_unlock(&obd_dev_lock);
433 EXPORT_SYMBOL(class_name2dev);
435 struct obd_device *class_name2obd(const char *name)
437 int dev = class_name2dev(name);
439 if (dev < 0 || dev > class_devno_max())
441 return class_num2obd(dev);
443 EXPORT_SYMBOL(class_name2obd);
/*
 * Search obd_devs[] for a device whose obd_uuid equals \a uuid and map it to
 * its index. The scan runs under the obd_dev_lock read lock, which is
 * released both on a match and after an unsuccessful scan.
 */
445 int class_uuid2dev(struct obd_uuid *uuid)
449 read_lock(&obd_dev_lock);
450 for (i = 0; i < class_devno_max(); i++) {
451 struct obd_device *obd = class_num2obd(i);
453 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
454 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
455 read_unlock(&obd_dev_lock);
459 read_unlock(&obd_dev_lock);
463 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by uuid: resolve \a uuid to an index with
 * class_uuid2dev(), then fetch the device with class_num2obd().
 */
465 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
467 int dev = class_uuid2dev(uuid);
470 return class_num2obd(dev);
472 EXPORT_SYMBOL(class_uuid2obd);
475 * Get obd device from ::obd_devs[]
477 * \param num [in] array index
479 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
480 * otherwise return the obd device there.
/*
 * Fetch the device stored at index \a num.
 * Only indices below class_devno_max() are looked at; a device found there is
 * sanity-checked for an intact magic and for obd_minor == num.
 */
482 struct obd_device *class_num2obd(int num)
484 struct obd_device *obd = NULL;
486 if (num < class_devno_max()) {
491 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
492 "%p obd_magic %08x != %08x\n",
493 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
494 LASSERTF(obd->obd_minor == num,
495 "%p obd_minor %0d != %0d\n",
496 obd, obd->obd_minor, num);
501 EXPORT_SYMBOL(class_num2obd);
504 * Get obd devices count. Devices in any state are counted.
506 * \retval obd device count
508 int get_devices_count(void)
510 int index, max_index = class_devno_max(), dev_count = 0;
512 read_lock(&obd_dev_lock);
513 for (index = 0; index <= max_index; index++) {
514 struct obd_device *obd = class_num2obd(index);
518 read_unlock(&obd_dev_lock);
522 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[]: index, a status string
 * chosen from the obd_stopping / obd_set_up / obd_attached flags (tested in
 * that order), type name, device name, uuid and reference count.
 * The scan runs under the obd_dev_lock read lock.
 */
524 void class_obd_list(void)
529 read_lock(&obd_dev_lock);
530 for (i = 0; i < class_devno_max(); i++) {
531 struct obd_device *obd = class_num2obd(i);
535 if (obd->obd_stopping)
537 else if (obd->obd_set_up)
539 else if (obd->obd_attached)
543 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
544 i, status, obd->obd_type->typ_name,
545 obd->obd_name, obd->obd_uuid.uuid,
546 atomic_read(&obd->obd_refcount));
548 read_unlock(&obd_dev_lock);
552 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
553 specified, then only the client with that uuid is returned,
554 otherwise any client connected to the tgt is returned. */
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a client device.
 * A device matches when its type name starts with \a typ_name (strncmp over
 * strlen(typ_name) bytes), its cl_target_uuid equals \a tgt_uuid and, if
 * \a grp_uuid is non-NULL, its obd_uuid equals \a grp_uuid.
 */
555 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
556 const char * typ_name,
557 struct obd_uuid *grp_uuid)
561 read_lock(&obd_dev_lock);
562 for (i = 0; i < class_devno_max(); i++) {
563 struct obd_device *obd = class_num2obd(i);
567 if ((strncmp(obd->obd_type->typ_name, typ_name,
568 strlen(typ_name)) == 0)) {
569 if (obd_uuid_equals(tgt_uuid,
570 &obd->u.cli.cl_target_uuid) &&
571 ((grp_uuid)? obd_uuid_equals(grp_uuid,
572 &obd->obd_uuid) : 1)) {
573 read_unlock(&obd_dev_lock);
578 read_unlock(&obd_dev_lock);
582 EXPORT_SYMBOL(class_find_client_obd);
584 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
585 searching at *next, and if a device is found, the next index to look
586 at is saved in *next. If next is NULL, then the first matching device
587 will always be returned. */
/*
 * Scan obd_devs[] under the obd_dev_lock read lock, from a start index up to
 * class_devno_max(), for a device whose obd_uuid equals \a grp_uuid.
 * A caller-supplied *next is only considered when it lies in
 * [0, class_devno_max()).
 */
588 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
594 else if (*next >= 0 && *next < class_devno_max())
599 read_lock(&obd_dev_lock);
600 for (; i < class_devno_max(); i++) {
601 struct obd_device *obd = class_num2obd(i);
605 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
608 read_unlock(&obd_dev_lock);
612 read_unlock(&obd_dev_lock);
616 EXPORT_SYMBOL(class_devices_in_group);
619 * Notify that the sptlrpc log for \a fsname has changed; let every relevant OBD
620 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every device that is
 * set up, not stopping, of type mdc/osc/mdt/ost, and whose name starts with
 * the first \a namelen bytes of \a fsname.
 */
622 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
624 struct obd_device *obd;
628 LASSERT(namelen > 0);
630 read_lock(&obd_dev_lock);
631 for (i = 0; i < class_devno_max(); i++) {
632 obd = class_num2obd(i);
634 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
637 /* only notify mdc, osc, mdt, ost */
638 type = obd->obd_type->typ_name;
639 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
640 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
641 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
642 strcmp(type, LUSTRE_OST_NAME) != 0)
645 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the device, then drop the read lock for the duration of the
 * obd_set_info_async() call; the lock is retaken before the scan resumes. */
648 class_incref(obd, __FUNCTION__, obd);
649 read_unlock(&obd_dev_lock);
650 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
651 sizeof(KEY_SPTLRPC_CONF),
652 KEY_SPTLRPC_CONF, 0, NULL, NULL);
654 class_decref(obd, __FUNCTION__, obd);
655 read_lock(&obd_dev_lock);
657 read_unlock(&obd_dev_lock);
660 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches() (device, obdo, import
 * and capa caches) and reset the cache pointers so they can be created again.
 */
662 void obd_cleanup_caches(void)
665 if (obd_device_cachep) {
666 kmem_cache_destroy(obd_device_cachep);
667 obd_device_cachep = NULL;
670 kmem_cache_destroy(obdo_cachep);
674 kmem_cache_destroy(import_cachep);
675 import_cachep = NULL;
678 kmem_cache_destroy(capa_cachep);
/*
 * Create the slab caches used by this file: ll_obd_dev_cache, ll_obdo_cache,
 * ll_import_cache and capa_cache. Each cache pointer must be NULL on entry.
 * A failed creation yields -ENOMEM, and the failure path tears down whatever
 * was created through obd_cleanup_caches().
 */
684 int obd_init_caches(void)
689 LASSERT(obd_device_cachep == NULL);
690 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
691 sizeof(struct obd_device),
693 if (!obd_device_cachep)
694 GOTO(out, rc = -ENOMEM);
696 LASSERT(obdo_cachep == NULL);
697 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
700 GOTO(out, rc = -ENOMEM);
702 LASSERT(import_cachep == NULL);
703 import_cachep = kmem_cache_create("ll_import_cache",
704 sizeof(struct obd_import),
707 GOTO(out, rc = -ENOMEM);
709 LASSERT(capa_cachep == NULL);
710 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
713 GOTO(out, rc = -ENOMEM);
717 obd_cleanup_caches();
721 /* map connection to client */
/*
 * Resolve a connection handle to its export via class_handle2object() on
 * conn->cookie. A null handle and the cookie value -1 (meaning "assign a new
 * connection") are special-cased before the lookup.
 */
722 struct obd_export *class_conn2export(struct lustre_handle *conn)
724 struct obd_export *export;
728 CDEBUG(D_CACHE, "looking for null handle\n");
732 if (conn->cookie == -1) { /* this means assign a new connection */
733 CDEBUG(D_CACHE, "want a new connection\n");
737 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
738 export = class_handle2object(conn->cookie, NULL);
741 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd device.
 * NOTE(review): presumably returns exp->exp_obd, guarding a NULL exp --
 * confirm against the function body. */
743 struct obd_device *class_exp2obd(struct obd_export *exp)
749 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve a connection handle to the obd device of its export.
 * The export obtained from class_conn2export() is released with
 * class_export_put() once exp_obd has been read.
 */
751 struct obd_device *class_conn2obd(struct lustre_handle *conn)
753 struct obd_export *export;
754 export = class_conn2export(conn);
756 struct obd_device *obd = export->exp_obd;
757 class_export_put(export);
762 EXPORT_SYMBOL(class_conn2obd);
/* Return the client import (u.cli.cl_import) of the obd device behind \a exp. */
764 struct obd_import *class_exp2cliimp(struct obd_export *exp)
766 struct obd_device *obd = exp->exp_obd;
769 return obd->u.cli.cl_import;
771 EXPORT_SYMBOL(class_exp2cliimp);
/* Return the client import (u.cli.cl_import) of the obd device that
 * class_conn2obd() resolves \a conn to. */
773 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
775 struct obd_device *obd = class_conn2obd(conn);
778 return obd->u.cli.cl_import;
780 EXPORT_SYMBOL(class_conn2cliimp);
782 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero.
 * Releases the connection (if any), asserts that the reply, replay and
 * high-priority RPC lists are empty, calls obd_destroy_export(), drops the
 * "export" reference on the obd device and frees the export via RCU.
 */
783 static void class_export_destroy(struct obd_export *exp)
785 struct obd_device *obd = exp->exp_obd;
788 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
789 LASSERT(obd != NULL);
791 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
792 exp->exp_client_uuid.uuid, obd->obd_name);
794 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
795 if (exp->exp_connection)
796 ptlrpc_put_connection_superhack(exp->exp_connection);
798 LASSERT(list_empty(&exp->exp_outstanding_replies));
799 LASSERT(list_empty(&exp->exp_uncommitted_replies));
800 LASSERT(list_empty(&exp->exp_req_replay_queue));
801 LASSERT(list_empty(&exp->exp_hp_rpcs));
802 obd_destroy_export(exp);
/* Pairs with class_incref(obd, "export", ...) in class_new_export(). */
803 class_decref(obd, "export", exp);
805 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table addref callback for exports: takes an export reference. */
809 static void export_handle_addref(void *export)
811 class_export_get(export);
/* Handle ops passed to class_handle_hash() when an export is created. */
814 static struct portals_handle_ops export_handle_ops = {
815 .hop_addref = export_handle_addref,
/* Take one reference on \a exp (atomic increment of exp_refcount) and log
 * the new count. */
819 struct obd_export *class_export_get(struct obd_export *exp)
821 atomic_inc(&exp->exp_refcount);
822 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
823 atomic_read(&exp->exp_refcount));
826 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 * When the count reaches zero, the export's nid stats are cleaned up and the
 * export is handed to obd_zombie_export_add() rather than destroyed here.
 */
828 void class_export_put(struct obd_export *exp)
830 LASSERT(exp != NULL);
831 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* Logged before the decrement, hence the "- 1". */
832 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
833 atomic_read(&exp->exp_refcount) - 1);
835 if (atomic_dec_and_test(&exp->exp_refcount)) {
836 LASSERT(!list_empty(&exp->exp_obd_chain));
837 CDEBUG(D_IOCTL, "final put %p/%s\n",
838 exp, exp->exp_client_uuid.uuid);
840 /* release nid stat reference */
841 lprocfs_exp_cleanup(exp);
843 obd_zombie_export_add(exp);
846 EXPORT_SYMBOL(class_export_put);
848 /* Creates a new export, adds it to the hash table, and returns a
849 * pointer to it. The refcount is 2: one for the hash reference, and
850 * one for the pointer returned by this function. */
/*
 * Allocate and initialise an export of \a obd for client \a cluuid.
 *
 * The export starts with a refcount of 2 and is entered in the handle hash.
 * Unless \a cluuid is the device's own uuid, it is also added to the device's
 * uuid hash, and a duplicate is refused with -EALREADY. The device is checked
 * for obd_stopping both before and after the hash insert (-ENODEV). On
 * success the export is linked onto obd_exports and obd_exports_timed and
 * obd_num_exports is incremented. The tail is the failure path, which
 * unhashes the handle, destroys and frees the export.
 */
851 struct obd_export *class_new_export(struct obd_device *obd,
852 struct obd_uuid *cluuid)
854 struct obd_export *export;
855 cfs_hash_t *hash = NULL;
859 OBD_ALLOC_PTR(export);
861 return ERR_PTR(-ENOMEM);
863 export->exp_conn_cnt = 0;
864 export->exp_lock_hash = NULL;
865 export->exp_flock_hash = NULL;
866 atomic_set(&export->exp_refcount, 2);
867 atomic_set(&export->exp_rpc_count, 0);
868 atomic_set(&export->exp_cb_count, 0);
869 atomic_set(&export->exp_locks_count, 0);
870 #if LUSTRE_TRACKS_LOCK_EXP_REFS
871 INIT_LIST_HEAD(&export->exp_locks_list);
872 spin_lock_init(&export->exp_locks_list_guard);
874 atomic_set(&export->exp_replay_count, 0);
875 export->exp_obd = obd;
876 INIT_LIST_HEAD(&export->exp_outstanding_replies);
877 spin_lock_init(&export->exp_uncommitted_replies_lock);
878 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
879 INIT_LIST_HEAD(&export->exp_req_replay_queue);
880 INIT_LIST_HEAD(&export->exp_handle.h_link);
881 INIT_LIST_HEAD(&export->exp_hp_rpcs);
882 INIT_LIST_HEAD(&export->exp_reg_rpcs);
883 class_handle_hash(&export->exp_handle, &export_handle_ops);
884 export->exp_last_request_time = cfs_time_current_sec();
885 spin_lock_init(&export->exp_lock);
886 spin_lock_init(&export->exp_rpc_lock);
887 INIT_HLIST_NODE(&export->exp_uuid_hash);
888 INIT_HLIST_NODE(&export->exp_nid_hash);
889 spin_lock_init(&export->exp_bl_list_lock);
890 INIT_LIST_HEAD(&export->exp_bl_list);
892 export->exp_sp_peer = LUSTRE_SP_ANY;
893 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
894 export->exp_client_uuid = *cluuid;
895 obd_init_export(export);
897 spin_lock(&obd->obd_dev_lock);
898 /* shouldn't happen, but might race */
899 if (obd->obd_stopping)
900 GOTO(exit_unlock, rc = -ENODEV);
/* Hold a reference on the uuid hash while the device lock is dropped. */
902 hash = cfs_hash_getref(obd->obd_uuid_hash);
904 GOTO(exit_unlock, rc = -ENODEV);
905 spin_unlock(&obd->obd_dev_lock);
/* The device's own (self) export is not entered in the uuid hash. */
907 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
908 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
910 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
911 obd->obd_name, cluuid->uuid, rc);
912 GOTO(exit_err, rc = -EALREADY);
916 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* Re-check under the lock: the device may have started stopping while the
 * lock was released, in which case the hash entry is removed again. */
917 spin_lock(&obd->obd_dev_lock);
918 if (obd->obd_stopping) {
919 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
920 GOTO(exit_unlock, rc = -ENODEV);
923 class_incref(obd, "export", export);
924 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
925 list_add_tail(&export->exp_obd_chain_timed,
926 &export->exp_obd->obd_exports_timed);
927 export->exp_obd->obd_num_exports++;
928 spin_unlock(&obd->obd_dev_lock);
929 cfs_hash_putref(hash);
933 spin_unlock(&obd->obd_dev_lock);
936 cfs_hash_putref(hash);
937 class_handle_unhash(&export->exp_handle);
938 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
939 obd_destroy_export(export);
940 OBD_FREE_PTR(export);
943 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its obd device.
 * Removes it from the handle hash and, under obd_dev_lock, from the uuid hash
 * (if hashed) and the timed list; moves it to obd_unlinked_exports and
 * decrements obd_num_exports. One export reference is dropped at the end.
 */
945 void class_unlink_export(struct obd_export *exp)
947 class_handle_unhash(&exp->exp_handle);
949 spin_lock(&exp->exp_obd->obd_dev_lock);
950 /* delete an uuid-export hashitem from hashtables */
951 if (!hlist_unhashed(&exp->exp_uuid_hash))
952 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
953 &exp->exp_client_uuid,
954 &exp->exp_uuid_hash);
956 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
957 list_del_init(&exp->exp_obd_chain_timed);
958 exp->exp_obd->obd_num_exports--;
959 spin_unlock(&exp->exp_obd->obd_dev_lock);
960 class_export_put(exp);
962 EXPORT_SYMBOL(class_unlink_export);
964 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero.
 * Releases the import's connection and every connection on imp_conn_list
 * (freeing the list entries), asserts that imp_sec is NULL, drops the
 * "import" reference on the obd device and frees the import via RCU.
 */
965 void class_import_destroy(struct obd_import *imp)
969 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
970 imp->imp_obd->obd_name);
972 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
974 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Drain the connection list one entry at a time. */
976 while (!list_empty(&imp->imp_conn_list)) {
977 struct obd_import_conn *imp_conn;
979 imp_conn = list_entry(imp->imp_conn_list.next,
980 struct obd_import_conn, oic_item);
981 list_del_init(&imp_conn->oic_item);
982 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
983 OBD_FREE(imp_conn, sizeof(*imp_conn));
986 LASSERT(imp->imp_sec == NULL);
/* Pairs with class_incref(obd, "import", ...) in class_new_import(). */
987 class_decref(imp->imp_obd, "import", imp);
988 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle-table addref callback for imports: takes an import reference. */
992 static void import_handle_addref(void *import)
994 class_import_get(import);
/* Handle ops passed to class_handle_hash() when an import is created. */
997 static struct portals_handle_ops import_handle_ops = {
998 .hop_addref = import_handle_addref,
/* Take one reference on \a import (atomic increment of imp_refcount) and log
 * the new count together with the owning device name. */
1002 struct obd_import *class_import_get(struct obd_import *import)
1004 atomic_inc(&import->imp_refcount);
1005 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1006 atomic_read(&import->imp_refcount),
1007 import->imp_obd->obd_name);
1010 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp.
 * The import must not already be on a zombie list. When the count reaches
 * zero the import is handed to obd_zombie_import_add() rather than destroyed
 * here.
 */
1012 void class_import_put(struct obd_import *imp)
1016 LASSERT(list_empty(&imp->imp_zombie_chain));
1017 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* Logged before the decrement, hence the "- 1". */
1019 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1020 atomic_read(&imp->imp_refcount) - 1,
1021 imp->imp_obd->obd_name);
1023 if (atomic_dec_and_test(&imp->imp_refcount)) {
1024 CDEBUG(D_INFO, "final put import %p\n", imp);
1025 obd_zombie_import_add(imp);
1028 /* catch possible import put race */
1029 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1032 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the net latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 */
1034 static void init_imp_at(struct imp_at *at) {
1036 at_init(&at->iat_net_latency, 0, 0);
1037 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1038 /* max service estimates are tracked on the server side, so
1039 don't use the AT history here, just use the last reported
1040 val. (But keep hist for proc histogram, worst_ever) */
1041 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise an import for \a obd.
 * The import takes an "import" reference on the device, starts in state
 * LUSTRE_IMP_NEW with a refcount of 2, is entered in the handle hash, and
 * defaults to message magic LUSTRE_MSG_MAGIC_V2.
 */
1046 struct obd_import *class_new_import(struct obd_device *obd)
1048 struct obd_import *imp;
1050 OBD_ALLOC(imp, sizeof(*imp));
1054 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1055 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1056 INIT_LIST_HEAD(&imp->imp_replay_list);
1057 INIT_LIST_HEAD(&imp->imp_sending_list);
1058 INIT_LIST_HEAD(&imp->imp_delayed_list);
1059 INIT_LIST_HEAD(&imp->imp_committed_list);
/* The replay cursor starts at the head of the (empty) committed list. */
1060 imp->imp_replay_cursor = &imp->imp_committed_list;
1061 spin_lock_init(&imp->imp_lock);
1062 imp->imp_last_success_conn = 0;
1063 imp->imp_state = LUSTRE_IMP_NEW;
/* Released by class_decref(..., "import", ...) in class_import_destroy(). */
1064 imp->imp_obd = class_incref(obd, "import", imp);
1065 mutex_init(&imp->imp_sec_mutex);
1066 init_waitqueue_head(&imp->imp_recovery_waitq);
1068 atomic_set(&imp->imp_refcount, 2);
1069 atomic_set(&imp->imp_unregistering, 0);
1070 atomic_set(&imp->imp_inflight, 0);
1071 atomic_set(&imp->imp_replay_inflight, 0);
1072 atomic_set(&imp->imp_inval_count, 0);
1073 INIT_LIST_HEAD(&imp->imp_conn_list);
1074 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1075 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1076 init_imp_at(&imp->imp_at);
1078 /* the default magic is V2, will be used in connect RPC, and
1079 * then adjusted according to the flags in request/reply. */
1080 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1084 EXPORT_SYMBOL(class_new_import);
/*
 * Begin tearing down \a import: remove it from the handle hash, bump
 * imp_generation under imp_lock, and drop one import reference.
 */
1086 void class_destroy_import(struct obd_import *import)
1088 LASSERT(import != NULL);
1089 LASSERT(import != LP_POISON);
1091 class_handle_unhash(&import->imp_handle);
1093 spin_lock(&import->imp_lock);
1094 import->imp_generation++;
1095 spin_unlock(&import->imp_lock);
1096 class_import_put(import);
1098 EXPORT_SYMBOL(class_destroy_import);
1100 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Record that \a lock holds a reference on \a exp (debug tracking).
 * Under exp_locks_list_guard: warns if the lock is already tracked against a
 * different export; on the first reference the lock is linked onto
 * exp_locks_list and its target export is set; the per-lock counter is
 * incremented on every call.
 */
1102 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1104 spin_lock(&exp->exp_locks_list_guard);
1106 LASSERT(lock->l_exp_refs_nr >= 0);
1108 if (lock->l_exp_refs_target != NULL &&
1109 lock->l_exp_refs_target != exp) {
1110 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1111 exp, lock, lock->l_exp_refs_target);
/* Post-increment: the branch is taken only for the first reference. */
1113 if ((lock->l_exp_refs_nr ++) == 0) {
1114 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1115 lock->l_exp_refs_target = exp;
1117 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1118 lock, exp, lock->l_exp_refs_nr);
1119 spin_unlock(&exp->exp_locks_list_guard);
1121 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Undo one __class_export_add_lock_ref() for \a lock on \a exp.
 * Under exp_locks_list_guard: warns if the lock is tracked against a
 * different export; when the per-lock counter drops to zero the lock is
 * unlinked from the export's list and its target export is cleared.
 */
1123 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1125 spin_lock(&exp->exp_locks_list_guard);
1126 LASSERT(lock->l_exp_refs_nr > 0);
1127 if (lock->l_exp_refs_target != exp) {
1128 LCONSOLE_WARN("lock %p, "
1129 "mismatching export pointers: %p, %p\n",
1130 lock, lock->l_exp_refs_target, exp);
/* Pre-decrement: the branch is taken when the last reference goes away. */
1132 if (-- lock->l_exp_refs_nr == 0) {
1133 list_del_init(&lock->l_exp_refs_link);
1134 lock->l_exp_refs_target = NULL;
1136 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1137 lock, exp, lock->l_exp_refs_nr);
1138 spin_unlock(&exp->exp_locks_list_guard);
1140 EXPORT_SYMBOL(__class_export_del_lock_ref);
1143 /* A connection defines an export context in which preallocation can
1144 be managed. This releases the export pointer reference, and returns
1145 the export handle, so the export refcount is 1 when this function returns. */
/*
 * Create a new export of \a obd for client \a cluuid and return its handle
 * cookie in \a conn. The pointer reference returned by class_new_export() is
 * dropped again here, so only the handle identifies the export afterwards.
 * An error from class_new_export() is passed back via PTR_ERR().
 */
1147 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1148 struct obd_uuid *cluuid)
1150 struct obd_export *export;
1151 LASSERT(conn != NULL);
1152 LASSERT(obd != NULL);
1153 LASSERT(cluuid != NULL);
1156 export = class_new_export(obd, cluuid);
1158 RETURN(PTR_ERR(export));
1160 conn->cookie = export->exp_handle.h_cookie;
1161 class_export_put(export);
1163 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1164 cluuid->uuid, conn->cookie);
1167 EXPORT_SYMBOL(class_connect);
1169 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting of \a exp on its obd device.
 * Under obd_recovery_task_lock, while the device is recovering: clears
 * exp_in_recovery and decrements obd_connected_clients, and counts the export
 * as a stale client unless it is a lightweight connection. Then, under
 * exp_lock, clears the request/lock replay flags and decrements the matching
 * per-device replay client counters.
 */
1170 void class_export_recovery_cleanup(struct obd_export *exp)
1172 struct obd_device *obd = exp->exp_obd;
1174 spin_lock(&obd->obd_recovery_task_lock);
1175 if (obd->obd_recovering) {
1176 if (exp->exp_in_recovery) {
1177 spin_lock(&exp->exp_lock);
1178 exp->exp_in_recovery = 0;
1179 spin_unlock(&exp->exp_lock);
1180 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1181 atomic_dec(&obd->obd_connected_clients);
1184 /* if called during recovery then should update
1185 * obd_stale_clients counter,
1186 * lightweight exports are not counted */
1187 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1188 exp->exp_obd->obd_stale_clients++;
1190 spin_unlock(&obd->obd_recovery_task_lock);
1192 spin_lock(&exp->exp_lock);
1193 /** Cleanup req replay fields */
1194 if (exp->exp_req_replay_needed) {
1195 exp->exp_req_replay_needed = 0;
1197 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1198 atomic_dec(&obd->obd_req_replay_clients);
1201 /** Cleanup lock replay data */
1202 if (exp->exp_lock_replay_needed) {
1203 exp->exp_lock_replay_needed = 0;
1205 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1206 atomic_dec(&obd->obd_lock_replay_clients);
1208 spin_unlock(&exp->exp_lock);
1211 /* This function removes 1-3 references from the export:
1212 * 1 - for the export pointer passed in,
1213 * and, if a disconnect is really needed:
1214 * 2 - removing from hash
1215 * 3 - in class_unlink_export
1216 * The export pointer passed to this function can be destroyed */
/*
 * Disconnect \a export from its obd device.
 * exp_disconnected is tested and set under exp_lock so that only the first
 * caller does the work: removing the export from the nid hash (if hashed),
 * recovery cleanup, and class_unlink_export(). One export reference is
 * dropped at the end.
 */
1217 int class_disconnect(struct obd_export *export)
1219 int already_disconnected;
1222 if (export == NULL) {
1223 CWARN("attempting to free NULL export %p\n", export);
1227 spin_lock(&export->exp_lock);
1228 already_disconnected = export->exp_disconnected;
1229 export->exp_disconnected = 1;
1230 spin_unlock(&export->exp_lock);
1232 /* class_cleanup(), abort_recovery(), and class_fail_export()
1233 * all end up in here, and if any of them race we shouldn't
1234 * call extra class_export_puts(). */
1235 if (already_disconnected) {
1236 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1237 GOTO(no_disconn, already_disconnected);
1240 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1241 export->exp_handle.h_cookie);
1243 if (!hlist_unhashed(&export->exp_nid_hash))
1244 cfs_hash_del(export->exp_obd->obd_nid_hash,
1245 &export->exp_connection->c_peer.nid,
1246 &export->exp_nid_hash);
1248 class_export_recovery_cleanup(export);
1249 class_unlink_export(export);
1251 class_export_put(export);
1254 EXPORT_SYMBOL(class_disconnect);
1256 /* Return non-zero for a fully connected export */
/* An export counts as connected when exp_conn_cnt > 0 and exp_failed is
 * clear; both fields are sampled together under exp_lock. */
1257 int class_connected_export(struct obd_export *exp)
1262 spin_lock(&exp->exp_lock);
1263 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1264 spin_unlock(&exp->exp_lock);
1268 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, applying \a flags to each one first.
 * The device's self export (client uuid == obd uuid) is only unlinked from
 * the list, not disconnected. All other exports go through obd_disconnect().
 */
1270 static void class_disconnect_export_list(struct list_head *list,
1271 enum obd_option flags)
1274 struct obd_export *exp;
1277 /* It's possible that an export may disconnect itself, but
1278 * nothing else will be added to this list. */
1279 while (!list_empty(list)) {
1280 exp = list_entry(list->next, struct obd_export,
1282 /* hold a reference so exp is still valid for CDEBUG after obd_disconnect */
1283 class_export_get(exp);
1285 spin_lock(&exp->exp_lock);
1286 exp->exp_flags = flags;
1287 spin_unlock(&exp->exp_lock);
1289 if (obd_uuid_equals(&exp->exp_client_uuid,
1290 &exp->exp_obd->obd_uuid)) {
1292 "exp %p export uuid == obd uuid, don't discon\n",
1294 /* Need to delete this now so we don't end up pointing
1295 * to work_list later when this export is cleaned up. */
1296 list_del_init(&exp->exp_obd_chain);
1297 class_export_put(exp);
1301 class_export_get(exp);
1302 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1303 "last request at "CFS_TIME_T"\n",
1304 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1305 exp, exp->exp_last_request_time);
1306 /* release one export reference anyway */
1307 rc = obd_disconnect(exp);
1309 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1310 obd_export_nid2str(exp), exp, rc);
1311 class_export_put(exp);
1316 void class_disconnect_exports(struct obd_device *obd)
1318 struct list_head work_list;
1321 /* Move all of the exports from obd_exports to a work list, en masse. */
1322 INIT_LIST_HEAD(&work_list);
1323 spin_lock(&obd->obd_dev_lock);
1324 list_splice_init(&obd->obd_exports, &work_list);
1325 list_splice_init(&obd->obd_delayed_exports, &work_list);
1326 spin_unlock(&obd->obd_dev_lock);
1328 if (!list_empty(&work_list)) {
1329 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1330 "disconnecting them\n", obd->obd_minor, obd);
1331 class_disconnect_export_list(&work_list,
1332 exp_flags_from_obd(obd));
1334 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1335 obd->obd_minor, obd);
1338 EXPORT_SYMBOL(class_disconnect_exports);
1340 /* Remove exports that have not completed recovery.
1342 void class_disconnect_stale_exports(struct obd_device *obd,
1343 int (*test_export)(struct obd_export *))
1345 struct list_head work_list;
1346 struct obd_export *exp, *n;
1350 INIT_LIST_HEAD(&work_list);
1351 spin_lock(&obd->obd_dev_lock);
1352 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1354 /* don't count self-export as client */
1355 if (obd_uuid_equals(&exp->exp_client_uuid,
1356 &exp->exp_obd->obd_uuid))
1359 /* don't evict clients which have no slot in last_rcvd
1360 * (e.g. lightweight connection) */
1361 if (exp->exp_target_data.ted_lr_idx == -1)
1364 spin_lock(&exp->exp_lock);
1365 if (exp->exp_failed || test_export(exp)) {
1366 spin_unlock(&exp->exp_lock);
1369 exp->exp_failed = 1;
1370 spin_unlock(&exp->exp_lock);
1372 list_move(&exp->exp_obd_chain, &work_list);
1374 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1375 obd->obd_name, exp->exp_client_uuid.uuid,
1376 exp->exp_connection == NULL ? "<unknown>" :
1377 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1378 print_export_data(exp, "EVICTING", 0);
1380 spin_unlock(&obd->obd_dev_lock);
1383 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1384 obd->obd_name, evicted);
1386 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1387 OBD_OPT_ABORT_RECOV);
1390 EXPORT_SYMBOL(class_disconnect_stale_exports);
1392 void class_fail_export(struct obd_export *exp)
1394 int rc, already_failed;
1396 spin_lock(&exp->exp_lock);
1397 already_failed = exp->exp_failed;
1398 exp->exp_failed = 1;
1399 spin_unlock(&exp->exp_lock);
1401 if (already_failed) {
1402 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1403 exp, exp->exp_client_uuid.uuid);
1407 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1408 exp, exp->exp_client_uuid.uuid);
1410 if (obd_dump_on_timeout)
1411 libcfs_debug_dumplog();
1413 /* need for safe call CDEBUG after obd_disconnect */
1414 class_export_get(exp);
1416 /* Most callers into obd_disconnect are removing their own reference
1417 * (request, for example) in addition to the one from the hash table.
1418 * We don't have such a reference here, so make one. */
1419 class_export_get(exp);
1420 rc = obd_disconnect(exp);
1422 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1424 CDEBUG(D_HA, "disconnected export %p/%s\n",
1425 exp, exp->exp_client_uuid.uuid);
1426 class_export_put(exp);
1428 EXPORT_SYMBOL(class_fail_export);
1430 char *obd_export_nid2str(struct obd_export *exp)
1432 if (exp->exp_connection != NULL)
1433 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1437 EXPORT_SYMBOL(obd_export_nid2str);
1439 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1441 cfs_hash_t *nid_hash;
1442 struct obd_export *doomed_exp = NULL;
1443 int exports_evicted = 0;
1445 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1447 spin_lock(&obd->obd_dev_lock);
1448 /* umount has run already, so evict thread should leave
1449 * its task to umount thread now */
1450 if (obd->obd_stopping) {
1451 spin_unlock(&obd->obd_dev_lock);
1452 return exports_evicted;
1454 nid_hash = obd->obd_nid_hash;
1455 cfs_hash_getref(nid_hash);
1456 spin_unlock(&obd->obd_dev_lock);
1459 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1460 if (doomed_exp == NULL)
1463 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1464 "nid %s found, wanted nid %s, requested nid %s\n",
1465 obd_export_nid2str(doomed_exp),
1466 libcfs_nid2str(nid_key), nid);
1467 LASSERTF(doomed_exp != obd->obd_self_export,
1468 "self-export is hashed by NID?\n");
1470 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1471 "request\n", obd->obd_name,
1472 obd_uuid2str(&doomed_exp->exp_client_uuid),
1473 obd_export_nid2str(doomed_exp));
1474 class_fail_export(doomed_exp);
1475 class_export_put(doomed_exp);
1478 cfs_hash_putref(nid_hash);
1480 if (!exports_evicted)
1481 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1482 obd->obd_name, nid);
1483 return exports_evicted;
1485 EXPORT_SYMBOL(obd_export_evict_by_nid);
1487 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1489 cfs_hash_t *uuid_hash;
1490 struct obd_export *doomed_exp = NULL;
1491 struct obd_uuid doomed_uuid;
1492 int exports_evicted = 0;
1494 spin_lock(&obd->obd_dev_lock);
1495 if (obd->obd_stopping) {
1496 spin_unlock(&obd->obd_dev_lock);
1497 return exports_evicted;
1499 uuid_hash = obd->obd_uuid_hash;
1500 cfs_hash_getref(uuid_hash);
1501 spin_unlock(&obd->obd_dev_lock);
1503 obd_str2uuid(&doomed_uuid, uuid);
1504 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1505 CERROR("%s: can't evict myself\n", obd->obd_name);
1506 cfs_hash_putref(uuid_hash);
1507 return exports_evicted;
1510 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1512 if (doomed_exp == NULL) {
1513 CERROR("%s: can't disconnect %s: no exports found\n",
1514 obd->obd_name, uuid);
1516 CWARN("%s: evicting %s at adminstrative request\n",
1517 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1518 class_fail_export(doomed_exp);
1519 class_export_put(doomed_exp);
1522 cfs_hash_putref(uuid_hash);
1524 return exports_evicted;
1526 EXPORT_SYMBOL(obd_export_evict_by_uuid);
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() for each export when
 * lock dumping is requested; NULL until some other module installs one. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1533 static void print_export_data(struct obd_export *exp, const char *status,
1536 struct ptlrpc_reply_state *rs;
1537 struct ptlrpc_reply_state *first_reply = NULL;
1540 spin_lock(&exp->exp_lock);
1541 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1547 spin_unlock(&exp->exp_lock);
1549 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1550 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1551 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1552 atomic_read(&exp->exp_rpc_count),
1553 atomic_read(&exp->exp_cb_count),
1554 atomic_read(&exp->exp_locks_count),
1555 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1556 nreplies, first_reply, nreplies > 3 ? "..." : "",
1557 exp->exp_last_committed);
1558 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1559 if (locks && class_export_dump_hook != NULL)
1560 class_export_dump_hook(exp);
1564 void dump_exports(struct obd_device *obd, int locks)
1566 struct obd_export *exp;
1568 spin_lock(&obd->obd_dev_lock);
1569 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1570 print_export_data(exp, "ACTIVE", locks);
1571 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1572 print_export_data(exp, "UNLINKED", locks);
1573 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1574 print_export_data(exp, "DELAYED", locks);
1575 spin_unlock(&obd->obd_dev_lock);
1576 spin_lock(&obd_zombie_impexp_lock);
1577 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1578 print_export_data(exp, "ZOMBIE", locks);
1579 spin_unlock(&obd_zombie_impexp_lock);
1581 EXPORT_SYMBOL(dump_exports);
1583 void obd_exports_barrier(struct obd_device *obd)
1586 LASSERT(list_empty(&obd->obd_exports));
1587 spin_lock(&obd->obd_dev_lock);
1588 while (!list_empty(&obd->obd_unlinked_exports)) {
1589 spin_unlock(&obd->obd_dev_lock);
1590 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1591 cfs_time_seconds(waited));
1592 if (waited > 5 && IS_PO2(waited)) {
1593 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1594 "more than %d seconds. "
1595 "The obd refcount = %d. Is it stuck?\n",
1596 obd->obd_name, waited,
1597 atomic_read(&obd->obd_refcount));
1598 dump_exports(obd, 1);
1601 spin_lock(&obd->obd_dev_lock);
1603 spin_unlock(&obd->obd_dev_lock);
1605 EXPORT_SYMBOL(obd_exports_barrier);
/* Total amount of zombies (imports plus exports) to be destroyed.
 * Read and updated under obd_zombie_impexp_lock. */
static int zombies_count = 0;
1611 * kill zombie imports and exports
1613 void obd_zombie_impexp_cull(void)
1615 struct obd_import *import;
1616 struct obd_export *export;
1620 spin_lock(&obd_zombie_impexp_lock);
1623 if (!list_empty(&obd_zombie_imports)) {
1624 import = list_entry(obd_zombie_imports.next,
1627 list_del_init(&import->imp_zombie_chain);
1631 if (!list_empty(&obd_zombie_exports)) {
1632 export = list_entry(obd_zombie_exports.next,
1635 list_del_init(&export->exp_obd_chain);
1638 spin_unlock(&obd_zombie_impexp_lock);
1640 if (import != NULL) {
1641 class_import_destroy(import);
1642 spin_lock(&obd_zombie_impexp_lock);
1644 spin_unlock(&obd_zombie_impexp_lock);
1647 if (export != NULL) {
1648 class_export_destroy(export);
1649 spin_lock(&obd_zombie_impexp_lock);
1651 spin_unlock(&obd_zombie_impexp_lock);
1655 } while (import != NULL || export != NULL);
1659 static struct completion obd_zombie_start;
1660 static struct completion obd_zombie_stop;
1661 static unsigned long obd_zombie_flags;
1662 static wait_queue_head_t obd_zombie_waitq;
1663 static pid_t obd_zombie_pid;
1666 OBD_ZOMBIE_STOP = 0x0001,
1670 * check for work for kill zombie import/export thread.
1672 static int obd_zombie_impexp_check(void *arg)
1676 spin_lock(&obd_zombie_impexp_lock);
1677 rc = (zombies_count == 0) &&
1678 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1679 spin_unlock(&obd_zombie_impexp_lock);
1685 * Add export to the obd_zombe thread and notify it.
1687 static void obd_zombie_export_add(struct obd_export *exp) {
1688 spin_lock(&exp->exp_obd->obd_dev_lock);
1689 LASSERT(!list_empty(&exp->exp_obd_chain));
1690 list_del_init(&exp->exp_obd_chain);
1691 spin_unlock(&exp->exp_obd->obd_dev_lock);
1692 spin_lock(&obd_zombie_impexp_lock);
1694 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1695 spin_unlock(&obd_zombie_impexp_lock);
1697 obd_zombie_impexp_notify();
1701 * Add import to the obd_zombe thread and notify it.
1703 static void obd_zombie_import_add(struct obd_import *imp) {
1704 LASSERT(imp->imp_sec == NULL);
1705 LASSERT(imp->imp_rq_pool == NULL);
1706 spin_lock(&obd_zombie_impexp_lock);
1707 LASSERT(list_empty(&imp->imp_zombie_chain));
1709 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1710 spin_unlock(&obd_zombie_impexp_lock);
1712 obd_zombie_impexp_notify();
1716 * notify import/export destroy thread about new zombie.
1718 static void obd_zombie_impexp_notify(void)
1721 * Make sure obd_zomebie_impexp_thread get this notification.
1722 * It is possible this signal only get by obd_zombie_barrier, and
1723 * barrier gulps this notification and sleeps away and hangs ensues
1725 wake_up_all(&obd_zombie_waitq);
1729 * check whether obd_zombie is idle
1731 static int obd_zombie_is_idle(void)
1735 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1736 spin_lock(&obd_zombie_impexp_lock);
1737 rc = (zombies_count == 0);
1738 spin_unlock(&obd_zombie_impexp_lock);
1743 * wait when obd_zombie import/export queues become empty
1745 void obd_zombie_barrier(void)
1747 struct l_wait_info lwi = { 0 };
1749 if (obd_zombie_pid == current_pid())
1750 /* don't wait for myself */
1752 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1754 EXPORT_SYMBOL(obd_zombie_barrier);
1758 * destroy zombie export/import thread.
1760 static int obd_zombie_impexp_thread(void *unused)
1762 unshare_fs_struct();
1763 complete(&obd_zombie_start);
1765 obd_zombie_pid = current_pid();
1767 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1768 struct l_wait_info lwi = { 0 };
1770 l_wait_event(obd_zombie_waitq,
1771 !obd_zombie_impexp_check(NULL), &lwi);
1772 obd_zombie_impexp_cull();
1775 * Notify obd_zombie_barrier callers that queues
1778 wake_up(&obd_zombie_waitq);
1781 complete(&obd_zombie_stop);
1788 * start destroy zombie import/export thread
1790 int obd_zombie_impexp_init(void)
1792 struct task_struct *task;
1794 INIT_LIST_HEAD(&obd_zombie_imports);
1796 INIT_LIST_HEAD(&obd_zombie_exports);
1797 spin_lock_init(&obd_zombie_impexp_lock);
1798 init_completion(&obd_zombie_start);
1799 init_completion(&obd_zombie_stop);
1800 init_waitqueue_head(&obd_zombie_waitq);
1803 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1805 RETURN(PTR_ERR(task));
1807 wait_for_completion(&obd_zombie_start);
1811 * stop destroy zombie import/export thread
1813 void obd_zombie_impexp_stop(void)
1815 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1816 obd_zombie_impexp_notify();
1817 wait_for_completion(&obd_zombie_stop);
1820 /***** Kernel-userspace comm helpers *******/
1822 /* Get length of entire message, including header */
1823 int kuc_len(int payload_len)
1825 return sizeof(struct kuc_hdr) + payload_len;
1827 EXPORT_SYMBOL(kuc_len);
1829 /* Get a pointer to kuc header, given a ptr to the payload
1830 * @param p Pointer to payload area
1831 * @returns Pointer to kuc header
1833 struct kuc_hdr * kuc_ptr(void *p)
1835 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1836 LASSERT(lh->kuc_magic == KUC_MAGIC);
1839 EXPORT_SYMBOL(kuc_ptr);
1841 /* Test if payload is part of kuc message
1842 * @param p Pointer to payload area
1845 int kuc_ispayload(void *p)
1847 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1849 if (kh->kuc_magic == KUC_MAGIC)
1854 EXPORT_SYMBOL(kuc_ispayload);
1856 /* Alloc space for a message, and fill in header
1857 * @return Pointer to payload area
1859 void *kuc_alloc(int payload_len, int transport, int type)
1862 int len = kuc_len(payload_len);
1866 return ERR_PTR(-ENOMEM);
1868 lh->kuc_magic = KUC_MAGIC;
1869 lh->kuc_transport = transport;
1870 lh->kuc_msgtype = type;
1871 lh->kuc_msglen = len;
1873 return (void *)(lh + 1);
1875 EXPORT_SYMBOL(kuc_alloc);
/* Takes pointer to payload area
 * @param p Pointer to payload area, as returned by kuc_alloc()
 * @param payload_len The payload length given to kuc_alloc()
 *
 * Not declared inline: the function is exported, and a plain `inline`
 * definition does not provide an external definition under C99 inline
 * rules, which would leave the exported symbol without a body.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1885 struct obd_request_slot_waiter {
1886 struct list_head orsw_entry;
1887 wait_queue_head_t orsw_waitq;
1891 static bool obd_request_slot_avail(struct client_obd *cli,
1892 struct obd_request_slot_waiter *orsw)
1896 client_obd_list_lock(&cli->cl_loi_list_lock);
1897 avail = !!list_empty(&orsw->orsw_entry);
1898 client_obd_list_unlock(&cli->cl_loi_list_lock);
1904 * For network flow control, the RPC sponsor needs to acquire a credit
1905 * before sending the RPC. The credits count for a connection is defined
1906 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1907 * the subsequent RPC sponsors need to wait until others released their
1908 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1910 int obd_get_request_slot(struct client_obd *cli)
1912 struct obd_request_slot_waiter orsw;
1913 struct l_wait_info lwi;
1916 client_obd_list_lock(&cli->cl_loi_list_lock);
1917 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1918 cli->cl_r_in_flight++;
1919 client_obd_list_unlock(&cli->cl_loi_list_lock);
1923 init_waitqueue_head(&orsw.orsw_waitq);
1924 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1925 orsw.orsw_signaled = false;
1926 client_obd_list_unlock(&cli->cl_loi_list_lock);
1928 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1929 rc = l_wait_event(orsw.orsw_waitq,
1930 obd_request_slot_avail(cli, &orsw) ||
1934 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1935 * freed but other (such as obd_put_request_slot) is using it. */
1936 client_obd_list_lock(&cli->cl_loi_list_lock);
1938 if (!orsw.orsw_signaled) {
1939 if (list_empty(&orsw.orsw_entry))
1940 cli->cl_r_in_flight--;
1942 list_del(&orsw.orsw_entry);
1946 if (orsw.orsw_signaled) {
1947 LASSERT(list_empty(&orsw.orsw_entry));
1951 client_obd_list_unlock(&cli->cl_loi_list_lock);
1955 EXPORT_SYMBOL(obd_get_request_slot);
1957 void obd_put_request_slot(struct client_obd *cli)
1959 struct obd_request_slot_waiter *orsw;
1961 client_obd_list_lock(&cli->cl_loi_list_lock);
1962 cli->cl_r_in_flight--;
1964 /* If there is free slot, wakeup the first waiter. */
1965 if (!list_empty(&cli->cl_loi_read_list) &&
1966 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1967 orsw = list_entry(cli->cl_loi_read_list.next,
1968 struct obd_request_slot_waiter, orsw_entry);
1969 list_del_init(&orsw->orsw_entry);
1970 cli->cl_r_in_flight++;
1971 wake_up(&orsw->orsw_waitq);
1973 client_obd_list_unlock(&cli->cl_loi_list_lock);
1975 EXPORT_SYMBOL(obd_put_request_slot);
1977 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1979 return cli->cl_max_rpcs_in_flight;
1981 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1983 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1985 struct obd_request_slot_waiter *orsw;
1990 if (max > OBD_MAX_RIF_MAX || max < 1)
1993 client_obd_list_lock(&cli->cl_loi_list_lock);
1994 old = cli->cl_max_rpcs_in_flight;
1995 cli->cl_max_rpcs_in_flight = max;
1998 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1999 for (i = 0; i < diff; i++) {
2000 if (list_empty(&cli->cl_loi_read_list))
2003 orsw = list_entry(cli->cl_loi_read_list.next,
2004 struct obd_request_slot_waiter, orsw_entry);
2005 list_del_init(&orsw->orsw_entry);
2006 cli->cl_r_in_flight++;
2007 wake_up(&orsw->orsw_waitq);
2009 client_obd_list_unlock(&cli->cl_loi_list_lock);
2013 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);