4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_kernelcomm.h>
/* Lock taken in this file around every walk or update of the obd_types list. */
49 spinlock_t obd_types_lock;
/* Slab caches; created by obd_init_caches() and destroyed by obd_cleanup_caches(). */
51 static struct kmem_cache *obd_device_cachep;
52 struct kmem_cache *obdo_cachep;
53 EXPORT_SYMBOL(obdo_cachep);
54 static struct kmem_cache *import_cachep;
/*
 * Lists of "zombie" imports/exports and the lock declared for them.
 * NOTE(review): the code that fills and drains these lists is not in view.
 */
56 static struct list_head obd_zombie_imports;
57 static struct list_head obd_zombie_exports;
58 static spinlock_t obd_zombie_impexp_lock;
/* Forward declarations for helpers that are called before they are defined. */
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64 const char *status, int locks);
/*
 * Hook called by class_export_destroy() and class_import_destroy() to drop a
 * ptlrpc_connection reference. It is only declared here; the assignment is
 * not part of the visible code.
 */
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
/*
 * Allocate one obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC, the value the LASSERTF() checks in this file compare
 * obd_magic against.
 */
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * The magic is asserted first; a namespace that is still attached is reported
 * with CERROR() before the reference tracker is finalised and the slab object
 * is freed.
 */
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Find a registered obd type by name.
 *
 * obd_types is walked under obd_types_lock and each entry's typ_name is
 * compared with \a name using strcmp(). The lock is released as soon as a
 * match is seen, and again after a walk that found nothing. No reference is
 * taken on the type here (typ_refcnt is handled by class_get_type()).
 */
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
114 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd type by name and pin the module that provides it.
 *
 * The type is first searched with class_search_type(). When the kernel is
 * built with HAVE_MODULE_LOADING_SUPPORT, request_module() is tried with a
 * module name derived from \a name and the search is repeated on success.
 * Under the type's obd_type_lock, try_module_get() is then called on the
 * owner of the type's dt ops.
 */
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
/* a few type names live in a module with a different name */
124 if (strcmp(modname, "obdfilter") == 0)
127 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128 modname = LUSTRE_OSP_NAME;
/* prefix match: anything starting with LUSTRE_MDS_NAME maps to the MDT module */
130 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success */
133 if (!request_module("%s", modname)) {
134 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
/* search by the original type name, not by the module name */
135 type = class_search_type(name);
137 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not checked here */
145 try_module_get(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under the type's obd_type_lock, drop the
 * module reference held on the owner of the type's dt ops.
 */
151 void class_put_type(struct obd_type *type)
154 spin_lock(&type->obd_type_lock);
156 module_put(type->typ_dt_ops->o_owner);
157 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the type-name length asserted by class_register_type(). */
160 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 *
 * A struct obd_type is allocated together with private copies of \a dt_ops,
 * \a md_ops (optional, see the comment below) and the name string. With
 * CONFIG_PROC_FS a proc entry is registered through lprocfs_register();
 * \a ldt is initialised with lu_device_type_init(); finally the type is
 * linked onto obd_types under obd_types_lock.
 *
 * The tail of the function frees the name, the proc subtree, both ops copies
 * and the type itself; presumably this is the failure path.
 * NOTE(review): the use of \a enable_proc and the return statements are not
 * visible in this extract.
 */
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163 bool enable_proc, struct lprocfs_vars *vars,
164 const char *name, struct lu_device_type *ldt)
166 struct obd_type *type;
/* the name must be NUL-terminated within CLASS_MAX_NAME bytes */
171 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
173 if (class_search_type(name)) {
174 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
179 OBD_ALLOC(type, sizeof(*type));
183 OBD_ALLOC_PTR(type->typ_dt_ops);
184 OBD_ALLOC_PTR(type->typ_md_ops);
185 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
187 if (type->typ_dt_ops == NULL ||
188 type->typ_md_ops == NULL ||
189 type->typ_name == NULL)
/* the type keeps its own copies of the caller's ops tables */
192 *(type->typ_dt_ops) = *dt_ops;
193 /* md_ops is optional */
195 *(type->typ_md_ops) = *md_ops;
/* typ_name was allocated with strlen(name) + 1 bytes, so strcpy() fits */
196 strcpy(type->typ_name, name);
197 spin_lock_init(&type->obd_type_lock);
199 #ifdef CONFIG_PROC_FS
201 type->typ_procroot = lprocfs_register(type->typ_name,
204 if (IS_ERR(type->typ_procroot)) {
205 rc = PTR_ERR(type->typ_procroot);
/* reset so the cleanup below does not try to remove a proc entry */
206 type->typ_procroot = NULL;
213 rc = lu_device_type_init(ldt);
/* publish the fully initialised type */
218 spin_lock(&obd_types_lock);
219 list_add(&type->typ_chain, &obd_types);
220 spin_unlock(&obd_types_lock);
/* cleanup: release everything allocated above */
225 if (type->typ_name != NULL) {
226 #ifdef CONFIG_PROC_FS
227 if (type->typ_procroot != NULL)
228 remove_proc_subtree(type->typ_name, proc_lustre_root);
230 OBD_FREE(type->typ_name, strlen(name) + 1);
232 if (type->typ_md_ops != NULL)
233 OBD_FREE_PTR(type->typ_md_ops);
234 if (type->typ_dt_ops != NULL)
235 OBD_FREE_PTR(type->typ_dt_ops);
236 OBD_FREE(type, sizeof(*type));
239 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is reported with CERROR(). A type that still has a
 * non-zero typ_refcnt is not removed: only its two ops tables are freed.
 * Otherwise the proc entries are removed, lu_device_type_fini() is called,
 * the type is unlinked from obd_types under obd_types_lock and all of its
 * memory is freed.
 */
241 int class_unregister_type(const char *name)
243 struct obd_type *type = class_search_type(name);
247 CERROR("unknown obd type\n");
251 if (type->typ_refcnt) {
252 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
253 /* This is a bad situation, let's make the best of it */
254 /* Remove ops, but leave the name for debugging */
/*
 * NOTE(review): the type is not unlinked on this path, while
 * class_get_type()/class_put_type() dereference typ_dt_ops; confirm nothing
 * can reach the freed ops afterwards.
 */
255 OBD_FREE_PTR(type->typ_dt_ops);
256 OBD_FREE_PTR(type->typ_md_ops);
260 /* we do not use type->typ_procroot as for compatibility purposes
261 * other modules can share names (i.e. lod can use lov entry). so
262 * we can't reference pointer as it can get invalided when another
263 * module removes the entry */
264 #ifdef CONFIG_PROC_FS
265 if (type->typ_procroot != NULL)
266 remove_proc_subtree(type->typ_name, proc_lustre_root);
267 if (type->typ_procsym != NULL)
268 lprocfs_remove(&type->typ_procsym);
271 lu_device_type_fini(type->typ_lu);
273 spin_lock(&obd_types_lock);
274 list_del(&type->typ_chain);
275 spin_unlock(&obd_types_lock);
/* name matched typ_name via strcmp(), so strlen(name) is the allocated length */
276 OBD_FREE(type->typ_name, strlen(name) + 1);
277 if (type->typ_dt_ops != NULL)
278 OBD_FREE_PTR(type->typ_dt_ops);
279 if (type->typ_md_ops != NULL)
280 OBD_FREE_PTR(type->typ_md_ops);
281 OBD_FREE(type, sizeof(*type));
283 } /* class_unregister_type */
284 EXPORT_SYMBOL(class_unregister_type);
287 * Create a new obd device.
289 * Find an empty slot in ::obd_devs[], create a new obd device in it.
291 * \param[in] type_name obd device type string.
292 * \param[in] name obd device name.
294 * \retval NULL if create fails, otherwise return the obd device
/*
 * NOTE(review): the visible failure paths return ERR_PTR() values (-EINVAL,
 * -ENODEV, -ENOMEM, -EEXIST, -EOVERFLOW), not NULL as the \retval line above
 * says; callers should test with IS_ERR().
 */
297 struct obd_device *class_newdev(const char *type_name, const char *name)
299 struct obd_device *result = NULL;
300 struct obd_device *newdev;
301 struct obd_type *type = NULL;
303 int new_obd_minor = 0;
306 if (strlen(name) >= MAX_OBD_NAME) {
307 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
308 RETURN(ERR_PTR(-EINVAL));
/* takes a type reference; dropped again via class_put_type() at the end */
311 type = class_get_type(type_name);
313 CERROR("OBD: unknown type: %s\n", type_name);
314 RETURN(ERR_PTR(-ENODEV));
317 newdev = obd_device_alloc();
319 GOTO(out_type, result = ERR_PTR(-ENOMEM));
321 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/*
 * Scan every slot under the write lock: the whole table is checked for a
 * device with the same name, and the first empty slot seen is claimed.
 */
323 write_lock(&obd_dev_lock);
324 for (i = 0; i < class_devno_max(); i++) {
325 struct obd_device *obd = class_num2obd(i);
327 if (obd && (strcmp(name, obd->obd_name) == 0)) {
328 CERROR("Device %s already exists at %d, won't add\n",
/* a slot may already have been claimed earlier in this scan: undo it */
331 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
332 "%p obd_magic %08x != %08x\n", result,
333 result->obd_magic, OBD_DEVICE_MAGIC);
334 LASSERTF(result->obd_minor == new_obd_minor,
335 "%p obd_minor %d != %d\n", result,
336 result->obd_minor, new_obd_minor);
338 obd_devs[result->obd_minor] = NULL;
339 result->obd_name[0]='\0';
341 result = ERR_PTR(-EEXIST);
/* first free slot and nothing claimed yet */
344 if (!result && !obd) {
346 result->obd_minor = i;
348 result->obd_type = type;
/* name length was checked against MAX_OBD_NAME above */
349 strncpy(result->obd_name, name,
350 sizeof(result->obd_name) - 1);
351 obd_devs[i] = result;
354 write_unlock(&obd_dev_lock);
/* loop ran to the end without claiming a slot: the table is full */
356 if (result == NULL && i >= class_devno_max()) {
357 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
359 GOTO(out, result = ERR_PTR(-EOVERFLOW));
365 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
366 result->obd_name, result);
/* error exits: free the unused device, then drop the type reference */
370 obd_device_free(newdev);
372 class_put_type(type);
/*
 * Remove \a obd from ::obd_devs[] and free it.
 *
 * The device must carry the right magic and must be the one stored at its
 * own obd_minor slot. The slot is cleared under the obd_dev_lock write lock,
 * the device is freed, and the type reference is dropped last; the type
 * pointer is saved up front because the device is gone by then.
 */
376 void class_release_dev(struct obd_device *obd)
378 struct obd_type *obd_type = obd->obd_type;
380 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
381 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
382 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
383 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
384 LASSERT(obd_type != NULL);
386 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
387 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
389 write_lock(&obd_dev_lock);
390 obd_devs[obd->obd_minor] = NULL;
391 write_unlock(&obd_dev_lock);
392 obd_device_free(obd);
394 class_put_type(obd_type);
/*
 * Map a device name to its index in ::obd_devs[].
 *
 * The table is scanned under the obd_dev_lock read lock, comparing \a name
 * with each device's obd_name. A matching device is only honoured once its
 * obd_attached flag is set. The lock is released before returning.
 */
397 int class_name2dev(const char *name)
404 read_lock(&obd_dev_lock);
405 for (i = 0; i < class_devno_max(); i++) {
406 struct obd_device *obd = class_num2obd(i);
408 if (obd && strcmp(name, obd->obd_name) == 0) {
409 /* Make sure we finished attaching before we give
410 out any references */
411 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412 if (obd->obd_attached) {
413 read_unlock(&obd_dev_lock);
419 read_unlock(&obd_dev_lock);
424 struct obd_device *class_name2obd(const char *name)
426 int dev = class_name2dev(name);
428 if (dev < 0 || dev > class_devno_max())
430 return class_num2obd(dev);
432 EXPORT_SYMBOL(class_name2obd);
/*
 * Map a device uuid to its index in ::obd_devs[].
 *
 * The table is scanned under the obd_dev_lock read lock and each device's
 * obd_uuid is compared with \a uuid via obd_uuid_equals(). The lock is
 * released on a match and again after an unsuccessful scan.
 */
434 int class_uuid2dev(struct obd_uuid *uuid)
438 read_lock(&obd_dev_lock);
439 for (i = 0; i < class_devno_max(); i++) {
440 struct obd_device *obd = class_num2obd(i);
442 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
443 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
444 read_unlock(&obd_dev_lock);
448 read_unlock(&obd_dev_lock);
/*
 * Look up an obd device by uuid: resolve the index with class_uuid2dev()
 * and return the device class_num2obd() finds there.
 */
453 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
455 int dev = class_uuid2dev(uuid);
458 return class_num2obd(dev);
460 EXPORT_SYMBOL(class_uuid2obd);
463 * Get obd device from ::obd_devs[]
465 * \param num [in] array index
467 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
468 * otherwise return the obd device there.
470 struct obd_device *class_num2obd(int num)
472 struct obd_device *obd = NULL;
474 if (num < class_devno_max()) {
479 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
480 "%p obd_magic %08x != %08x\n",
481 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
482 LASSERTF(obd->obd_minor == num,
483 "%p obd_minor %0d != %0d\n",
484 obd, obd->obd_minor, num);
491 * Get obd devices count. Device in any
493 * \retval obd device count
495 int get_devices_count(void)
497 int index, max_index = class_devno_max(), dev_count = 0;
499 read_lock(&obd_dev_lock);
500 for (index = 0; index <= max_index; index++) {
501 struct obd_device *obd = class_num2obd(index);
505 read_unlock(&obd_dev_lock);
509 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per obd device: index, status, type name, device
 * name, uuid and current reference count. The table is walked under the
 * obd_dev_lock read lock.
 */
511 void class_obd_list(void)
516 read_lock(&obd_dev_lock);
517 for (i = 0; i < class_devno_max(); i++) {
518 struct obd_device *obd = class_num2obd(i);
/* the status string is chosen from the first flag that is set, in this order */
522 if (obd->obd_stopping)
524 else if (obd->obd_set_up)
526 else if (obd->obd_attached)
530 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
531 i, status, obd->obd_type->typ_name,
532 obd->obd_name, obd->obd_uuid.uuid,
533 atomic_read(&obd->obd_refcount));
535 read_unlock(&obd_dev_lock);
539 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
540 specified, then only the client with that uuid is returned,
541 otherwise any client connected to the tgt is returned. */
/*
 * The device table is walked under the obd_dev_lock read lock, which is
 * released before returning. Note that the type check is a prefix match:
 * only the first strlen(typ_name) characters of the device's type name are
 * compared.
 */
542 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
543 const char * typ_name,
544 struct obd_uuid *grp_uuid)
548 read_lock(&obd_dev_lock);
549 for (i = 0; i < class_devno_max(); i++) {
550 struct obd_device *obd = class_num2obd(i);
554 if ((strncmp(obd->obd_type->typ_name, typ_name,
555 strlen(typ_name)) == 0)) {
/* target uuid must match; the device uuid is only checked when grp_uuid is given */
556 if (obd_uuid_equals(tgt_uuid,
557 &obd->u.cli.cl_target_uuid) &&
558 ((grp_uuid)? obd_uuid_equals(grp_uuid,
559 &obd->obd_uuid) : 1)) {
560 read_unlock(&obd_dev_lock);
565 read_unlock(&obd_dev_lock);
569 EXPORT_SYMBOL(class_find_client_obd);
571 /* Iterate the obd_device list looking devices have grp_uuid. Start
572 searching at *next, and if a device is found, the next index to look
573 at is saved in *next. If next is NULL, then the first matching device
574 will always be returned. */
/*
 * In other words: return the next device whose obd_uuid equals \a grp_uuid.
 * A start index taken from *next is only used when it lies inside
 * [0, class_devno_max()). The scan runs under the obd_dev_lock read lock.
 */
575 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
581 else if (*next >= 0 && *next < class_devno_max())
586 read_lock(&obd_dev_lock);
587 for (; i < class_devno_max(); i++) {
588 struct obd_device *obd = class_num2obd(i);
592 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
595 read_unlock(&obd_dev_lock);
599 read_unlock(&obd_dev_lock);
603 EXPORT_SYMBOL(class_devices_in_group);
606 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
607 * adjust sptlrpc settings accordingly.
/*
 * Every set-up, non-stopping device of type mdc, osc, mdt or ost whose name
 * starts with the first \a namelen bytes of \a fsname receives an
 * obd_set_info_async() call with KEY_SPTLRPC_CONF on its self export.
 */
609 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
611 struct obd_device *obd;
615 LASSERT(namelen > 0);
617 read_lock(&obd_dev_lock);
618 for (i = 0; i < class_devno_max(); i++) {
619 obd = class_num2obd(i);
621 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
624 /* only notify mdc, osc, mdt, ost */
625 type = obd->obd_type->typ_name;
626 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
627 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
628 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
629 strcmp(type, LUSTRE_OST_NAME) != 0)
/* device name must begin with the filesystem name */
632 if (strncmp(obd->obd_name, fsname, namelen))
/*
 * Pin the device, then drop the read lock around the notification call;
 * the lock is re-taken before the scan continues.
 */
635 class_incref(obd, __FUNCTION__, obd);
636 read_unlock(&obd_dev_lock);
637 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
638 sizeof(KEY_SPTLRPC_CONF),
639 KEY_SPTLRPC_CONF, 0, NULL, NULL);
641 class_decref(obd, __FUNCTION__, obd);
642 read_lock(&obd_dev_lock);
644 read_unlock(&obd_dev_lock);
647 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches used by this file (obd_device_cachep, obdo_cachep,
 * import_cachep) and reset the pointers so obd_init_caches() can run again.
 * obd_init_caches() also calls this to unwind a partial setup.
 */
649 void obd_cleanup_caches(void)
652 if (obd_device_cachep) {
653 kmem_cache_destroy(obd_device_cachep);
654 obd_device_cachep = NULL;
657 kmem_cache_destroy(obdo_cachep);
661 kmem_cache_destroy(import_cachep);
662 import_cachep = NULL;
/*
 * Create the three slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache" and "ll_import_cache". Each pointer is asserted to be NULL
 * before its cache is created. A failed creation jumps to the exit path with
 * rc = -ENOMEM, where obd_cleanup_caches() is called.
 */
668 int obd_init_caches(void)
673 LASSERT(obd_device_cachep == NULL);
674 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
675 sizeof(struct obd_device),
677 if (!obd_device_cachep)
678 GOTO(out, rc = -ENOMEM);
680 LASSERT(obdo_cachep == NULL);
681 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
684 GOTO(out, rc = -ENOMEM);
686 LASSERT(import_cachep == NULL);
687 import_cachep = kmem_cache_create("ll_import_cache",
688 sizeof(struct obd_import),
691 GOTO(out, rc = -ENOMEM);
/* unwind whatever was created */
695 obd_cleanup_caches();
699 /* map connection to client */
/*
 * Translate a lustre_handle into the export registered under its cookie by
 * calling class_handle2object(). A NULL handle and the cookie value -1
 * ("assign a new connection") are treated as special cases and only logged
 * in the visible code.
 * NOTE(review): export_handle_ops supplies an addref hook, so the lookup
 * presumably returns a referenced export; confirm against
 * class_handle2object().
 */
700 struct obd_export *class_conn2export(struct lustre_handle *conn)
702 struct obd_export *export;
706 CDEBUG(D_CACHE, "looking for null handle\n");
710 if (conn->cookie == -1) { /* this means assign a new connection */
711 CDEBUG(D_CACHE, "want a new connection\n");
715 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
716 export = class_handle2object(conn->cookie, NULL);
719 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): the body is not visible in this extract; presumably it
 * returns exp->exp_obd. Confirm against the full source.
 */
721 struct obd_device *class_exp2obd(struct obd_export *exp)
727 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd device of its export.
 *
 * The export is looked up with class_conn2export(); its exp_obd is read and
 * the export is then released with class_export_put(), so no export
 * reference is held once this function is done with it.
 */
729 struct obd_device *class_conn2obd(struct lustre_handle *conn)
731 struct obd_export *export;
732 export = class_conn2export(conn);
734 struct obd_device *obd = export->exp_obd;
735 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the obd device that \a exp
 * belongs to.
 */
741 struct obd_import *class_exp2cliimp(struct obd_export *exp)
743 struct obd_device *obd = exp->exp_obd;
746 return obd->u.cli.cl_import;
748 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the obd device that the
 * connection handle \a conn resolves to via class_conn2obd().
 */
750 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
752 struct obd_device *obd = class_conn2obd(conn);
755 return obd->u.cli.cl_import;
758 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero.
 *
 * The connection reference (if any) is dropped, the reply/replay/RPC lists
 * are asserted empty, obd_destroy_export() is called, the "export" reference
 * on the obd device taken in class_new_export() is released, and the memory
 * is handed to OBD_FREE_RCU().
 */
759 static void class_export_destroy(struct obd_export *exp)
761 struct obd_device *obd = exp->exp_obd;
764 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
765 LASSERT(obd != NULL);
767 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
768 exp->exp_client_uuid.uuid, obd->obd_name);
770 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
771 if (exp->exp_connection)
772 ptlrpc_put_connection_superhack(exp->exp_connection);
774 LASSERT(list_empty(&exp->exp_outstanding_replies));
775 LASSERT(list_empty(&exp->exp_uncommitted_replies));
776 LASSERT(list_empty(&exp->exp_req_replay_queue));
777 LASSERT(list_empty(&exp->exp_hp_rpcs));
778 obd_destroy_export(exp);
779 class_decref(obd, "export", exp);
781 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table addref hook for exports: takes one export reference. */
785 static void export_handle_addref(void *export)
787 class_export_get(export);
/* Handle operations passed to class_handle_hash() for every new export. */
790 static struct portals_handle_ops export_handle_ops = {
791 .hop_addref = export_handle_addref,
/* Take one reference on \a exp (atomic increment of exp_refcount) and log the new count. */
795 struct obd_export *class_export_get(struct obd_export *exp)
797 atomic_inc(&exp->exp_refcount);
798 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
799 atomic_read(&exp->exp_refcount));
802 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 *
 * When the last reference goes away the export is not destroyed inline: its
 * nid statistics are cleaned up and it is handed to obd_zombie_export_add().
 */
804 void class_export_put(struct obd_export *exp)
806 LASSERT(exp != NULL);
807 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* logged value is the count after this put, hence the "- 1" */
808 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
809 atomic_read(&exp->exp_refcount) - 1);
811 if (atomic_dec_and_test(&exp->exp_refcount)) {
812 LASSERT(!list_empty(&exp->exp_obd_chain));
813 CDEBUG(D_IOCTL, "final put %p/%s\n",
814 exp, exp->exp_client_uuid.uuid);
816 /* release nid stat reference */
817 lprocfs_exp_cleanup(exp);
819 obd_zombie_export_add(exp);
822 EXPORT_SYMBOL(class_export_put);
824 /* Creates a new export, adds it to the hash table, and returns a
825 * pointer to it. The refcount is 2: one for the hash reference, and
826 * one for the pointer returned by this function. */
/*
 * \param obd     device the export belongs to
 * \param cluuid  client uuid; copied into the export
 *
 * Failures are reported as ERR_PTR() values (-ENOMEM is visible below;
 * -ENODEV and -EALREADY are set on the error exits).
 */
827 struct obd_export *class_new_export(struct obd_device *obd,
828 struct obd_uuid *cluuid)
830 struct obd_export *export;
831 struct cfs_hash *hash = NULL;
835 OBD_ALLOC_PTR(export);
837 return ERR_PTR(-ENOMEM);
839 export->exp_conn_cnt = 0;
840 export->exp_lock_hash = NULL;
841 export->exp_flock_hash = NULL;
842 atomic_set(&export->exp_refcount, 2);
843 atomic_set(&export->exp_rpc_count, 0);
844 atomic_set(&export->exp_cb_count, 0);
845 atomic_set(&export->exp_locks_count, 0);
846 #if LUSTRE_TRACKS_LOCK_EXP_REFS
847 INIT_LIST_HEAD(&export->exp_locks_list);
848 spin_lock_init(&export->exp_locks_list_guard);
850 atomic_set(&export->exp_replay_count, 0);
851 export->exp_obd = obd;
852 INIT_LIST_HEAD(&export->exp_outstanding_replies);
853 spin_lock_init(&export->exp_uncommitted_replies_lock);
854 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
855 INIT_LIST_HEAD(&export->exp_req_replay_queue);
856 INIT_LIST_HEAD(&export->exp_handle.h_link);
857 INIT_LIST_HEAD(&export->exp_hp_rpcs);
858 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* registers the export in the handle table; its cookie is what class_connect() hands out */
859 class_handle_hash(&export->exp_handle, &export_handle_ops);
860 export->exp_last_request_time = cfs_time_current_sec();
861 spin_lock_init(&export->exp_lock);
862 spin_lock_init(&export->exp_rpc_lock);
863 INIT_HLIST_NODE(&export->exp_uuid_hash);
864 INIT_HLIST_NODE(&export->exp_nid_hash);
865 spin_lock_init(&export->exp_bl_list_lock);
866 INIT_LIST_HEAD(&export->exp_bl_list);
868 export->exp_sp_peer = LUSTRE_SP_ANY;
869 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
870 export->exp_client_uuid = *cluuid;
871 obd_init_export(export);
873 spin_lock(&obd->obd_dev_lock);
874 /* shouldn't happen, but might race */
875 if (obd->obd_stopping)
876 GOTO(exit_unlock, rc = -ENODEV);
/* hold a reference on the uuid hash while working outside obd_dev_lock */
878 hash = cfs_hash_getref(obd->obd_uuid_hash);
880 GOTO(exit_unlock, rc = -ENODEV);
881 spin_unlock(&obd->obd_dev_lock);
/* an export whose client uuid equals the device uuid is not put in the uuid hash */
883 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
884 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
886 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
887 obd->obd_name, cluuid->uuid, rc);
888 GOTO(exit_err, rc = -EALREADY);
892 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* obd_stopping is re-checked because the lock was dropped in between */
893 spin_lock(&obd->obd_dev_lock);
894 if (obd->obd_stopping) {
895 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
896 GOTO(exit_unlock, rc = -ENODEV);
/* the export pins the device; released in class_export_destroy() */
899 class_incref(obd, "export", export);
900 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
901 list_add_tail(&export->exp_obd_chain_timed,
902 &export->exp_obd->obd_exports_timed);
903 export->exp_obd->obd_num_exports++;
904 spin_unlock(&obd->obd_dev_lock);
905 cfs_hash_putref(hash);
/* error exits: unlock, drop the hash reference, unhash the handle and free the export */
909 spin_unlock(&obd->obd_dev_lock);
912 cfs_hash_putref(hash);
913 class_handle_unhash(&export->exp_handle);
914 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
915 obd_destroy_export(export);
916 OBD_FREE_PTR(export);
919 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its obd device.
 *
 * The handle is unhashed first. Then, under the device's obd_dev_lock, the
 * export is removed from the uuid hash (if hashed), moved to
 * obd_unlinked_exports, taken off the timed list and obd_num_exports is
 * decremented. Finally one export reference is dropped.
 */
921 void class_unlink_export(struct obd_export *exp)
923 class_handle_unhash(&exp->exp_handle);
925 spin_lock(&exp->exp_obd->obd_dev_lock);
926 /* delete an uuid-export hashitem from hashtables */
927 if (!hlist_unhashed(&exp->exp_uuid_hash))
928 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
929 &exp->exp_client_uuid,
930 &exp->exp_uuid_hash);
932 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
933 list_del_init(&exp->exp_obd_chain_timed);
934 exp->exp_obd->obd_num_exports--;
935 spin_unlock(&exp->exp_obd->obd_dev_lock);
936 class_export_put(exp);
939 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero.
 *
 * The main connection reference is dropped, every entry on imp_conn_list is
 * unlinked, its connection released and the entry freed. The "import"
 * reference on the obd device taken in class_new_import() is then released
 * and the memory handed to OBD_FREE_RCU().
 */
940 static void class_import_destroy(struct obd_import *imp)
944 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
945 imp->imp_obd->obd_name);
947 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
949 ptlrpc_put_connection_superhack(imp->imp_connection);
/* drain the list of alternative connections */
951 while (!list_empty(&imp->imp_conn_list)) {
952 struct obd_import_conn *imp_conn;
954 imp_conn = list_entry(imp->imp_conn_list.next,
955 struct obd_import_conn, oic_item);
956 list_del_init(&imp_conn->oic_item);
957 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
958 OBD_FREE(imp_conn, sizeof(*imp_conn));
961 LASSERT(imp->imp_sec == NULL);
962 class_decref(imp->imp_obd, "import", imp);
963 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle-table addref hook for imports: takes one import reference. */
967 static void import_handle_addref(void *import)
969 class_import_get(import);
/* Handle operations passed to class_handle_hash() for every new import. */
972 static struct portals_handle_ops import_handle_ops = {
973 .hop_addref = import_handle_addref,
/* Take one reference on \a import (atomic increment of imp_refcount) and log the new count. */
977 struct obd_import *class_import_get(struct obd_import *import)
979 atomic_inc(&import->imp_refcount);
980 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
981 atomic_read(&import->imp_refcount),
982 import->imp_obd->obd_name);
985 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp.
 *
 * The import must not already sit on a zombie list. When the last reference
 * goes away the import is handed to obd_zombie_import_add() instead of being
 * destroyed inline.
 */
987 void class_import_put(struct obd_import *imp)
991 LASSERT(list_empty(&imp->imp_zombie_chain));
992 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* logged value is the count after this put, hence the "- 1" */
994 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
995 atomic_read(&imp->imp_refcount) - 1,
996 imp->imp_obd->obd_name);
998 if (atomic_dec_and_test(&imp->imp_refcount)) {
999 CDEBUG(D_INFO, "final put import %p\n", imp);
1000 obd_zombie_import_add(imp);
1003 /* catch possible import put race */
1004 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1007 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 */
1009 static void init_imp_at(struct imp_at *at) {
1011 at_init(&at->iat_net_latency, 0, 0);
1012 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1013 /* max service estimates are tracked on the server side, so
1014 don't use the AT history here, just use the last reported
1015 val. (But keep hist for proc histogram, worst_ever) */
1016 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise a new import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, takes an
 * "import" reference on the device (released in class_import_destroy()) and
 * is registered in the handle table.
 */
1021 struct obd_import *class_new_import(struct obd_device *obd)
1023 struct obd_import *imp;
1025 OBD_ALLOC(imp, sizeof(*imp));
1029 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1030 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1031 INIT_LIST_HEAD(&imp->imp_replay_list);
1032 INIT_LIST_HEAD(&imp->imp_sending_list);
1033 INIT_LIST_HEAD(&imp->imp_delayed_list);
1034 INIT_LIST_HEAD(&imp->imp_committed_list);
/* the replay cursor starts at the (empty) committed list head */
1035 imp->imp_replay_cursor = &imp->imp_committed_list;
1036 spin_lock_init(&imp->imp_lock);
1037 imp->imp_last_success_conn = 0;
1038 imp->imp_state = LUSTRE_IMP_NEW;
1039 imp->imp_obd = class_incref(obd, "import", imp);
1040 mutex_init(&imp->imp_sec_mutex);
1041 init_waitqueue_head(&imp->imp_recovery_waitq);
/* NOTE(review): presumably one reference for the handle table and one for the caller, as for exports — confirm */
1043 atomic_set(&imp->imp_refcount, 2);
1044 atomic_set(&imp->imp_unregistering, 0);
1045 atomic_set(&imp->imp_inflight, 0);
1046 atomic_set(&imp->imp_replay_inflight, 0);
1047 atomic_set(&imp->imp_inval_count, 0);
1048 INIT_LIST_HEAD(&imp->imp_conn_list);
1049 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1050 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1051 init_imp_at(&imp->imp_at);
1053 /* the default magic is V2, will be used in connect RPC, and
1054 * then adjusted according to the flags in request/reply. */
1055 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1059 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down an import: remove it from the handle table, bump
 * imp_generation under imp_lock, and drop one reference. Actual destruction
 * happens once the last reference is put (see class_import_put()).
 */
1061 void class_destroy_import(struct obd_import *import)
1063 LASSERT(import != NULL);
1064 LASSERT(import != LP_POISON);
1066 class_handle_unhash(&import->imp_handle);
1068 spin_lock(&import->imp_lock);
1069 import->imp_generation++;
1070 spin_unlock(&import->imp_lock);
1071 class_import_put(import);
1073 EXPORT_SYMBOL(class_destroy_import);
/* Debug-only bookkeeping of which export each ldlm lock references. */
1075 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Record one more reference from \a lock to \a exp.
 *
 * Done under exp_locks_list_guard. A lock already tracked against a different
 * export triggers a console warning. On the first reference the lock is
 * linked onto exp_locks_list and its target export is set.
 */
1077 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1079 spin_lock(&exp->exp_locks_list_guard);
1081 LASSERT(lock->l_exp_refs_nr >= 0);
1083 if (lock->l_exp_refs_target != NULL &&
1084 lock->l_exp_refs_target != exp) {
1085 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1086 exp, lock, lock->l_exp_refs_target);
/* post-increment: the branch is taken only for the first reference */
1088 if ((lock->l_exp_refs_nr ++) == 0) {
1089 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1090 lock->l_exp_refs_target = exp;
1092 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1093 lock, exp, lock->l_exp_refs_nr);
1094 spin_unlock(&exp->exp_locks_list_guard);
/*
 * Drop one tracked reference from \a lock to \a exp.
 *
 * Done under exp_locks_list_guard. A mismatch between the recorded target and
 * \a exp is only warned about. When the count reaches zero the lock is
 * unlinked from the export's list and its target is cleared.
 */
1097 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1099 spin_lock(&exp->exp_locks_list_guard);
1100 LASSERT(lock->l_exp_refs_nr > 0);
1101 if (lock->l_exp_refs_target != exp) {
1102 LCONSOLE_WARN("lock %p, "
1103 "mismatching export pointers: %p, %p\n",
1104 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the branch is taken when the last reference goes away */
1106 if (-- lock->l_exp_refs_nr == 0) {
1107 list_del_init(&lock->l_exp_refs_link);
1108 lock->l_exp_refs_target = NULL;
1110 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1111 lock, exp, lock->l_exp_refs_nr);
1112 spin_unlock(&exp->exp_locks_list_guard);
1116 /* A connection defines an export context in which preallocation can
1117 be managed. This releases the export pointer reference, and returns
1118 the export handle, so the export refcount is 1 when this function
/*
 * \param conn    out: receives the new export's handle cookie
 * \param obd     device to connect to
 * \param cluuid  client uuid for the new export
 *
 * An error from class_new_export() is passed back via PTR_ERR().
 */
1120 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1121 struct obd_uuid *cluuid)
1123 struct obd_export *export;
1124 LASSERT(conn != NULL);
1125 LASSERT(obd != NULL);
1126 LASSERT(cluuid != NULL);
1129 export = class_new_export(obd, cluuid);
1131 RETURN(PTR_ERR(export));
1133 conn->cookie = export->exp_handle.h_cookie;
/* drop the pointer reference; the caller keeps only the cookie */
1134 class_export_put(export);
1136 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1137 cluuid->uuid, conn->cookie);
1140 EXPORT_SYMBOL(class_connect);
1142 /* if export is involved in recovery then clean up related things */
/*
 * Two phases: device-wide recovery counters are adjusted under
 * obd_recovery_task_lock, then the export's own replay flags are cleared
 * under exp_lock together with the matching per-device client counters.
 */
1143 static void class_export_recovery_cleanup(struct obd_export *exp)
1145 struct obd_device *obd = exp->exp_obd;
1147 spin_lock(&obd->obd_recovery_task_lock);
1148 if (obd->obd_recovering) {
1149 if (exp->exp_in_recovery) {
1150 spin_lock(&exp->exp_lock);
1151 exp->exp_in_recovery = 0;
1152 spin_unlock(&exp->exp_lock);
1153 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1154 atomic_dec(&obd->obd_connected_clients);
1157 /* if called during recovery then should update
1158 * obd_stale_clients counter,
1159 * lightweight exports are not counted */
1160 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1161 exp->exp_obd->obd_stale_clients++;
1163 spin_unlock(&obd->obd_recovery_task_lock);
1165 spin_lock(&exp->exp_lock);
1166 /** Cleanup req replay fields */
1167 if (exp->exp_req_replay_needed) {
1168 exp->exp_req_replay_needed = 0;
1170 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1171 atomic_dec(&obd->obd_req_replay_clients);
1174 /** Cleanup lock replay data */
1175 if (exp->exp_lock_replay_needed) {
1176 exp->exp_lock_replay_needed = 0;
1178 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1179 atomic_dec(&obd->obd_lock_replay_clients);
1181 spin_unlock(&exp->exp_lock);
1184 /* This function removes 1-3 references from the export:
1185 * 1 - for export pointer passed
1186 * and if disconnect really need
1187 * 2 - removing from hash
1188 * 3 - in client_unlink_export
1189 * The export pointer passed to this function can destroyed */
/*
 * The exp_disconnected flag is tested and set under exp_lock, so only the
 * first caller performs the teardown: removal from the nid hash, recovery
 * cleanup and class_unlink_export(). The caller's own reference is put at
 * the end in either case.
 */
1190 int class_disconnect(struct obd_export *export)
1192 int already_disconnected;
1195 if (export == NULL) {
1196 CWARN("attempting to free NULL export %p\n", export);
1200 spin_lock(&export->exp_lock);
1201 already_disconnected = export->exp_disconnected;
1202 export->exp_disconnected = 1;
1203 spin_unlock(&export->exp_lock);
1205 /* class_cleanup(), abort_recovery(), and class_fail_export()
1206 * all end up in here, and if any of them race we shouldn't
1207 * call extra class_export_puts(). */
1208 if (already_disconnected) {
1209 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1210 GOTO(no_disconn, already_disconnected);
1213 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1214 export->exp_handle.h_cookie);
1216 if (!hlist_unhashed(&export->exp_nid_hash))
1217 cfs_hash_del(export->exp_obd->obd_nid_hash,
1218 &export->exp_connection->c_peer.nid,
1219 &export->exp_nid_hash);
1221 class_export_recovery_cleanup(export);
1222 class_unlink_export(export);
/* release the reference held for the pointer passed in */
1224 class_export_put(export);
1227 EXPORT_SYMBOL(class_disconnect);
1229 /* Return non-zero for a fully connected export */
/*
 * "Fully connected" means exp_conn_cnt > 0 and exp_failed not set; both are
 * sampled under exp_lock.
 */
1230 int class_connected_export(struct obd_export *exp)
1235 spin_lock(&exp->exp_lock);
1236 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1237 spin_unlock(&exp->exp_lock);
1241 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list.
 *
 * Each export gets \a flags stored in exp_flags and is then disconnected with
 * obd_disconnect(). The device's self export (client uuid equal to the device
 * uuid) is not disconnected: it is only taken off the list. The loop runs
 * until the list is empty.
 */
1243 static void class_disconnect_export_list(struct list_head *list,
1244 enum obd_option flags)
1247 struct obd_export *exp;
1250 /* It's possible that an export may disconnect itself, but
1251 * nothing else will be added to this list. */
1252 while (!list_empty(list)) {
1253 exp = list_entry(list->next, struct obd_export,
1255 /* need for safe call CDEBUG after obd_disconnect */
1256 class_export_get(exp);
1258 spin_lock(&exp->exp_lock);
1259 exp->exp_flags = flags;
1260 spin_unlock(&exp->exp_lock);
1262 if (obd_uuid_equals(&exp->exp_client_uuid,
1263 &exp->exp_obd->obd_uuid)) {
1265 "exp %p export uuid == obd uuid, don't discon\n",
1267 /* Need to delete this now so we don't end up pointing
1268 * to work_list later when this export is cleaned up. */
1269 list_del_init(&exp->exp_obd_chain);
1270 class_export_put(exp);
/* second reference: this is the one obd_disconnect() releases */
1274 class_export_get(exp);
1275 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1276 "last request at "CFS_TIME_T"\n",
1277 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1278 exp, exp->exp_last_request_time);
1279 /* release one export reference anyway */
1280 rc = obd_disconnect(exp);
1282 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1283 obd_export_nid2str(exp), exp, rc);
/* drop the reference taken at the top of the iteration */
1284 class_export_put(exp);
/*
 * Disconnect all exports of \a obd.
 *
 * obd_exports and obd_delayed_exports are spliced onto a private list under
 * obd_dev_lock, and that list is then processed without the lock by
 * class_disconnect_export_list() using the flags from exp_flags_from_obd().
 */
1289 void class_disconnect_exports(struct obd_device *obd)
1291 struct list_head work_list;
1294 /* Move all of the exports from obd_exports to a work list, en masse. */
1295 INIT_LIST_HEAD(&work_list);
1296 spin_lock(&obd->obd_dev_lock);
1297 list_splice_init(&obd->obd_exports, &work_list);
1298 list_splice_init(&obd->obd_delayed_exports, &work_list);
1299 spin_unlock(&obd->obd_dev_lock);
1301 if (!list_empty(&work_list)) {
1302 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1303 "disconnecting them\n", obd->obd_minor, obd);
1304 class_disconnect_export_list(&work_list,
1305 exp_flags_from_obd(obd));
1307 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1308 obd->obd_minor, obd);
1311 EXPORT_SYMBOL(class_disconnect_exports);
1313 /* Remove exports that have not completed recovery. */
/* Walk obd->obd_exports, mark the selected exports failed, move them to a
 * private work list and disconnect them with OBD_OPT_ABORT_RECOV added to
 * the flags derived from @obd.
 * @test_export is called with both obd->obd_dev_lock and exp->exp_lock
 * held, so it must not sleep.  An export that is already failed, or for
 * which @test_export returns non-zero, is not marked failed here. */
1315 void class_disconnect_stale_exports(struct obd_device *obd,
1316 int (*test_export)(struct obd_export *))
1318 struct list_head work_list;
1319 struct obd_export *exp, *n;
1323 INIT_LIST_HEAD(&work_list);
1324 spin_lock(&obd->obd_dev_lock);
1325 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1327 /* don't count self-export as client */
1328 if (obd_uuid_equals(&exp->exp_client_uuid,
1329 &exp->exp_obd->obd_uuid))
1332 /* don't evict clients which have no slot in last_rcvd
1333 * (e.g. lightweight connection) */
1334 if (exp->exp_target_data.ted_lr_idx == -1)
1337 spin_lock(&exp->exp_lock);
1338 if (exp->exp_failed || test_export(exp)) {
1339 spin_unlock(&exp->exp_lock);
/* exp_failed is set under exp_lock so that the check above and this store
 * cannot race with class_fail_export() marking the same export */
1342 exp->exp_failed = 1;
1343 spin_unlock(&exp->exp_lock);
/* the _safe iterator is required because the entry leaves obd_exports here */
1345 list_move(&exp->exp_obd_chain, &work_list);
1347 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1348 obd->obd_name, exp->exp_client_uuid.uuid,
1349 exp->exp_connection == NULL ? "<unknown>" :
1350 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1351 print_export_data(exp, "EVICTING", 0);
1353 spin_unlock(&obd->obd_dev_lock);
1356 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1357 obd->obd_name, evicted);
1359 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1360 OBD_OPT_ABORT_RECOV);
1363 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark @exp failed and disconnect it.
 * exp_failed is set under exp_lock and its previous value remembered, so
 * only the first caller to fail a given export goes on to disconnect it;
 * later callers take the "already_failed" branch, which only logs. */
1365 void class_fail_export(struct obd_export *exp)
1367 int rc, already_failed;
1369 spin_lock(&exp->exp_lock);
1370 already_failed = exp->exp_failed;
1371 exp->exp_failed = 1;
1372 spin_unlock(&exp->exp_lock);
1374 if (already_failed) {
1375 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1376 exp, exp->exp_client_uuid.uuid);
1380 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1381 exp, exp->exp_client_uuid.uuid);
/* optionally dump the debug log before the export goes away */
1383 if (obd_dump_on_timeout)
1384 libcfs_debug_dumplog();
1386 /* need for safe call CDEBUG after obd_disconnect */
1387 class_export_get(exp);
1389 /* Most callers into obd_disconnect are removing their own reference
1390 * (request, for example) in addition to the one from the hash table.
1391 * We don't have such a reference here, so make one. */
1392 class_export_get(exp);
1393 rc = obd_disconnect(exp);
1395 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1397 CDEBUG(D_HA, "disconnected export %p/%s\n",
1398 exp, exp->exp_client_uuid.uuid);
/* drops the reference taken for the CDEBUG calls above */
1399 class_export_put(exp);
1401 EXPORT_SYMBOL(class_fail_export);
/* Return a printable form of the peer NID of @exp.  When exp_connection is
 * set, the string comes from libcfs_nid2str() on the connection's peer NID. */
1403 char *obd_export_nid2str(struct obd_export *exp)
1405 if (exp->exp_connection != NULL)
1406 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1410 EXPORT_SYMBOL(obd_export_nid2str);
/* Evict exports of @obd whose peer NID equals @nid (textual NID, parsed
 * with libcfs_str2nid()).
 * Returns the exports_evicted counter; 0 is returned straight away when
 * @obd is already stopping.  A reference on obd->obd_nid_hash is held for
 * the duration of the lookups and dropped before returning. */
1412 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1414 struct cfs_hash *nid_hash;
1415 struct obd_export *doomed_exp = NULL;
1416 int exports_evicted = 0;
1418 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1420 spin_lock(&obd->obd_dev_lock);
1421 /* umount has run already, so evict thread should leave
1422 * its task to umount thread now */
1423 if (obd->obd_stopping) {
1424 spin_unlock(&obd->obd_dev_lock);
1425 return exports_evicted;
/* pin the hash while obd_dev_lock is still held */
1427 nid_hash = obd->obd_nid_hash;
1428 cfs_hash_getref(nid_hash);
1429 spin_unlock(&obd->obd_dev_lock);
1432 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1433 if (doomed_exp == NULL)
/* sanity: the hash must hand back an export for exactly this NID, and the
 * self-export must never be reachable through the NID hash */
1436 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1437 "nid %s found, wanted nid %s, requested nid %s\n",
1438 obd_export_nid2str(doomed_exp),
1439 libcfs_nid2str(nid_key), nid);
1440 LASSERTF(doomed_exp != obd->obd_self_export,
1441 "self-export is hashed by NID?\n");
1443 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1444 "request\n", obd->obd_name,
1445 obd_uuid2str(&doomed_exp->exp_client_uuid),
1446 obd_export_nid2str(doomed_exp));
1447 class_fail_export(doomed_exp);
/* presumably balances a reference returned by cfs_hash_lookup() -- confirm */
1448 class_export_put(doomed_exp);
1451 cfs_hash_putref(nid_hash);
1453 if (!exports_evicted)
1454 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1455 obd->obd_name, nid);
1456 return exports_evicted;
1458 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of @obd whose client UUID equals @uuid.
 * Returns the exports_evicted counter; 0 is returned straight away when
 * @obd is stopping or when @uuid names @obd itself.  A reference on
 * obd->obd_uuid_hash is held across the lookup. */
1460 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1462 struct cfs_hash *uuid_hash;
1463 struct obd_export *doomed_exp = NULL;
1464 struct obd_uuid doomed_uuid;
1465 int exports_evicted = 0;
1467 spin_lock(&obd->obd_dev_lock);
1468 if (obd->obd_stopping) {
1469 spin_unlock(&obd->obd_dev_lock);
1470 return exports_evicted;
1472 uuid_hash = obd->obd_uuid_hash;
1473 cfs_hash_getref(uuid_hash);
1474 spin_unlock(&obd->obd_dev_lock);
1476 obd_str2uuid(&doomed_uuid, uuid);
/* refuse to evict the device's own UUID; the hash reference is dropped on
 * this early exit as well */
1477 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1478 CERROR("%s: can't evict myself\n", obd->obd_name);
1479 cfs_hash_putref(uuid_hash);
1480 return exports_evicted;
1483 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1485 if (doomed_exp == NULL) {
1486 CERROR("%s: can't disconnect %s: no exports found\n",
1487 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" in the message below is misspelled; it is
 * a runtime string, so it is left untouched here. */
1489 CWARN("%s: evicting %s at adminstrative request\n",
1490 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1491 class_fail_export(doomed_exp);
1492 class_export_put(doomed_exp);
1495 cfs_hash_putref(uuid_hash);
1497 return exports_evicted;
/* Optional per-export dump callback, only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is enabled.  print_export_data() calls it when
 * lock dumping is requested and the pointer is non-NULL. */
1500 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1501 void (*class_export_dump_hook)(struct obd_export*) = NULL;
/* Emit one D_HA debug line describing @exp, tagged with @status.
 * Fields, in order: obd name, status, export pointer, client UUID, peer
 * NID, refcount, (rpc count, callback count, locks count), the
 * disconnected/delayed/failed bits, the outstanding reply count, the first
 * outstanding reply, an ellipsis when there are more than three replies,
 * and exp_last_committed.
 * The outstanding-replies list is walked under exp->exp_lock; the counters
 * are read afterwards without the lock. */
1504 static void print_export_data(struct obd_export *exp, const char *status,
1507 struct ptlrpc_reply_state *rs;
1508 struct ptlrpc_reply_state *first_reply = NULL;
1511 spin_lock(&exp->exp_lock);
1512 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1518 spin_unlock(&exp->exp_lock);
1520 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1521 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1522 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1523 atomic_read(&exp->exp_rpc_count),
1524 atomic_read(&exp->exp_cb_count),
1525 atomic_read(&exp->exp_locks_count),
1526 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1527 nreplies, first_reply, nreplies > 3 ? "..." : "",
1528 exp->exp_last_committed);
/* extra lock-reference dump, only when requested and a hook is installed */
1529 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1530 if (locks && class_export_dump_hook != NULL)
1531 class_export_dump_hook(exp);
/* Log every export known for @obd, grouped by state: ACTIVE (obd_exports),
 * UNLINKED (obd_unlinked_exports) and DELAYED (obd_delayed_exports) under
 * obd_dev_lock, then ZOMBIE entries from the global obd_zombie_exports list
 * under obd_zombie_impexp_lock.  @locks is forwarded to print_export_data().
 * Note the ZOMBIE section is not filtered by @obd: the zombie list is global. */
1535 void dump_exports(struct obd_device *obd, int locks)
1537 struct obd_export *exp;
1539 spin_lock(&obd->obd_dev_lock);
1540 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1541 print_export_data(exp, "ACTIVE", locks);
1542 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1543 print_export_data(exp, "UNLINKED", locks);
1544 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1545 print_export_data(exp, "DELAYED", locks);
1546 spin_unlock(&obd->obd_dev_lock);
1547 spin_lock(&obd_zombie_impexp_lock);
1548 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1549 print_export_data(exp, "ZOMBIE", locks);
1550 spin_unlock(&obd_zombie_impexp_lock);
/* Block until obd->obd_unlinked_exports is empty.
 * The caller must already have emptied obd->obd_exports (asserted).  The
 * list is polled: obd_dev_lock is dropped, the task sleeps uninterruptibly
 * for "waited" seconds, and the lock is retaken before the next check.
 * Once "waited" exceeds 5 and is a power of two, a console warning is
 * printed and all exports are dumped. */
1553 void obd_exports_barrier(struct obd_device *obd)
1556 LASSERT(list_empty(&obd->obd_exports));
1557 spin_lock(&obd->obd_dev_lock);
1558 while (!list_empty(&obd->obd_unlinked_exports)) {
1559 spin_unlock(&obd->obd_dev_lock);
1560 set_current_state(TASK_UNINTERRUPTIBLE);
1561 schedule_timeout(cfs_time_seconds(waited));
1562 if (waited > 5 && IS_PO2(waited)) {
1563 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1564 "more than %d seconds. "
1565 "The obd refcount = %d. Is it stuck?\n",
1566 obd->obd_name, waited,
1567 atomic_read(&obd->obd_refcount));
1568 dump_exports(obd, 1);
1571 spin_lock(&obd->obd_dev_lock);
1573 spin_unlock(&obd->obd_dev_lock);
1575 EXPORT_SYMBOL(obd_exports_barrier);
1577 /* Total amount of zombies to be destroyed */
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1578 static int zombies_count = 0;
1581 /* Kill zombie imports and exports. */
/* Drain the zombie lists.  Each pass detaches at most one import and one
 * export from the head of obd_zombie_imports / obd_zombie_exports while
 * holding obd_zombie_impexp_lock, then destroys them with the lock dropped
 * (the lock is retaken briefly after each destroy).  The loop repeats until
 * a pass finds neither an import nor an export. */
1583 void obd_zombie_impexp_cull(void)
1585 struct obd_import *import;
1586 struct obd_export *export;
1590 spin_lock(&obd_zombie_impexp_lock);
1593 if (!list_empty(&obd_zombie_imports)) {
1594 import = list_entry(obd_zombie_imports.next,
1597 list_del_init(&import->imp_zombie_chain);
1601 if (!list_empty(&obd_zombie_exports)) {
1602 export = list_entry(obd_zombie_exports.next,
1605 list_del_init(&export->exp_obd_chain);
1608 spin_unlock(&obd_zombie_impexp_lock);
/* destruction happens outside the spinlock */
1610 if (import != NULL) {
1611 class_import_destroy(import);
1612 spin_lock(&obd_zombie_impexp_lock);
1614 spin_unlock(&obd_zombie_impexp_lock);
1617 if (export != NULL) {
1618 class_export_destroy(export);
1619 spin_lock(&obd_zombie_impexp_lock);
1621 spin_unlock(&obd_zombie_impexp_lock);
1625 } while (import != NULL || export != NULL);
/* State shared between the zombie destroy thread and its controllers:
 * start/stop completions, a flags word, the wait queue the thread sleeps
 * on, and the thread's pid (compared with current_pid() in
 * obd_zombie_barrier()). */
1629 static struct completion obd_zombie_start;
1630 static struct completion obd_zombie_stop;
1631 static unsigned long obd_zombie_flags;
1632 static wait_queue_head_t obd_zombie_waitq;
1633 static pid_t obd_zombie_pid;
/* NOTE(review): OBD_ZOMBIE_STOP is handed to set_bit()/test_bit() as a bit
 * number, so 0x0001 selects bit 1 of obd_zombie_flags rather than mask 0x1.
 * Every user in this file agrees, so it works, but the hex spelling reads
 * like a mask. */
1636 OBD_ZOMBIE_STOP = 0x0001,
1640 /* Check for work for the thread that kills zombie imports/exports. */
/* Compute, under obd_zombie_impexp_lock, whether the thread has nothing to
 * do: rc is non-zero when zombies_count is 0 and OBD_ZOMBIE_STOP is not set.
 * The thread sleeps until the negation of this holds.  @arg is not used in
 * the statements shown here. */
1642 static int obd_zombie_impexp_check(void *arg)
1646 spin_lock(&obd_zombie_impexp_lock);
1647 rc = (zombies_count == 0) &&
1648 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1649 spin_unlock(&obd_zombie_impexp_lock);
1655 /* Add export to the obd_zombie thread and notify it. */
/* Hand @exp over for deferred destruction.  The export must still be
 * linked on some per-obd list (asserted); it is unlinked under the owning
 * obd's obd_dev_lock, queued on obd_zombie_exports under
 * obd_zombie_impexp_lock, and then the waiters are woken.  The two locks
 * are never held at the same time. */
1657 static void obd_zombie_export_add(struct obd_export *exp) {
1658 spin_lock(&exp->exp_obd->obd_dev_lock);
1659 LASSERT(!list_empty(&exp->exp_obd_chain));
1660 list_del_init(&exp->exp_obd_chain);
1661 spin_unlock(&exp->exp_obd->obd_dev_lock);
1662 spin_lock(&obd_zombie_impexp_lock);
1664 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1665 spin_unlock(&obd_zombie_impexp_lock);
1667 obd_zombie_impexp_notify();
1671 /* Add import to the obd_zombie thread and notify it. */
/* Hand @imp over for deferred destruction.  Preconditions (asserted):
 * imp_sec and imp_rq_pool are already NULL and the import is not on a
 * zombie list yet.  The import is queued on obd_zombie_imports under
 * obd_zombie_impexp_lock and the waiters are woken afterwards. */
1673 static void obd_zombie_import_add(struct obd_import *imp) {
1674 LASSERT(imp->imp_sec == NULL);
1675 LASSERT(imp->imp_rq_pool == NULL);
1676 spin_lock(&obd_zombie_impexp_lock);
1677 LASSERT(list_empty(&imp->imp_zombie_chain));
1679 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1680 spin_unlock(&obd_zombie_impexp_lock);
1682 obd_zombie_impexp_notify();
1686 /* Notify the import/export destroy thread about a new zombie. */
/* Wake every sleeper on obd_zombie_waitq.  wake_up_all() is used because
 * both the destroy thread and obd_zombie_barrier() callers wait on this
 * queue, and the thread must not miss the wake-up. */
1688 static void obd_zombie_impexp_notify(void)
1691 * Make sure obd_zomebie_impexp_thread get this notification.
1692 * It is possible this signal only get by obd_zombie_barrier, and
1693 * barrier gulps this notification and sleeps away and hangs ensues
1695 wake_up_all(&obd_zombie_waitq);
1699 /* Check whether obd_zombie is idle. */
/* Wait condition for obd_zombie_barrier(): rc is non-zero when
 * zombies_count is 0, sampled under obd_zombie_impexp_lock.  Must not be
 * called once OBD_ZOMBIE_STOP has been set (asserted). */
1701 static int obd_zombie_is_idle(void)
1705 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1706 spin_lock(&obd_zombie_impexp_lock);
1707 rc = (zombies_count == 0);
1708 spin_unlock(&obd_zombie_impexp_lock);
1713 /* Wait until the obd_zombie import/export queues become empty. */
/* Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies are pending.  The destroy thread itself is recognised by pid so
 * that it never waits for its own work. */
1715 void obd_zombie_barrier(void)
1717 struct l_wait_info lwi = { 0 };
1719 if (obd_zombie_pid == current_pid())
1720 /* don't wait for myself */
1722 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1724 EXPORT_SYMBOL(obd_zombie_barrier);
1728 /* Zombie export/import destroy thread. */
/* Body of the zombie destroy kthread.  Signals obd_zombie_start, records
 * its pid, then loops until OBD_ZOMBIE_STOP is set: sleep until there is
 * work or a stop request, cull the zombie lists, and wake barrier waiters.
 * obd_zombie_stop is completed on the way out.
 * NOTE(review): obd_zombie_pid is stored after complete(&obd_zombie_start),
 * so code released by that completion could reach obd_zombie_barrier()
 * before the pid is recorded -- confirm this window is harmless. */
1730 static int obd_zombie_impexp_thread(void *unused)
1732 unshare_fs_struct();
1733 complete(&obd_zombie_start);
1735 obd_zombie_pid = current_pid();
1737 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1738 struct l_wait_info lwi = { 0 };
1740 l_wait_event(obd_zombie_waitq,
1741 !obd_zombie_impexp_check(NULL), &lwi);
1742 obd_zombie_impexp_cull();
1745 * Notify obd_zombie_barrier callers that queues
1748 wake_up(&obd_zombie_waitq);
1751 complete(&obd_zombie_stop);
1758 /* Start the zombie import/export destroy thread. */
/* Initialise the zombie lists, lock, completions and wait queue, start the
 * "obd_zombid" kthread and wait until it has signalled obd_zombie_start.
 * A kthread_run() failure is reported through PTR_ERR(task). */
1760 int obd_zombie_impexp_init(void)
1762 struct task_struct *task;
1764 INIT_LIST_HEAD(&obd_zombie_imports);
1766 INIT_LIST_HEAD(&obd_zombie_exports);
1767 spin_lock_init(&obd_zombie_impexp_lock);
1768 init_completion(&obd_zombie_start);
1769 init_completion(&obd_zombie_stop);
1770 init_waitqueue_head(&obd_zombie_waitq);
1773 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1775 RETURN(PTR_ERR(task));
1777 wait_for_completion(&obd_zombie_start);
1781 /* Stop the zombie import/export destroy thread. */
/* Request termination of the destroy thread: set OBD_ZOMBIE_STOP, wake the
 * thread so it re-evaluates its wait condition, and block until it has
 * completed obd_zombie_stop. */
1783 void obd_zombie_impexp_stop(void)
1785 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1786 obd_zombie_impexp_notify();
1787 wait_for_completion(&obd_zombie_stop);
1790 /***** Kernel-userspace comm helpers *******/
1792 /* Get length of entire message, including header */
/* @payload_len: payload size in bytes.  The sum with sizeof(struct kuc_hdr)
 * is converted to int on return; no range check is performed here. */
1793 int kuc_len(int payload_len)
1795 return sizeof(struct kuc_hdr) + payload_len;
1797 EXPORT_SYMBOL(kuc_len);
1799 /* Get a pointer to kuc header, given a ptr to the payload
1800 * @param p Pointer to payload area
1801 * @returns Pointer to kuc header */
/* The header is taken to sit immediately before the payload, so stepping
 * back one struct kuc_hdr from @p yields it.  The magic is asserted, so
 * @p must really be a kuc payload pointer. */
1803 struct kuc_hdr * kuc_ptr(void *p)
1805 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1806 LASSERT(lh->kuc_magic == KUC_MAGIC);
1809 EXPORT_SYMBOL(kuc_ptr);
1811 /* Test if payload is part of kuc message
1812 * @param p Pointer to payload area */
/* Non-asserting variant of the kuc_ptr() check: the memory one
 * struct kuc_hdr before @p is read and its magic compared with KUC_MAGIC.
 * NOTE(review): this reads sizeof(struct kuc_hdr) bytes in front of @p even
 * when @p is not a kuc payload -- callers must guarantee that is readable. */
1815 int kuc_ispayload(void *p)
1817 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1819 if (kh->kuc_magic == KUC_MAGIC)
1824 EXPORT_SYMBOL(kuc_ispayload);
1826 /* Alloc space for a message, and fill in header
1827 * @return Pointer to payload area */
/* @payload_len: payload size in bytes; @transport and @type are stored in
 * the header unchanged.  kuc_msglen records the total length (header plus
 * payload).  Failure is reported as ERR_PTR(-ENOMEM), not NULL, so callers
 * must test the result with IS_ERR(). */
1829 void *kuc_alloc(int payload_len, int transport, int type)
1832 int len = kuc_len(payload_len);
1836 return ERR_PTR(-ENOMEM);
1838 lh->kuc_magic = KUC_MAGIC;
1839 lh->kuc_transport = transport;
1840 lh->kuc_msgtype = type;
1841 lh->kuc_msglen = len;
/* the payload starts right behind the header */
1843 return (void *)(lh + 1);
1845 EXPORT_SYMBOL(kuc_alloc);
1847 /* Takes pointer to payload area */
/* @payload_len is used to compute the size passed to OBD_FREE();
 * presumably it must equal the value given to kuc_alloc() -- confirm.
 * kuc_ptr() asserts the header magic before anything is freed. */
1848 inline void kuc_free(void *p, int payload_len)
1850 struct kuc_hdr *lh = kuc_ptr(p);
1851 OBD_FREE(lh, kuc_len(payload_len));
1853 EXPORT_SYMBOL(kuc_free);
/* One sleeper waiting for an RPC slot.  Instances live on the waiter's
 * stack (see obd_get_request_slot()).
 * orsw_entry: link on client_obd::cl_loi_read_list; unlinked with
 *             list_del_init() by whoever grants the slot.
 * orsw_waitq: private wait queue the waiter sleeps on. */
1855 struct obd_request_slot_waiter {
1856 struct list_head orsw_entry;
1857 wait_queue_head_t orsw_waitq;
/* Wait condition for obd_get_request_slot(): true once @orsw is no longer
 * queued.  The granting paths unlink the entry with list_del_init(), so an
 * empty orsw_entry means a slot has been handed to this waiter.  Checked
 * under cl_loi_list_lock. */
1861 static bool obd_request_slot_avail(struct client_obd *cli,
1862 struct obd_request_slot_waiter *orsw)
1866 spin_lock(&cli->cl_loi_list_lock);
1867 avail = !!list_empty(&orsw->orsw_entry);
1868 spin_unlock(&cli->cl_loi_list_lock);
1874 /* For network flow control, the RPC sponsor needs to acquire a credit
1875 * before sending the RPC. The credits count for a connection is defined
1876 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1877 * the subsequent RPC sponsors need to wait until others release their
1878 * credits, or the administrator increases the "cl_max_rpcs_in_flight". */
/* Acquire one RPC slot on @cli, sleeping if none is free.
 * Fast path: cl_r_in_flight is below cl_max_rpcs_in_flight, so it is simply
 * incremented.  Slow path: an on-stack waiter is queued at the tail of
 * cl_loi_read_list and the task sleeps (interruptibly, via LWI_INTR) until
 * a granter unlinks the waiter or the wait ends for another reason.
 * All list and counter updates are made under cl_loi_list_lock. */
1880 int obd_get_request_slot(struct client_obd *cli)
1882 struct obd_request_slot_waiter orsw;
1883 struct l_wait_info lwi;
1886 spin_lock(&cli->cl_loi_list_lock);
1887 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1888 cli->cl_r_in_flight++;
1889 spin_unlock(&cli->cl_loi_list_lock);
/* no free slot: queue ourselves behind earlier waiters */
1893 init_waitqueue_head(&orsw.orsw_waitq);
1894 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1895 orsw.orsw_signaled = false;
1896 spin_unlock(&cli->cl_loi_list_lock);
1898 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1899 rc = l_wait_event(orsw.orsw_waitq,
1900 obd_request_slot_avail(cli, &orsw) ||
1904 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1905 * freed but other (such as obd_put_request_slot) is using it. */
1906 spin_lock(&cli->cl_loi_list_lock);
/* Not flagged as signaled: an already-unlinked entry means a slot was
 * granted to us, which the visible code hands back (cl_r_in_flight--);
 * list_del() removes an entry that is still queued. */
1908 if (!orsw.orsw_signaled) {
1909 if (list_empty(&orsw.orsw_entry))
1910 cli->cl_r_in_flight--;
1912 list_del(&orsw.orsw_entry);
1916 if (orsw.orsw_signaled) {
1917 LASSERT(list_empty(&orsw.orsw_entry));
1921 spin_unlock(&cli->cl_loi_list_lock);
1925 EXPORT_SYMBOL(obd_get_request_slot);
/* Release one RPC slot on @cli.  If waiters are queued and the in-flight
 * count is now below the limit, the slot is transferred directly to the
 * first waiter: it is unlinked, cl_r_in_flight is re-incremented on its
 * behalf, and it is woken.  Everything happens under cl_loi_list_lock. */
1927 void obd_put_request_slot(struct client_obd *cli)
1929 struct obd_request_slot_waiter *orsw;
1931 spin_lock(&cli->cl_loi_list_lock);
1932 cli->cl_r_in_flight--;
1934 /* If there is free slot, wakeup the first waiter. */
1935 if (!list_empty(&cli->cl_loi_read_list) &&
1936 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1937 orsw = list_entry(cli->cl_loi_read_list.next,
1938 struct obd_request_slot_waiter, orsw_entry);
/* list_del_init() leaves the entry empty, which is what the waiter's wait
 * condition tests for */
1939 list_del_init(&orsw->orsw_entry);
1940 cli->cl_r_in_flight++;
1941 wake_up(&orsw->orsw_waitq);
1943 spin_unlock(&cli->cl_loi_list_lock);
1945 EXPORT_SYMBOL(obd_put_request_slot);
/* Return the current RPCs-in-flight limit of @cli.  The field is read
 * without taking cl_loi_list_lock. */
1947 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1949 return cli->cl_max_rpcs_in_flight;
1951 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/* Set the RPCs-in-flight limit of @cli to @max.
 * @max is range-checked against [1, OBD_MAX_RIF_MAX].  For an MDC device
 * the modify-RPC limit must stay strictly below the new value, so it is
 * lowered to max - 1 through obd_set_max_mod_rpcs_in_flight() when needed.
 * The new limit is stored under cl_loi_list_lock; when the limit grows, up
 * to "diff" queued waiters are granted a slot and woken. */
1953 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1955 struct obd_request_slot_waiter *orsw;
1962 if (max > OBD_MAX_RIF_MAX || max < 1)
1965 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
1966 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
1967 /* adjust max_mod_rpcs_in_flight to ensure it is always
1968 * strictly lower than max_rpcs_in_flight */
/* the message is newline-terminated so it does not run into the next
 * console/debug-log record */
1970 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
1971 "because it must be higher than "
1972 "max_mod_rpcs_in_flight value\n",
1973 cli->cl_import->imp_obd->obd_name);
1976 if (max <= cli->cl_max_mod_rpcs_in_flight) {
1977 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
1983 spin_lock(&cli->cl_loi_list_lock);
1984 old = cli->cl_max_rpcs_in_flight;
1985 cli->cl_max_rpcs_in_flight = max;
1988 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1989 for (i = 0; i < diff; i++) {
1990 if (list_empty(&cli->cl_loi_read_list))
/* same hand-over protocol as obd_put_request_slot(): unlink the first
 * waiter, account the slot to it, then wake it */
1993 orsw = list_entry(cli->cl_loi_read_list.next,
1994 struct obd_request_slot_waiter, orsw_entry);
1995 list_del_init(&orsw->orsw_entry);
1996 cli->cl_r_in_flight++;
1997 wake_up(&orsw->orsw_waitq);
1999 spin_unlock(&cli->cl_loi_list_lock);
2003 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return the current modify-RPCs-in-flight limit of @cli.  The field is
 * read without taking cl_mod_rpcs_lock. */
2005 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2007 return cli->cl_max_mod_rpcs_in_flight;
2009 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/* Set the modify-RPCs-in-flight limit of @cli to @max.
 * Checks, in order: @max within [1, OBD_MAX_RIF_MAX]; @max strictly below
 * cl_max_rpcs_in_flight; @max not above "maxmodrpcs", which is taken from
 * the connect data (ocd_maxmodrpcs) when the server advertised
 * OBD_CONNECT_MULTIMODRPCS.  The new limit is stored under
 * cl_mod_rpcs_lock, and waiters on cl_mod_rpcs_waitq are woken when the
 * limit was raised. */
2011 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2013 struct obd_connect_data *ocd;
2017 if (max > OBD_MAX_RIF_MAX || max < 1)
2020 /* cannot exceed or equal max_rpcs_in_flight */
2021 if (max >= cli->cl_max_rpcs_in_flight) {
2022 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2023 "higher or equal to max_rpcs_in_flight value (%u)\n",
2024 cli->cl_import->imp_obd->obd_name,
2025 max, cli->cl_max_rpcs_in_flight);
2029 /* cannot exceed max modify RPCs in flight supported by the server */
2030 ocd = &cli->cl_import->imp_connect_data;
2031 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2032 maxmodrpcs = ocd->ocd_maxmodrpcs;
2035 if (max > maxmodrpcs) {
2036 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2037 "higher than max_mod_rpcs_per_client value (%hu) "
2038 "returned by the server at connection\n",
2039 cli->cl_import->imp_obd->obd_name,
2044 spin_lock(&cli->cl_mod_rpcs_lock);
2046 prev = cli->cl_max_mod_rpcs_in_flight;
2047 cli->cl_max_mod_rpcs_in_flight = max;
2049 /* wakeup waiters if limit has been increased */
2050 if (cli->cl_max_mod_rpcs_in_flight > prev)
2051 wake_up(&cli->cl_mod_rpcs_waitq);
2053 spin_unlock(&cli->cl_mod_rpcs_lock);
2057 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Integer percentage of a out of b, truncated; yields 0 when b is 0.
 * Every argument is parenthesised so that compound expressions such as
 * pct(x + y, total) expand with the intended precedence.  b is evaluated
 * twice, so arguments must be free of side effects. */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
/* Print the modify-RPC statistics of @cli into @seq: a snapshot timestamp,
 * the current number of modify RPCs in flight, and one histogram row per
 * bucket of cl_mod_rpcs_hist with the bucket count, its percentage of the
 * total and the cumulative percentage.  Rows stop once the cumulative count
 * has reached the total.  The whole report is produced while holding
 * cl_mod_rpcs_lock so the numbers are mutually consistent. */
2061 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2062 struct seq_file *seq)
2065 unsigned long mod_tot = 0, mod_cum;
2068 do_gettimeofday(&now);
2070 spin_lock(&cli->cl_mod_rpcs_lock);
2072 seq_printf(seq, "snapshot_time: %lu.%lu (secs.usecs)\n",
2073 now.tv_sec, now.tv_usec);
2074 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2075 cli->cl_mod_rpcs_in_flight);
2077 seq_printf(seq, "\n\t\t\tmodify\n");
2078 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2080 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2083 for (i = 0; i < OBD_HIST_MAX; i++) {
2084 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2086 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2087 i, mod, pct(mod, mod_tot),
2088 pct(mod_cum, mod_tot));
/* nothing but empty buckets can follow once the running sum equals the total */
2089 if (mod_cum == mod_tot)
2093 spin_unlock(&cli->cl_mod_rpcs_lock);
2097 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2101 /* The number of modify RPCs sent in parallel is limited
2102 * because the server has a finite number of slots per client to
2103 * store request result and ensure reply reconstruction when needed.
2104 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2105 * that takes into account server limit and cl_max_rpcs_in_flight.
2107 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2108 * one close request is allowed above the maximum. */
/* Decide whether a modify-RPC slot can be taken on @cli.  The callers in
 * this file hold cl_mod_rpcs_lock around the call.  The close_req flag
 * marks a close RPC, which may take one slot above the limit as long as no
 * other close is in flight. */
2110 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2115 /* A slot is available if
2116 * - number of modify RPCs in flight is less than the max
2117 * - it's a close RPC and no other close request is in flight */
2119 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2120 (close_req && cli->cl_close_rpcs_in_flight == 0);
/* Lock-taking wrapper around obd_mod_rpc_slot_avail_locked(): evaluates the
 * slot test under cl_mod_rpcs_lock.  Used as the wait condition in
 * obd_get_mod_rpc_slot(); the answer may be stale once the lock is dropped,
 * which is why that function re-checks under the lock. */
2125 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2130 spin_lock(&cli->cl_mod_rpcs_lock);
2131 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2132 spin_unlock(&cli->cl_mod_rpcs_lock);
2136 /* Get a modify RPC slot from the obd client @cli according
2137 * to the kind of operation @opc that is going to be sent
2138 * and the intent @it of the operation if it applies.
2139 * If the maximum number of modify RPCs in flight is reached
2140 * the thread is put to sleep.
2141 * Returns the tag to be set in the request message. Tag 0
2142 * is reserved for non-modifying requests. */
/* Reserve a modify-RPC slot and a tag on @cli for an operation with opcode
 * @opc and optional intent @it.  Intents GETATTR, LOOKUP, LAYOUT and
 * READDIR are read-only and are filtered out first.  MDS_CLOSE is treated
 * as a close request for the slot test.  When no slot is free the caller
 * sleeps on cl_mod_rpcs_waitq and the test is repeated under
 * cl_mod_rpcs_lock. */
2144 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2145 struct lookup_intent *it)
2147 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2148 bool close_req = false;
2151 /* read-only metadata RPCs don't consume a slot on MDT
2152 * for reply reconstruction */
2154 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2155 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2158 if (opc == MDS_CLOSE)
2162 spin_lock(&cli->cl_mod_rpcs_lock);
2163 max = cli->cl_max_mod_rpcs_in_flight;
2164 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2165 /* there is a slot available */
2166 cli->cl_mod_rpcs_in_flight++;
2168 cli->cl_close_rpcs_in_flight++;
2169 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2170 cli->cl_mod_rpcs_in_flight);
2171 /* find a free tag */
2172 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2174 LASSERT(i < OBD_MAX_RIF_MAX);
/* The tag is reserved outside the assertion so that the bitmap update does
 * not depend on how LASSERT() treats its argument.  cl_mod_rpcs_lock is
 * held, so the separate test and set cannot race with another slot user. */
2175 LASSERT(!test_bit(i, cli->cl_mod_tag_bitmap));
set_bit(i, cli->cl_mod_tag_bitmap);
2176 spin_unlock(&cli->cl_mod_rpcs_lock);
2177 /* tag 0 is reserved for non-modify RPCs */
2180 spin_unlock(&cli->cl_mod_rpcs_lock);
2182 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2183 "opc %u, max %hu\n",
2184 cli->cl_import->imp_obd->obd_name, opc, max);
2186 l_wait_event(cli->cl_mod_rpcs_waitq,
2187 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2190 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2192 /* Put a modify RPC slot from the obd client @cli according
2193 * to the kind of operation @opc that has been sent and the
2194 * intent @it of the operation if it applies. */
2196 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2197 struct lookup_intent *it, __u16 tag)
2199 bool close_req = false;
2201 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2202 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2205 if (opc == MDS_CLOSE)
2208 spin_lock(&cli->cl_mod_rpcs_lock);
2209 cli->cl_mod_rpcs_in_flight--;
2211 cli->cl_close_rpcs_in_flight--;
2212 /* release the tag in the bitmap */
2213 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2214 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2215 spin_unlock(&cli->cl_mod_rpcs_lock);
2216 wake_up(&cli->cl_mod_rpcs_waitq);
2218 EXPORT_SYMBOL(obd_put_mod_rpc_slot);