4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
46 spinlock_t obd_types_lock;
48 struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 struct kmem_cache *import_cachep;
53 struct list_head obd_zombie_imports;
54 struct list_head obd_zombie_exports;
55 spinlock_t obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60 const char *status, int locks);
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
66 * support functions: we could use inter-module communication, but this
67 * is more portable to other OS's
69 static struct obd_device *obd_device_alloc(void)
71 struct obd_device *obd;
73 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75 obd->obd_magic = OBD_DEVICE_MAGIC;
80 static void obd_device_free(struct obd_device *obd)
83 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85 if (obd->obd_namespace != NULL) {
86 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87 obd, obd->obd_namespace, obd->obd_force);
90 lu_ref_fini(&obd->obd_reference);
91 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 struct obd_type *class_search_type(const char *name)
96 struct list_head *tmp;
97 struct obd_type *type;
99 spin_lock(&obd_types_lock);
100 list_for_each(tmp, &obd_types) {
101 type = list_entry(tmp, struct obd_type, typ_chain);
102 if (strcmp(type->typ_name, name) == 0) {
103 spin_unlock(&obd_types_lock);
107 spin_unlock(&obd_types_lock);
110 EXPORT_SYMBOL(class_search_type);
112 struct obd_type *class_get_type(const char *name)
114 struct obd_type *type = class_search_type(name);
116 #ifdef HAVE_MODULE_LOADING_SUPPORT
118 const char *modname = name;
120 if (strcmp(modname, "obdfilter") == 0)
123 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124 modname = LUSTRE_OSP_NAME;
126 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127 modname = LUSTRE_MDT_NAME;
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 spin_lock(&type->obd_type_lock);
141 try_module_get(type->typ_dt_ops->o_owner);
142 spin_unlock(&type->obd_type_lock);
146 EXPORT_SYMBOL(class_get_type);
148 void class_put_type(struct obd_type *type)
151 spin_lock(&type->obd_type_lock);
153 module_put(type->typ_dt_ops->o_owner);
154 spin_unlock(&type->obd_type_lock);
156 EXPORT_SYMBOL(class_put_type);
158 #define CLASS_MAX_NAME 1024
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161 bool enable_proc, struct lprocfs_vars *vars,
162 const char *name, struct lu_device_type *ldt)
164 struct obd_type *type;
169 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
171 if (class_search_type(name)) {
172 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
177 OBD_ALLOC(type, sizeof(*type));
181 OBD_ALLOC_PTR(type->typ_dt_ops);
182 OBD_ALLOC_PTR(type->typ_md_ops);
183 OBD_ALLOC(type->typ_name, strlen(name) + 1);
185 if (type->typ_dt_ops == NULL ||
186 type->typ_md_ops == NULL ||
187 type->typ_name == NULL)
190 *(type->typ_dt_ops) = *dt_ops;
191 /* md_ops is optional */
193 *(type->typ_md_ops) = *md_ops;
194 strcpy(type->typ_name, name);
195 spin_lock_init(&type->obd_type_lock);
199 type->typ_procroot = lprocfs_register(type->typ_name,
202 if (IS_ERR(type->typ_procroot)) {
203 rc = PTR_ERR(type->typ_procroot);
204 type->typ_procroot = NULL;
211 rc = lu_device_type_init(ldt);
216 spin_lock(&obd_types_lock);
217 list_add(&type->typ_chain, &obd_types);
218 spin_unlock(&obd_types_lock);
223 if (type->typ_name != NULL) {
225 if (type->typ_procroot != NULL)
226 remove_proc_subtree(type->typ_name, proc_lustre_root);
228 OBD_FREE(type->typ_name, strlen(name) + 1);
230 if (type->typ_md_ops != NULL)
231 OBD_FREE_PTR(type->typ_md_ops);
232 if (type->typ_dt_ops != NULL)
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE(type, sizeof(*type));
237 EXPORT_SYMBOL(class_register_type);
239 int class_unregister_type(const char *name)
241 struct obd_type *type = class_search_type(name);
245 CERROR("unknown obd type\n");
249 if (type->typ_refcnt) {
250 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
251 /* This is a bad situation, let's make the best of it */
252 /* Remove ops, but leave the name for debugging */
253 OBD_FREE_PTR(type->typ_dt_ops);
254 OBD_FREE_PTR(type->typ_md_ops);
258 /* we do not use type->typ_procroot as for compatibility purposes
259 * other modules can share names (i.e. lod can use lov entry). so
260 * we can't reference pointer as it can get invalided when another
261 * module removes the entry */
263 if (type->typ_procroot != NULL)
264 remove_proc_subtree(type->typ_name, proc_lustre_root);
265 if (type->typ_procsym != NULL)
266 lprocfs_remove(&type->typ_procsym);
269 lu_device_type_fini(type->typ_lu);
271 spin_lock(&obd_types_lock);
272 list_del(&type->typ_chain);
273 spin_unlock(&obd_types_lock);
274 OBD_FREE(type->typ_name, strlen(name) + 1);
275 if (type->typ_dt_ops != NULL)
276 OBD_FREE_PTR(type->typ_dt_ops);
277 if (type->typ_md_ops != NULL)
278 OBD_FREE_PTR(type->typ_md_ops);
279 OBD_FREE(type, sizeof(*type));
281 } /* class_unregister_type */
282 EXPORT_SYMBOL(class_unregister_type);
285 * Create a new obd device.
287 * Find an empty slot in ::obd_devs[], create a new obd device in it.
289 * \param[in] type_name obd device type string.
290 * \param[in] name obd device name.
292 * \retval NULL if create fails, otherwise return the obd device
295 struct obd_device *class_newdev(const char *type_name, const char *name)
297 struct obd_device *result = NULL;
298 struct obd_device *newdev;
299 struct obd_type *type = NULL;
301 int new_obd_minor = 0;
304 if (strlen(name) >= MAX_OBD_NAME) {
305 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
306 RETURN(ERR_PTR(-EINVAL));
309 type = class_get_type(type_name);
311 CERROR("OBD: unknown type: %s\n", type_name);
312 RETURN(ERR_PTR(-ENODEV));
315 newdev = obd_device_alloc();
317 GOTO(out_type, result = ERR_PTR(-ENOMEM));
319 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
321 write_lock(&obd_dev_lock);
322 for (i = 0; i < class_devno_max(); i++) {
323 struct obd_device *obd = class_num2obd(i);
325 if (obd && (strcmp(name, obd->obd_name) == 0)) {
326 CERROR("Device %s already exists at %d, won't add\n",
329 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
330 "%p obd_magic %08x != %08x\n", result,
331 result->obd_magic, OBD_DEVICE_MAGIC);
332 LASSERTF(result->obd_minor == new_obd_minor,
333 "%p obd_minor %d != %d\n", result,
334 result->obd_minor, new_obd_minor);
336 obd_devs[result->obd_minor] = NULL;
337 result->obd_name[0]='\0';
339 result = ERR_PTR(-EEXIST);
342 if (!result && !obd) {
344 result->obd_minor = i;
346 result->obd_type = type;
347 strncpy(result->obd_name, name,
348 sizeof(result->obd_name) - 1);
349 obd_devs[i] = result;
352 write_unlock(&obd_dev_lock);
354 if (result == NULL && i >= class_devno_max()) {
355 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
357 GOTO(out, result = ERR_PTR(-EOVERFLOW));
363 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
364 result->obd_name, result);
368 obd_device_free(newdev);
370 class_put_type(type);
374 void class_release_dev(struct obd_device *obd)
376 struct obd_type *obd_type = obd->obd_type;
378 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
379 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
380 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
381 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
382 LASSERT(obd_type != NULL);
384 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
385 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
387 write_lock(&obd_dev_lock);
388 obd_devs[obd->obd_minor] = NULL;
389 write_unlock(&obd_dev_lock);
390 obd_device_free(obd);
392 class_put_type(obd_type);
395 int class_name2dev(const char *name)
402 read_lock(&obd_dev_lock);
403 for (i = 0; i < class_devno_max(); i++) {
404 struct obd_device *obd = class_num2obd(i);
406 if (obd && strcmp(name, obd->obd_name) == 0) {
407 /* Make sure we finished attaching before we give
408 out any references */
409 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
410 if (obd->obd_attached) {
411 read_unlock(&obd_dev_lock);
417 read_unlock(&obd_dev_lock);
421 EXPORT_SYMBOL(class_name2dev);
423 struct obd_device *class_name2obd(const char *name)
425 int dev = class_name2dev(name);
427 if (dev < 0 || dev > class_devno_max())
429 return class_num2obd(dev);
431 EXPORT_SYMBOL(class_name2obd);
433 int class_uuid2dev(struct obd_uuid *uuid)
437 read_lock(&obd_dev_lock);
438 for (i = 0; i < class_devno_max(); i++) {
439 struct obd_device *obd = class_num2obd(i);
441 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
442 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
443 read_unlock(&obd_dev_lock);
447 read_unlock(&obd_dev_lock);
451 EXPORT_SYMBOL(class_uuid2dev);
453 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
455 int dev = class_uuid2dev(uuid);
458 return class_num2obd(dev);
460 EXPORT_SYMBOL(class_uuid2obd);
463 * Get obd device from ::obd_devs[]
465 * \param num [in] array index
467 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
468 * otherwise return the obd device there.
470 struct obd_device *class_num2obd(int num)
472 struct obd_device *obd = NULL;
474 if (num < class_devno_max()) {
479 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
480 "%p obd_magic %08x != %08x\n",
481 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
482 LASSERTF(obd->obd_minor == num,
483 "%p obd_minor %0d != %0d\n",
484 obd, obd->obd_minor, num);
489 EXPORT_SYMBOL(class_num2obd);
492 * Get obd devices count. Device in any
494 * \retval obd device count
496 int get_devices_count(void)
498 int index, max_index = class_devno_max(), dev_count = 0;
500 read_lock(&obd_dev_lock);
501 for (index = 0; index <= max_index; index++) {
502 struct obd_device *obd = class_num2obd(index);
506 read_unlock(&obd_dev_lock);
510 EXPORT_SYMBOL(get_devices_count);
512 void class_obd_list(void)
517 read_lock(&obd_dev_lock);
518 for (i = 0; i < class_devno_max(); i++) {
519 struct obd_device *obd = class_num2obd(i);
523 if (obd->obd_stopping)
525 else if (obd->obd_set_up)
527 else if (obd->obd_attached)
531 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
532 i, status, obd->obd_type->typ_name,
533 obd->obd_name, obd->obd_uuid.uuid,
534 atomic_read(&obd->obd_refcount));
536 read_unlock(&obd_dev_lock);
540 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
541 specified, then only the client with that uuid is returned,
542 otherwise any client connected to the tgt is returned. */
543 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
544 const char * typ_name,
545 struct obd_uuid *grp_uuid)
549 read_lock(&obd_dev_lock);
550 for (i = 0; i < class_devno_max(); i++) {
551 struct obd_device *obd = class_num2obd(i);
555 if ((strncmp(obd->obd_type->typ_name, typ_name,
556 strlen(typ_name)) == 0)) {
557 if (obd_uuid_equals(tgt_uuid,
558 &obd->u.cli.cl_target_uuid) &&
559 ((grp_uuid)? obd_uuid_equals(grp_uuid,
560 &obd->obd_uuid) : 1)) {
561 read_unlock(&obd_dev_lock);
566 read_unlock(&obd_dev_lock);
570 EXPORT_SYMBOL(class_find_client_obd);
572 /* Iterate the obd_device list looking devices have grp_uuid. Start
573 searching at *next, and if a device is found, the next index to look
574 at is saved in *next. If next is NULL, then the first matching device
575 will always be returned. */
576 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
582 else if (*next >= 0 && *next < class_devno_max())
587 read_lock(&obd_dev_lock);
588 for (; i < class_devno_max(); i++) {
589 struct obd_device *obd = class_num2obd(i);
593 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
596 read_unlock(&obd_dev_lock);
600 read_unlock(&obd_dev_lock);
604 EXPORT_SYMBOL(class_devices_in_group);
607 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
608 * adjust sptlrpc settings accordingly.
610 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
612 struct obd_device *obd;
616 LASSERT(namelen > 0);
618 read_lock(&obd_dev_lock);
619 for (i = 0; i < class_devno_max(); i++) {
620 obd = class_num2obd(i);
622 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
625 /* only notify mdc, osc, mdt, ost */
626 type = obd->obd_type->typ_name;
627 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
628 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
629 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
630 strcmp(type, LUSTRE_OST_NAME) != 0)
633 if (strncmp(obd->obd_name, fsname, namelen))
636 class_incref(obd, __FUNCTION__, obd);
637 read_unlock(&obd_dev_lock);
638 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
639 sizeof(KEY_SPTLRPC_CONF),
640 KEY_SPTLRPC_CONF, 0, NULL, NULL);
642 class_decref(obd, __FUNCTION__, obd);
643 read_lock(&obd_dev_lock);
645 read_unlock(&obd_dev_lock);
648 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
650 void obd_cleanup_caches(void)
653 if (obd_device_cachep) {
654 kmem_cache_destroy(obd_device_cachep);
655 obd_device_cachep = NULL;
658 kmem_cache_destroy(obdo_cachep);
662 kmem_cache_destroy(import_cachep);
663 import_cachep = NULL;
666 kmem_cache_destroy(capa_cachep);
672 int obd_init_caches(void)
677 LASSERT(obd_device_cachep == NULL);
678 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
679 sizeof(struct obd_device),
681 if (!obd_device_cachep)
682 GOTO(out, rc = -ENOMEM);
684 LASSERT(obdo_cachep == NULL);
685 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
688 GOTO(out, rc = -ENOMEM);
690 LASSERT(import_cachep == NULL);
691 import_cachep = kmem_cache_create("ll_import_cache",
692 sizeof(struct obd_import),
695 GOTO(out, rc = -ENOMEM);
697 LASSERT(capa_cachep == NULL);
698 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
701 GOTO(out, rc = -ENOMEM);
705 obd_cleanup_caches();
709 /* map connection to client */
710 struct obd_export *class_conn2export(struct lustre_handle *conn)
712 struct obd_export *export;
716 CDEBUG(D_CACHE, "looking for null handle\n");
720 if (conn->cookie == -1) { /* this means assign a new connection */
721 CDEBUG(D_CACHE, "want a new connection\n");
725 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
726 export = class_handle2object(conn->cookie, NULL);
729 EXPORT_SYMBOL(class_conn2export);
731 struct obd_device *class_exp2obd(struct obd_export *exp)
737 EXPORT_SYMBOL(class_exp2obd);
739 struct obd_device *class_conn2obd(struct lustre_handle *conn)
741 struct obd_export *export;
742 export = class_conn2export(conn);
744 struct obd_device *obd = export->exp_obd;
745 class_export_put(export);
750 EXPORT_SYMBOL(class_conn2obd);
752 struct obd_import *class_exp2cliimp(struct obd_export *exp)
754 struct obd_device *obd = exp->exp_obd;
757 return obd->u.cli.cl_import;
759 EXPORT_SYMBOL(class_exp2cliimp);
761 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
763 struct obd_device *obd = class_conn2obd(conn);
766 return obd->u.cli.cl_import;
768 EXPORT_SYMBOL(class_conn2cliimp);
770 /* Export management functions */
771 static void class_export_destroy(struct obd_export *exp)
773 struct obd_device *obd = exp->exp_obd;
776 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
777 LASSERT(obd != NULL);
779 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
780 exp->exp_client_uuid.uuid, obd->obd_name);
782 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
783 if (exp->exp_connection)
784 ptlrpc_put_connection_superhack(exp->exp_connection);
786 LASSERT(list_empty(&exp->exp_outstanding_replies));
787 LASSERT(list_empty(&exp->exp_uncommitted_replies));
788 LASSERT(list_empty(&exp->exp_req_replay_queue));
789 LASSERT(list_empty(&exp->exp_hp_rpcs));
790 obd_destroy_export(exp);
791 class_decref(obd, "export", exp);
793 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table callback: take an export reference for a handle lookup. */
static void export_handle_addref(void *export)
{
	class_export_get(export);
}

802 static struct portals_handle_ops export_handle_ops = {
803 .hop_addref = export_handle_addref,
807 struct obd_export *class_export_get(struct obd_export *exp)
809 atomic_inc(&exp->exp_refcount);
810 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
811 atomic_read(&exp->exp_refcount));
814 EXPORT_SYMBOL(class_export_get);
816 void class_export_put(struct obd_export *exp)
818 LASSERT(exp != NULL);
819 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
820 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
821 atomic_read(&exp->exp_refcount) - 1);
823 if (atomic_dec_and_test(&exp->exp_refcount)) {
824 LASSERT(!list_empty(&exp->exp_obd_chain));
825 CDEBUG(D_IOCTL, "final put %p/%s\n",
826 exp, exp->exp_client_uuid.uuid);
828 /* release nid stat refererence */
829 lprocfs_exp_cleanup(exp);
831 obd_zombie_export_add(exp);
834 EXPORT_SYMBOL(class_export_put);
836 /* Creates a new export, adds it to the hash table, and returns a
837 * pointer to it. The refcount is 2: one for the hash reference, and
838 * one for the pointer returned by this function. */
839 struct obd_export *class_new_export(struct obd_device *obd,
840 struct obd_uuid *cluuid)
842 struct obd_export *export;
843 cfs_hash_t *hash = NULL;
847 OBD_ALLOC_PTR(export);
849 return ERR_PTR(-ENOMEM);
851 export->exp_conn_cnt = 0;
852 export->exp_lock_hash = NULL;
853 export->exp_flock_hash = NULL;
854 atomic_set(&export->exp_refcount, 2);
855 atomic_set(&export->exp_rpc_count, 0);
856 atomic_set(&export->exp_cb_count, 0);
857 atomic_set(&export->exp_locks_count, 0);
858 #if LUSTRE_TRACKS_LOCK_EXP_REFS
859 INIT_LIST_HEAD(&export->exp_locks_list);
860 spin_lock_init(&export->exp_locks_list_guard);
862 atomic_set(&export->exp_replay_count, 0);
863 export->exp_obd = obd;
864 INIT_LIST_HEAD(&export->exp_outstanding_replies);
865 spin_lock_init(&export->exp_uncommitted_replies_lock);
866 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
867 INIT_LIST_HEAD(&export->exp_req_replay_queue);
868 INIT_LIST_HEAD(&export->exp_handle.h_link);
869 INIT_LIST_HEAD(&export->exp_hp_rpcs);
870 INIT_LIST_HEAD(&export->exp_reg_rpcs);
871 class_handle_hash(&export->exp_handle, &export_handle_ops);
872 export->exp_last_request_time = cfs_time_current_sec();
873 spin_lock_init(&export->exp_lock);
874 spin_lock_init(&export->exp_rpc_lock);
875 INIT_HLIST_NODE(&export->exp_uuid_hash);
876 INIT_HLIST_NODE(&export->exp_nid_hash);
877 spin_lock_init(&export->exp_bl_list_lock);
878 INIT_LIST_HEAD(&export->exp_bl_list);
880 export->exp_sp_peer = LUSTRE_SP_ANY;
881 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
882 export->exp_client_uuid = *cluuid;
883 obd_init_export(export);
885 spin_lock(&obd->obd_dev_lock);
886 /* shouldn't happen, but might race */
887 if (obd->obd_stopping)
888 GOTO(exit_unlock, rc = -ENODEV);
890 hash = cfs_hash_getref(obd->obd_uuid_hash);
892 GOTO(exit_unlock, rc = -ENODEV);
893 spin_unlock(&obd->obd_dev_lock);
895 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
896 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
898 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
899 obd->obd_name, cluuid->uuid, rc);
900 GOTO(exit_err, rc = -EALREADY);
904 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
905 spin_lock(&obd->obd_dev_lock);
906 if (obd->obd_stopping) {
907 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
908 GOTO(exit_unlock, rc = -ENODEV);
911 class_incref(obd, "export", export);
912 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
913 list_add_tail(&export->exp_obd_chain_timed,
914 &export->exp_obd->obd_exports_timed);
915 export->exp_obd->obd_num_exports++;
916 spin_unlock(&obd->obd_dev_lock);
917 cfs_hash_putref(hash);
921 spin_unlock(&obd->obd_dev_lock);
924 cfs_hash_putref(hash);
925 class_handle_unhash(&export->exp_handle);
926 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
927 obd_destroy_export(export);
928 OBD_FREE_PTR(export);
931 EXPORT_SYMBOL(class_new_export);
933 void class_unlink_export(struct obd_export *exp)
935 class_handle_unhash(&exp->exp_handle);
937 spin_lock(&exp->exp_obd->obd_dev_lock);
938 /* delete an uuid-export hashitem from hashtables */
939 if (!hlist_unhashed(&exp->exp_uuid_hash))
940 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
941 &exp->exp_client_uuid,
942 &exp->exp_uuid_hash);
944 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
945 list_del_init(&exp->exp_obd_chain_timed);
946 exp->exp_obd->obd_num_exports--;
947 spin_unlock(&exp->exp_obd->obd_dev_lock);
948 class_export_put(exp);
950 EXPORT_SYMBOL(class_unlink_export);
952 /* Import management functions */
953 void class_import_destroy(struct obd_import *imp)
957 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
958 imp->imp_obd->obd_name);
960 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
962 ptlrpc_put_connection_superhack(imp->imp_connection);
964 while (!list_empty(&imp->imp_conn_list)) {
965 struct obd_import_conn *imp_conn;
967 imp_conn = list_entry(imp->imp_conn_list.next,
968 struct obd_import_conn, oic_item);
969 list_del_init(&imp_conn->oic_item);
970 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
971 OBD_FREE(imp_conn, sizeof(*imp_conn));
974 LASSERT(imp->imp_sec == NULL);
975 class_decref(imp->imp_obd, "import", imp);
976 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle-table callback: take an import reference for a handle lookup. */
static void import_handle_addref(void *import)
{
	class_import_get(import);
}

985 static struct portals_handle_ops import_handle_ops = {
986 .hop_addref = import_handle_addref,
990 struct obd_import *class_import_get(struct obd_import *import)
992 atomic_inc(&import->imp_refcount);
993 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
994 atomic_read(&import->imp_refcount),
995 import->imp_obd->obd_name);
998 EXPORT_SYMBOL(class_import_get);
1000 void class_import_put(struct obd_import *imp)
1004 LASSERT(list_empty(&imp->imp_zombie_chain));
1005 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1007 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1008 atomic_read(&imp->imp_refcount) - 1,
1009 imp->imp_obd->obd_name);
1011 if (atomic_dec_and_test(&imp->imp_refcount)) {
1012 CDEBUG(D_INFO, "final put import %p\n", imp);
1013 obd_zombie_import_add(imp);
1016 /* catch possible import put race */
1017 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1020 EXPORT_SYMBOL(class_import_put);
1022 static void init_imp_at(struct imp_at *at) {
1024 at_init(&at->iat_net_latency, 0, 0);
1025 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1026 /* max service estimates are tracked on the server side, so
1027 don't use the AT history here, just use the last reported
1028 val. (But keep hist for proc histogram, worst_ever) */
1029 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1034 struct obd_import *class_new_import(struct obd_device *obd)
1036 struct obd_import *imp;
1038 OBD_ALLOC(imp, sizeof(*imp));
1042 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1043 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1044 INIT_LIST_HEAD(&imp->imp_replay_list);
1045 INIT_LIST_HEAD(&imp->imp_sending_list);
1046 INIT_LIST_HEAD(&imp->imp_delayed_list);
1047 INIT_LIST_HEAD(&imp->imp_committed_list);
1048 imp->imp_replay_cursor = &imp->imp_committed_list;
1049 spin_lock_init(&imp->imp_lock);
1050 imp->imp_last_success_conn = 0;
1051 imp->imp_state = LUSTRE_IMP_NEW;
1052 imp->imp_obd = class_incref(obd, "import", imp);
1053 mutex_init(&imp->imp_sec_mutex);
1054 init_waitqueue_head(&imp->imp_recovery_waitq);
1056 atomic_set(&imp->imp_refcount, 2);
1057 atomic_set(&imp->imp_unregistering, 0);
1058 atomic_set(&imp->imp_inflight, 0);
1059 atomic_set(&imp->imp_replay_inflight, 0);
1060 atomic_set(&imp->imp_inval_count, 0);
1061 INIT_LIST_HEAD(&imp->imp_conn_list);
1062 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1063 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1064 init_imp_at(&imp->imp_at);
1066 /* the default magic is V2, will be used in connect RPC, and
1067 * then adjusted according to the flags in request/reply. */
1068 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1072 EXPORT_SYMBOL(class_new_import);
1074 void class_destroy_import(struct obd_import *import)
1076 LASSERT(import != NULL);
1077 LASSERT(import != LP_POISON);
1079 class_handle_unhash(&import->imp_handle);
1081 spin_lock(&import->imp_lock);
1082 import->imp_generation++;
1083 spin_unlock(&import->imp_lock);
1084 class_import_put(import);
1086 EXPORT_SYMBOL(class_destroy_import);
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/**
 * Record one more reference from \a lock to \a exp.
 *
 * The first reference links the lock onto exp->exp_locks_list and
 * remembers \a exp as the lock's target. A warning is printed if the
 * lock already tracks a different export.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr >= 0);

	if (lock->l_exp_refs_target != NULL &&
	    lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
			      exp, lock, lock->l_exp_refs_target);
	}
	if ((lock->l_exp_refs_nr ++) == 0) {
		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
		lock->l_exp_refs_target = exp;
	}
	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/**
 * Drop one reference from \a lock to \a exp.
 *
 * Dropping the last reference unlinks the lock from the export's list
 * and clears its target. A warning is printed if the lock tracks a
 * different export than \a exp.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);
	LASSERT(lock->l_exp_refs_nr > 0);
	if (lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("lock %p, "
			      "mismatching export pointers: %p, %p\n",
			      lock, lock->l_exp_refs_target, exp);
	}
	if (-- lock->l_exp_refs_nr == 0) {
		list_del_init(&lock->l_exp_refs_link);
		lock->l_exp_refs_target = NULL;
	}
	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif

1131 /* A connection defines an export context in which preallocation can
1132 be managed. This releases the export pointer reference, and returns
1133 the export handle, so the export refcount is 1 when this function
1135 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1136 struct obd_uuid *cluuid)
1138 struct obd_export *export;
1139 LASSERT(conn != NULL);
1140 LASSERT(obd != NULL);
1141 LASSERT(cluuid != NULL);
1144 export = class_new_export(obd, cluuid);
1146 RETURN(PTR_ERR(export));
1148 conn->cookie = export->exp_handle.h_cookie;
1149 class_export_put(export);
1151 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1152 cluuid->uuid, conn->cookie);
1155 EXPORT_SYMBOL(class_connect);
1157 /* if export is involved in recovery then clean up related things */
1158 void class_export_recovery_cleanup(struct obd_export *exp)
1160 struct obd_device *obd = exp->exp_obd;
1162 spin_lock(&obd->obd_recovery_task_lock);
1163 if (obd->obd_recovering) {
1164 if (exp->exp_in_recovery) {
1165 spin_lock(&exp->exp_lock);
1166 exp->exp_in_recovery = 0;
1167 spin_unlock(&exp->exp_lock);
1168 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1169 atomic_dec(&obd->obd_connected_clients);
1172 /* if called during recovery then should update
1173 * obd_stale_clients counter,
1174 * lightweight exports are not counted */
1175 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1176 exp->exp_obd->obd_stale_clients++;
1178 spin_unlock(&obd->obd_recovery_task_lock);
1180 spin_lock(&exp->exp_lock);
1181 /** Cleanup req replay fields */
1182 if (exp->exp_req_replay_needed) {
1183 exp->exp_req_replay_needed = 0;
1185 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1186 atomic_dec(&obd->obd_req_replay_clients);
1189 /** Cleanup lock replay data */
1190 if (exp->exp_lock_replay_needed) {
1191 exp->exp_lock_replay_needed = 0;
1193 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1194 atomic_dec(&obd->obd_lock_replay_clients);
1196 spin_unlock(&exp->exp_lock);
1199 /* This function removes 1-3 references from the export:
1200 * 1 - for export pointer passed
1201 * and if disconnect really need
1202 * 2 - removing from hash
1203 * 3 - in client_unlink_export
1204 * The export pointer passed to this function can destroyed */
1205 int class_disconnect(struct obd_export *export)
1207 int already_disconnected;
1210 if (export == NULL) {
1211 CWARN("attempting to free NULL export %p\n", export);
1215 spin_lock(&export->exp_lock);
1216 already_disconnected = export->exp_disconnected;
1217 export->exp_disconnected = 1;
1218 spin_unlock(&export->exp_lock);
1220 /* class_cleanup(), abort_recovery(), and class_fail_export()
1221 * all end up in here, and if any of them race we shouldn't
1222 * call extra class_export_puts(). */
1223 if (already_disconnected) {
1224 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1225 GOTO(no_disconn, already_disconnected);
1228 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1229 export->exp_handle.h_cookie);
1231 if (!hlist_unhashed(&export->exp_nid_hash))
1232 cfs_hash_del(export->exp_obd->obd_nid_hash,
1233 &export->exp_connection->c_peer.nid,
1234 &export->exp_nid_hash);
1236 class_export_recovery_cleanup(export);
1237 class_unlink_export(export);
1239 class_export_put(export);
1242 EXPORT_SYMBOL(class_disconnect);
1244 /* Return non-zero for a fully connected export */
1245 int class_connected_export(struct obd_export *exp)
1250 spin_lock(&exp->exp_lock);
1251 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1252 spin_unlock(&exp->exp_lock);
1256 EXPORT_SYMBOL(class_connected_export);
/* Disconnect the exports queued on @list, one at a time, until it is empty.
 *
 * @list:  list of exports linked through exp_obd_chain.
 * @flags: value stored into each export's exp_flags before it is processed.
 *
 * An export whose client UUID equals the UUID of its own obd is not
 * disconnected; it is only unlinked from the list.  Every other export is
 * handed to obd_disconnect(). */
1258 static void class_disconnect_export_list(struct list_head *list,
1259 enum obd_option flags)
1262 struct obd_export *exp;
1265 /* It's possible that an export may disconnect itself, but
1266 * nothing else will be added to this list. */
1267 while (!list_empty(list)) {
/* always work on the current head of the list */
1268 exp = list_entry(list->next, struct obd_export,
1270 /* need for safe call CDEBUG after obd_disconnect */
1271 class_export_get(exp);
/* publish the caller-supplied flags on the export, under its own lock */
1273 spin_lock(&exp->exp_lock);
1274 exp->exp_flags = flags;
1275 spin_unlock(&exp->exp_lock);
/* export whose client UUID is the obd's own UUID: unlink it and drop the
 * reference taken at the top of the loop, without calling obd_disconnect() */
1277 if (obd_uuid_equals(&exp->exp_client_uuid,
1278 &exp->exp_obd->obd_uuid)) {
1280 "exp %p export uuid == obd uuid, don't discon\n",
1282 /* Need to delete this now so we don't end up pointing
1283 * to work_list later when this export is cleaned up. */
1284 list_del_init(&exp->exp_obd_chain);
1285 class_export_put(exp);
/* second reference; per the note below, obd_disconnect() releases one */
1289 class_export_get(exp);
1290 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1291 "last request at "CFS_TIME_T"\n",
1292 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1293 exp, exp->exp_last_request_time);
1294 /* release one export reference anyway */
1295 rc = obd_disconnect(exp);
1297 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1298 obd_export_nid2str(exp), exp, rc);
/* drop the reference that kept exp valid for the CDEBUG above */
1299 class_export_put(exp);
/* Disconnect all exports of @obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private work
 * list while holding obd_dev_lock; the disconnects themselves are then done
 * by class_disconnect_export_list() after the lock has been released, using
 * the flags computed by exp_flags_from_obd(). */
1304 void class_disconnect_exports(struct obd_device *obd)
1306 struct list_head work_list;
1309 /* Move all of the exports from obd_exports to a work list, en masse. */
1310 INIT_LIST_HEAD(&work_list);
1311 spin_lock(&obd->obd_dev_lock);
1312 list_splice_init(&obd->obd_exports, &work_list);
1313 list_splice_init(&obd->obd_delayed_exports, &work_list);
1314 spin_unlock(&obd->obd_dev_lock);
1316 if (!list_empty(&work_list)) {
1317 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1318 "disconnecting them\n", obd->obd_minor, obd);
1319 class_disconnect_export_list(&work_list,
1320 exp_flags_from_obd(obd));
/* nothing was moved to the work list: just log it */
1322 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1323 obd->obd_minor, obd);
1326 EXPORT_SYMBOL(class_disconnect_exports);
1328 /* Remove exports that have not completed recovery. */
/* Select exports of @obd via @test_export and disconnect the selected ones.
 *
 * @obd:         device whose obd_exports list is scanned.
 * @test_export: predicate evaluated per export.  It is called with both
 *               obd->obd_dev_lock and exp->exp_lock (spinlocks) held.
 *
 * Exports that pass the filters below are flagged exp_failed, moved to a
 * private work list and logged; the work list is then handed to
 * class_disconnect_export_list() with OBD_OPT_ABORT_RECOV added to the flags
 * from exp_flags_from_obd().
 *
 * NOTE(review): the statements taken when a filter matches (presumably
 * "continue") and the update of 'evicted' are not visible in this listing. */
1330 void class_disconnect_stale_exports(struct obd_device *obd,
1331 int (*test_export)(struct obd_export *))
1333 struct list_head work_list;
1334 struct obd_export *exp, *n;
1338 INIT_LIST_HEAD(&work_list);
1339 spin_lock(&obd->obd_dev_lock);
/* _safe iteration: entries are moved off obd_exports inside the loop */
1340 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1342 /* don't count self-export as client */
1343 if (obd_uuid_equals(&exp->exp_client_uuid,
1344 &exp->exp_obd->obd_uuid))
1347 /* don't evict clients which have no slot in last_rcvd
1348 * (e.g. lightweight connection) */
1349 if (exp->exp_target_data.ted_lr_idx == -1)
/* test and set exp_failed under exp_lock so an export is picked only once */
1352 spin_lock(&exp->exp_lock);
1353 if (exp->exp_failed || test_export(exp)) {
1354 spin_unlock(&exp->exp_lock);
1357 exp->exp_failed = 1;
1358 spin_unlock(&exp->exp_lock);
/* take the export off obd_exports and queue it for disconnection */
1360 list_move(&exp->exp_obd_chain, &work_list);
1362 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1363 obd->obd_name, exp->exp_client_uuid.uuid,
1364 exp->exp_connection == NULL ? "<unknown>" :
1365 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1366 print_export_data(exp, "EVICTING", 0);
1368 spin_unlock(&obd->obd_dev_lock);
1371 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1372 obd->obd_name, evicted);
/* the actual disconnects run without obd_dev_lock held */
1374 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1375 OBD_OPT_ABORT_RECOV);
1378 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark @exp as failed and disconnect it.
 *
 * exp_failed is tested and set atomically under exp_lock, so when several
 * callers race only the first one proceeds to obd_disconnect(); the others
 * just log that the export is already dead.  The outcome of
 * obd_disconnect() is only logged here. */
1380 void class_fail_export(struct obd_export *exp)
1382 int rc, already_failed;
/* remember the previous state, then set the flag unconditionally */
1384 spin_lock(&exp->exp_lock);
1385 already_failed = exp->exp_failed;
1386 exp->exp_failed = 1;
1387 spin_unlock(&exp->exp_lock);
1389 if (already_failed) {
1390 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1391 exp, exp->exp_client_uuid.uuid);
1395 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1396 exp, exp->exp_client_uuid.uuid);
/* optionally dump the debug log before disconnecting */
1398 if (obd_dump_on_timeout)
1399 libcfs_debug_dumplog();
1401 /* need for safe call CDEBUG after obd_disconnect */
1402 class_export_get(exp);
1404 /* Most callers into obd_disconnect are removing their own reference
1405 * (request, for example) in addition to the one from the hash table.
1406 * We don't have such a reference here, so make one. */
1407 class_export_get(exp);
1408 rc = obd_disconnect(exp);
1410 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1412 CDEBUG(D_HA, "disconnected export %p/%s\n",
1413 exp, exp->exp_client_uuid.uuid);
/* release the reference that kept exp valid for the logging above */
1414 class_export_put(exp);
1416 EXPORT_SYMBOL(class_fail_export);
/* Return a printable form of the peer NID of @exp.
 *
 * When the export has a connection, the string comes from libcfs_nid2str()
 * applied to exp_connection->c_peer.nid.
 * NOTE(review): the value returned for an export without a connection is
 * not visible in this listing. */
1418 char *obd_export_nid2str(struct obd_export *exp)
1420 if (exp->exp_connection != NULL)
1421 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1425 EXPORT_SYMBOL(obd_export_nid2str);
/* Evict the export(s) of @obd connected from the NID given as text in @nid.
 *
 * @obd: device whose NID hash is searched.
 * @nid: NID in string form; parsed with libcfs_str2nid().
 *
 * Returns exports_evicted; this is 0 when the device is already stopping
 * (obd_stopping set), in which case nothing is looked up.
 *
 * A reference on the NID hash is held from the check of obd_stopping until
 * the lookups are finished, so the hash outlives the release of obd_dev_lock.
 * NOTE(review): the loop construct around the lookup and the increment of
 * exports_evicted are not visible in this listing. */
1427 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1429 cfs_hash_t *nid_hash;
1430 struct obd_export *doomed_exp = NULL;
1431 int exports_evicted = 0;
1433 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1435 spin_lock(&obd->obd_dev_lock);
1436 /* umount has run already, so evict thread should leave
1437 * its task to umount thread now */
1438 if (obd->obd_stopping) {
1439 spin_unlock(&obd->obd_dev_lock);
1440 return exports_evicted;
/* pin the hash before dropping the lock */
1442 nid_hash = obd->obd_nid_hash;
1443 cfs_hash_getref(nid_hash);
1444 spin_unlock(&obd->obd_dev_lock);
1447 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1448 if (doomed_exp == NULL)
/* sanity checks: the hash must return an export for exactly this NID, and
 * the self-export must never be reachable through the NID hash */
1451 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1452 "nid %s found, wanted nid %s, requested nid %s\n",
1453 obd_export_nid2str(doomed_exp),
1454 libcfs_nid2str(nid_key), nid);
1455 LASSERTF(doomed_exp != obd->obd_self_export,
1456 "self-export is hashed by NID?\n");
1458 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1459 "request\n", obd->obd_name,
1460 obd_uuid2str(&doomed_exp->exp_client_uuid),
1461 obd_export_nid2str(doomed_exp));
1462 class_fail_export(doomed_exp);
/* presumably drops the reference returned by cfs_hash_lookup() */
1463 class_export_put(doomed_exp);
1466 cfs_hash_putref(nid_hash);
1468 if (!exports_evicted)
1469 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1470 obd->obd_name, nid);
1471 return exports_evicted;
1473 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of @obd whose client UUID is @uuid.
 *
 * @obd:  device whose UUID hash is searched.
 * @uuid: client UUID in string form; converted with obd_str2uuid().
 *
 * Returns exports_evicted; this is 0 when the device is stopping or when
 * @uuid is the device's own UUID (a device refuses to evict itself).
 * A reference on the UUID hash is held across the lookup and is released on
 * every exit path that follows cfs_hash_getref().
 * NOTE(review): the increment of exports_evicted is not visible in this
 * listing. */
1475 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1477 cfs_hash_t *uuid_hash;
1478 struct obd_export *doomed_exp = NULL;
1479 struct obd_uuid doomed_uuid;
1480 int exports_evicted = 0;
1482 spin_lock(&obd->obd_dev_lock);
1483 if (obd->obd_stopping) {
1484 spin_unlock(&obd->obd_dev_lock);
1485 return exports_evicted;
/* pin the hash before dropping the lock */
1487 uuid_hash = obd->obd_uuid_hash;
1488 cfs_hash_getref(uuid_hash);
1489 spin_unlock(&obd->obd_dev_lock);
1491 obd_str2uuid(&doomed_uuid, uuid);
1492 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1493 CERROR("%s: can't evict myself\n", obd->obd_name);
1494 cfs_hash_putref(uuid_hash);
1495 return exports_evicted;
1498 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1500 if (doomed_exp == NULL) {
1501 CERROR("%s: can't disconnect %s: no exports found\n",
1502 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" in the message below is misspelled
 * ("administrative"); left as is because it is runtime output. */
1504 CWARN("%s: evicting %s at adminstrative request\n",
1505 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1506 class_fail_export(doomed_exp);
/* presumably drops the reference returned by cfs_hash_lookup() */
1507 class_export_put(doomed_exp);
1510 cfs_hash_putref(uuid_hash);
1512 return exports_evicted;
1514 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1516 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback, NULL by default and exported so that it can be set
 * from outside this file.  print_export_data() invokes it on an export when
 * it is non-NULL and its 'locks' argument is non-zero. */
1517 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1518 EXPORT_SYMBOL(class_export_dump_hook);
/* Log a one-line D_HA summary of @exp, tagged with @status.
 *
 * Fields printed, in order: obd name, @status, export pointer, client UUID,
 * peer NID, exp_refcount, (exp_rpc_count, exp_cb_count, exp_locks_count),
 * exp_disconnected, exp_delayed, exp_failed, nreplies, first_reply, "..."
 * when nreplies exceeds 3, and exp_last_committed.
 *
 * exp_outstanding_replies is walked under exp->exp_lock.
 * NOTE(review): the body of that walk, which presumably computes 'nreplies'
 * and 'first_reply', is not visible in this listing. */
1521 static void print_export_data(struct obd_export *exp, const char *status,
1524 struct ptlrpc_reply_state *rs;
1525 struct ptlrpc_reply_state *first_reply = NULL;
1528 spin_lock(&exp->exp_lock);
1529 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1535 spin_unlock(&exp->exp_lock);
1537 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1538 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1539 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1540 atomic_read(&exp->exp_rpc_count),
1541 atomic_read(&exp->exp_cb_count),
1542 atomic_read(&exp->exp_locks_count),
1543 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1544 nreplies, first_reply, nreplies > 3 ? "..." : "",
1545 exp->exp_last_committed);
1546 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* extra per-export dump, only when requested and when a hook is installed */
1547 if (locks && class_export_dump_hook != NULL)
1548 class_export_dump_hook(exp);
/* Log every export associated with @obd through print_export_data().
 *
 * @obd:   device whose export lists are dumped.
 * @locks: passed through unchanged to print_export_data().
 *
 * The device's obd_exports, obd_unlinked_exports and obd_delayed_exports
 * lists are walked under obd->obd_dev_lock and tagged "ACTIVE", "UNLINKED"
 * and "DELAYED".  The global obd_zombie_exports list is then walked under
 * obd_zombie_impexp_lock and tagged "ZOMBIE"; that walk is not filtered by
 * @obd.  print_export_data() takes exp_lock, so exp_lock nests inside either
 * of these locks here. */
1552 void dump_exports(struct obd_device *obd, int locks)
1554 struct obd_export *exp;
1556 spin_lock(&obd->obd_dev_lock);
1557 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1558 print_export_data(exp, "ACTIVE", locks);
1559 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1560 print_export_data(exp, "UNLINKED", locks);
1561 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1562 print_export_data(exp, "DELAYED", locks);
1563 spin_unlock(&obd->obd_dev_lock);
1564 spin_lock(&obd_zombie_impexp_lock);
1565 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1566 print_export_data(exp, "ZOMBIE", locks);
1567 spin_unlock(&obd_zombie_impexp_lock);
1569 EXPORT_SYMBOL(dump_exports);
/* Block until @obd has no entries left on obd_unlinked_exports.
 *
 * The caller must already have emptied obd_exports (asserted on entry,
 * before obd_dev_lock is taken).  The list is polled: obd_dev_lock is
 * dropped, the task sleeps uninterruptibly for 'waited' seconds, and the
 * lock is re-taken before the list is checked again.  Once 'waited' exceeds
 * 5 and is a power of two, a console warning is printed and all exports are
 * dumped with lock details.
 * NOTE(review): the initialisation and update of 'waited' are not visible
 * in this listing. */
1571 void obd_exports_barrier(struct obd_device *obd)
1574 LASSERT(list_empty(&obd->obd_exports));
1575 spin_lock(&obd->obd_dev_lock);
1576 while (!list_empty(&obd->obd_unlinked_exports)) {
/* never sleep with the spinlock held */
1577 spin_unlock(&obd->obd_dev_lock);
1578 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1579 cfs_time_seconds(waited));
1580 if (waited > 5 && IS_PO2(waited)) {
1581 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1582 "more than %d seconds. "
1583 "The obd refcount = %d. Is it stuck?\n",
1584 obd->obd_name, waited,
1585 atomic_read(&obd->obd_refcount));
1586 dump_exports(obd, 1);
/* re-take the lock for the next list_empty() check */
1589 spin_lock(&obd->obd_dev_lock);
1591 spin_unlock(&obd->obd_dev_lock);
1593 EXPORT_SYMBOL(obd_exports_barrier);
1595 /* Total amount of zombies to be destroyed */
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(), which both compare it against zero. */
1596 static int zombies_count = 0;
1599 /* kill zombie imports and exports */
/* Destroy queued zombie imports and exports until both queues are empty.
 *
 * Each pass detaches at most one import (head of obd_zombie_imports) and
 * one export (head of obd_zombie_exports) while holding
 * obd_zombie_impexp_lock.  The lock is released before
 * class_import_destroy() / class_export_destroy() are called, and is taken
 * again briefly after each destroy.
 * NOTE(review): the statements executed inside those short re-locked
 * sections (presumably bookkeeping of zombies_count) and the per-pass reset
 * of 'import' / 'export' are not visible in this listing. */
1601 void obd_zombie_impexp_cull(void)
1603 struct obd_import *import;
1604 struct obd_export *export;
1608 spin_lock(&obd_zombie_impexp_lock);
/* detach the first queued import, if any */
1611 if (!list_empty(&obd_zombie_imports)) {
1612 import = list_entry(obd_zombie_imports.next,
1615 list_del_init(&import->imp_zombie_chain);
/* detach the first queued export, if any */
1619 if (!list_empty(&obd_zombie_exports)) {
1620 export = list_entry(obd_zombie_exports.next,
1623 list_del_init(&export->exp_obd_chain);
1626 spin_unlock(&obd_zombie_impexp_lock);
/* the destructors run without the queue lock held */
1628 if (import != NULL) {
1629 class_import_destroy(import);
1630 spin_lock(&obd_zombie_impexp_lock);
1632 spin_unlock(&obd_zombie_impexp_lock);
1635 if (export != NULL) {
1636 class_export_destroy(export);
1637 spin_lock(&obd_zombie_impexp_lock);
1639 spin_unlock(&obd_zombie_impexp_lock);
/* keep going for as long as the last pass found something to destroy */
1643 } while (import != NULL || export != NULL);
/* State shared between the zombie destroy thread and its controllers. */
/* completed by obd_zombie_impexp_thread() on startup; awaited by
 * obd_zombie_impexp_init() */
1647 static struct completion obd_zombie_start;
/* completed by obd_zombie_impexp_thread() on exit; awaited by
 * obd_zombie_impexp_stop() */
1648 static struct completion obd_zombie_stop;
/* flag word accessed with set_bit()/test_bit() */
1649 static unsigned long obd_zombie_flags;
/* wait queue shared by the destroy thread and obd_zombie_barrier() callers */
1650 static wait_queue_head_t obd_zombie_waitq;
/* pid recorded by the destroy thread; lets obd_zombie_barrier() detect that
 * it is being called from that thread */
1651 static pid_t obd_zombie_pid;
/* NOTE(review): OBD_ZOMBIE_STOP is passed as the bit *number* argument of
 * set_bit()/test_bit() in this file, so the value 0x0001 selects bit 1 of
 * obd_zombie_flags rather than acting as a 0x1 mask.  All users agree, so
 * this is consistent. */
1654 OBD_ZOMBIE_STOP = 0x0001,
1658 /* check whether the zombie import/export destroy thread has work to do */
/* Wait predicate for the zombie destroy thread.
 *
 * 'rc' is non-zero when there is nothing to do: zombies_count is 0 and
 * OBD_ZOMBIE_STOP has not been set.  The thread sleeps on the negation of
 * this function, so it wakes up when zombies are queued or a stop is
 * requested.  @arg is not used by the visible code; the thread passes NULL. */
1660 static int obd_zombie_impexp_check(void *arg)
1664 spin_lock(&obd_zombie_impexp_lock);
1665 rc = (zombies_count == 0) &&
1666 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1667 spin_unlock(&obd_zombie_impexp_lock);
1673 /* Add export to the obd_zombie thread and notify it. */
/* Move @exp from whichever per-device list it is on (it must be on one;
 * asserted) to the global obd_zombie_exports list, then wake the destroy
 * thread.  The unlink is done under the owning obd's obd_dev_lock and the
 * re-link under obd_zombie_impexp_lock; the two locks are never held
 * together here. */
1675 static void obd_zombie_export_add(struct obd_export *exp) {
1676 spin_lock(&exp->exp_obd->obd_dev_lock);
1677 LASSERT(!list_empty(&exp->exp_obd_chain));
1678 list_del_init(&exp->exp_obd_chain);
1679 spin_unlock(&exp->exp_obd->obd_dev_lock);
1680 spin_lock(&obd_zombie_impexp_lock);
1682 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1683 spin_unlock(&obd_zombie_impexp_lock);
1685 obd_zombie_impexp_notify();
1689 /* Add import to the obd_zombie thread and notify it. */
/* Queue @imp on the global obd_zombie_imports list and wake the destroy
 * thread.  The import must already have had imp_sec and imp_rq_pool cleared
 * and must not be on a zombie list yet (all three are asserted). */
1691 static void obd_zombie_import_add(struct obd_import *imp) {
1692 LASSERT(imp->imp_sec == NULL);
1693 LASSERT(imp->imp_rq_pool == NULL);
1694 spin_lock(&obd_zombie_impexp_lock);
1695 LASSERT(list_empty(&imp->imp_zombie_chain));
1697 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1698 spin_unlock(&obd_zombie_impexp_lock);
1700 obd_zombie_impexp_notify();
1704 /* Notify the import/export destroy thread about a new zombie. */
/* Wake every sleeper on obd_zombie_waitq.  wake_up_all() is used because
 * the destroy thread and obd_zombie_barrier() callers wait on the same
 * queue, and the destroy thread must not miss the wakeup. */
1706 static void obd_zombie_impexp_notify(void)
1709 * Make sure obd_zomebie_impexp_thread get this notification.
1710 * It is possible this signal only get by obd_zombie_barrier, and
1711 * barrier gulps this notification and sleeps away and hangs ensues
1713 wake_up_all(&obd_zombie_waitq);
1717 /* check whether obd_zombie is idle */
/* Wait predicate for obd_zombie_barrier(): 'rc' is non-zero when
 * zombies_count is 0 (sampled under obd_zombie_impexp_lock).  Must not be
 * called once OBD_ZOMBIE_STOP has been set (asserted). */
1719 static int obd_zombie_is_idle(void)
1723 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1724 spin_lock(&obd_zombie_impexp_lock);
1725 rc = (zombies_count == 0);
1726 spin_unlock(&obd_zombie_impexp_lock);
1731 /* wait until the obd_zombie import/export queues become empty */
/* Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports true.
 * The call is special-cased when made from the destroy thread itself
 * (its pid is recorded in obd_zombie_pid), since that thread waiting for
 * its own queue to drain could never make progress. */
1733 void obd_zombie_barrier(void)
1735 struct l_wait_info lwi = { 0 };
1737 if (obd_zombie_pid == current_pid())
1738 /* don't wait for myself */
1740 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1742 EXPORT_SYMBOL(obd_zombie_barrier);
1746 /* Thread that destroys zombie exports/imports. */
/* Body of the zombie destroy kernel thread.
 *
 * Signals obd_zombie_start, records its pid, then loops until
 * OBD_ZOMBIE_STOP is set: sleep until obd_zombie_impexp_check() reports
 * work (or a stop request), cull the queues, and wake waiters on
 * obd_zombie_waitq.  obd_zombie_stop is completed after the loop.
 * @unused is not referenced by the visible code. */
1748 static int obd_zombie_impexp_thread(void *unused)
1750 unshare_fs_struct();
/* NOTE(review): the starter is released here, before obd_zombie_pid is
 * assigned on the next line. */
1751 complete(&obd_zombie_start);
1753 obd_zombie_pid = current_pid();
1755 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1756 struct l_wait_info lwi = { 0 };
1758 l_wait_event(obd_zombie_waitq,
1759 !obd_zombie_impexp_check(NULL), &lwi);
1760 obd_zombie_impexp_cull();
1763 * Notify obd_zombie_barrier callers that queues
1766 wake_up(&obd_zombie_waitq);
/* lets obd_zombie_impexp_stop() return */
1769 complete(&obd_zombie_stop);
1776 /* start the thread that destroys zombie imports/exports */
/* Initialise the zombie queues, lock, completions and wait queue, then
 * start the "obd_zombid" kernel thread and wait until it has signalled
 * obd_zombie_start.
 *
 * Returns PTR_ERR(task) on the kthread_run() failure path.
 * NOTE(review): the failure test and the success return value are not
 * visible in this listing. */
1778 int obd_zombie_impexp_init(void)
1780 struct task_struct *task;
1782 INIT_LIST_HEAD(&obd_zombie_imports);
1784 INIT_LIST_HEAD(&obd_zombie_exports);
1785 spin_lock_init(&obd_zombie_impexp_lock);
1786 init_completion(&obd_zombie_start);
1787 init_completion(&obd_zombie_stop);
1788 init_waitqueue_head(&obd_zombie_waitq);
1791 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1793 RETURN(PTR_ERR(task));
/* do not return before the thread is actually running */
1795 wait_for_completion(&obd_zombie_start);
1799 /* stop the thread that destroys zombie imports/exports */
/* Ask the destroy thread to exit and wait for it: set OBD_ZOMBIE_STOP,
 * wake the thread so that it re-evaluates its loop condition, then block
 * until it has completed obd_zombie_stop. */
1801 void obd_zombie_impexp_stop(void)
1803 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1804 obd_zombie_impexp_notify();
1805 wait_for_completion(&obd_zombie_stop);
1808 /***** Kernel-userspace comm helpers *******/
1810 /* Get length of entire message, including header */
/* @payload_len: size of the payload in bytes.
 * Returns sizeof(struct kuc_hdr) + @payload_len, converted to int.
 * No range check is applied to @payload_len. */
1811 int kuc_len(int payload_len)
1813 return sizeof(struct kuc_hdr) + payload_len;
1815 EXPORT_SYMBOL(kuc_len);
1817 /* Get a pointer to kuc header, given a ptr to the payload
1818 * @param p Pointer to payload area
1819 * @returns Pointer to kuc header */
/* The header is taken to sit immediately before the payload, so it is
 * located by stepping back one struct kuc_hdr from @p.  The header's magic
 * is asserted to be KUC_MAGIC, so @p must really point at the payload of a
 * kuc message. */
1821 struct kuc_hdr * kuc_ptr(void *p)
1823 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1824 LASSERT(lh->kuc_magic == KUC_MAGIC);
1827 EXPORT_SYMBOL(kuc_ptr);
1829 /* Test if payload is part of kuc message
1830 * @param p Pointer to payload area */
/* Non-asserting variant of the check done in kuc_ptr(): reads the
 * struct kuc_hdr-sized area immediately before @p and compares its magic
 * with KUC_MAGIC.  The memory in front of @p is read unconditionally, so it
 * must be readable even when @p is not a kuc payload.
 * NOTE(review): the return statements are not visible in this listing. */
1833 int kuc_ispayload(void *p)
1835 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1837 if (kh->kuc_magic == KUC_MAGIC)
1842 EXPORT_SYMBOL(kuc_ispayload);
1844 /* Alloc space for a message, and fill in header
1845 * @return Pointer to payload area */
/* @payload_len: payload size in bytes.
 * @transport:   stored in the header's kuc_transport field.
 * @type:        stored in the header's kuc_msgtype field.
 *
 * Returns the address just past the header (the payload area), or
 * ERR_PTR(-ENOMEM) on the allocation-failure path.  The header's
 * kuc_msglen is set to the full length, header included.
 * NOTE(review): the allocation call itself is not visible in this listing. */
1847 void *kuc_alloc(int payload_len, int transport, int type)
1850 int len = kuc_len(payload_len);
1854 return ERR_PTR(-ENOMEM);
1856 lh->kuc_magic = KUC_MAGIC;
1857 lh->kuc_transport = transport;
1858 lh->kuc_msgtype = type;
1859 lh->kuc_msglen = len;
/* callers only ever see the payload area */
1861 return (void *)(lh + 1);
1863 EXPORT_SYMBOL(kuc_alloc);
1865 /* Takes pointer to payload area */
/* @p:           payload pointer; kuc_ptr() asserts that a valid kuc header
 *               precedes it.
 * @payload_len: payload size; the number of bytes freed is
 *               kuc_len(@payload_len), so this presumably has to match the
 *               value given to kuc_alloc() - TODO confirm. */
1866 inline void kuc_free(void *p, int payload_len)
1868 struct kuc_hdr *lh = kuc_ptr(p);
1869 OBD_FREE(lh, kuc_len(payload_len));
1871 EXPORT_SYMBOL(kuc_free);
/* One sleeper waiting for an RPC slot.  obd_get_request_slot() keeps an
 * instance on its stack while it is blocked. */
1873 struct obd_request_slot_waiter {
/* link on the client's cl_loi_read_list; an empty (self-linked) entry means
 * a waker has already removed this waiter from the list */
1874 struct list_head orsw_entry;
/* wait queue the blocked caller sleeps on */
1875 wait_queue_head_t orsw_waitq;
/* Wait predicate for obd_get_request_slot().
 *
 * 'avail' is true once @orsw is no longer linked on the waiter list.  The
 * wakers in this file unlink a waiter with list_del_init() and increment
 * cl_r_in_flight on its behalf, so an empty entry means a slot has been
 * handed to this waiter.  The test is made under cl_loi_list_lock. */
1879 static bool obd_request_slot_avail(struct client_obd *cli,
1880 struct obd_request_slot_waiter *orsw)
1884 spin_lock(&cli->cl_loi_list_lock);
1885 avail = !!list_empty(&orsw->orsw_entry);
1886 spin_unlock(&cli->cl_loi_list_lock);
1892 /* For network flow control, the RPC sponsor needs to acquire a credit
1893 * before sending the RPC. The credit count for a connection is defined
1894 * by "cl_max_rpcs_in_flight". If all the credits are occupied, then
1895 * subsequent RPC sponsors need to wait until others release their
1896 * credits, or the administrator increases "cl_max_rpcs_in_flight". */
/* Acquire one request slot on @cli, sleeping if none is free.
 *
 * Fast path: when cl_r_in_flight is below cl_max_rpcs_in_flight the counter
 * is incremented under cl_loi_list_lock and no waiting happens.
 * Slow path: an on-stack waiter is queued at the tail of cl_loi_read_list
 * and the caller sleeps (interruptibly, via LWI_INTR) until a waker has
 * unlinked the waiter and accounted a slot for it.
 * NOTE(review): the return statements and part of the post-wait condition
 * handling are not visible in this listing. */
1898 int obd_get_request_slot(struct client_obd *cli)
1900 struct obd_request_slot_waiter orsw;
1901 struct l_wait_info lwi;
1904 spin_lock(&cli->cl_loi_list_lock);
1905 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1906 cli->cl_r_in_flight++;
1907 spin_unlock(&cli->cl_loi_list_lock);
/* no free slot: enqueue ourselves (FIFO) while still holding the lock */
1911 init_waitqueue_head(&orsw.orsw_waitq);
1912 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1913 orsw.orsw_signaled = false;
1914 spin_unlock(&cli->cl_loi_list_lock);
1916 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1917 rc = l_wait_event(orsw.orsw_waitq,
1918 obd_request_slot_avail(cli, &orsw) ||
1922 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1923 * freed but other (such as obd_put_request_slot) is using it. */
1924 spin_lock(&cli->cl_loi_list_lock);
/* Undo path: if a waker already unlinked us, it also incremented
 * cl_r_in_flight for us, so give that slot back; otherwise we are still
 * queued and must take ourselves off the list before 'orsw' goes out of
 * scope. */
1926 if (!orsw.orsw_signaled) {
1927 if (list_empty(&orsw.orsw_entry))
1928 cli->cl_r_in_flight--;
1930 list_del(&orsw.orsw_entry);
1934 if (orsw.orsw_signaled) {
1935 LASSERT(list_empty(&orsw.orsw_entry));
1939 spin_unlock(&cli->cl_loi_list_lock);
1943 EXPORT_SYMBOL(obd_get_request_slot);
/* Release one request slot on @cli and, when possible, hand it straight to
 * the longest-waiting sleeper.
 *
 * Everything is done under cl_loi_list_lock.  After the decrement, if there
 * is a waiter and the in-flight count is below cl_max_rpcs_in_flight, the
 * first waiter is unlinked, cl_r_in_flight is incremented on its behalf and
 * it is woken.  The waiter structure lives on the sleeper's stack, which is
 * why it is only touched while the lock is held. */
1945 void obd_put_request_slot(struct client_obd *cli)
1947 struct obd_request_slot_waiter *orsw;
1949 spin_lock(&cli->cl_loi_list_lock);
1950 cli->cl_r_in_flight--;
1952 /* If there is free slot, wakeup the first waiter. */
1953 if (!list_empty(&cli->cl_loi_read_list) &&
1954 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1955 orsw = list_entry(cli->cl_loi_read_list.next,
1956 struct obd_request_slot_waiter, orsw_entry);
/* list_del_init() leaves the entry empty, which is what the sleeper's
 * obd_request_slot_avail() predicate tests for */
1957 list_del_init(&orsw->orsw_entry);
1958 cli->cl_r_in_flight++;
1959 wake_up(&orsw->orsw_waitq);
1961 spin_unlock(&cli->cl_loi_list_lock);
1963 EXPORT_SYMBOL(obd_put_request_slot);
/* Return the current cl_max_rpcs_in_flight limit of @cli.  The field is
 * read without taking cl_loi_list_lock. */
1965 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1967 return cli->cl_max_rpcs_in_flight;
1969 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/* Change the cl_max_rpcs_in_flight limit of @cli to @max.
 *
 * @max is range-checked against 1..OBD_MAX_RIF_MAX before anything is
 * modified.  The new limit is stored under cl_loi_list_lock and, still under
 * the lock, up to 'diff' queued waiters are unlinked, accounted in
 * cl_r_in_flight and woken, in FIFO order.
 * NOTE(review): the return values and the computation of 'diff' are not
 * visible in this listing; 'diff' is presumably the increase of the limit
 * over 'old'. */
1971 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1973 struct obd_request_slot_waiter *orsw;
1978 if (max > OBD_MAX_RIF_MAX || max < 1)
1981 spin_lock(&cli->cl_loi_list_lock);
1982 old = cli->cl_max_rpcs_in_flight;
1983 cli->cl_max_rpcs_in_flight = max;
1986 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1987 for (i = 0; i < diff; i++) {
1988 if (list_empty(&cli->cl_loi_read_list))
/* same hand-over protocol as obd_put_request_slot(): unlink, account the
 * slot for the waiter, then wake it */
1991 orsw = list_entry(cli->cl_loi_read_list.next,
1992 struct obd_request_slot_waiter, orsw_entry);
1993 list_del_init(&orsw->orsw_entry);
1994 cli->cl_r_in_flight++;
1995 wake_up(&orsw->orsw_waitq);
1997 spin_unlock(&cli->cl_loi_list_lock);
2001 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);