4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_kernelcomm.h>
/* Taken around walks and updates of the obd_types list in this file. */
49 spinlock_t obd_types_lock;
/* Slab caches set up by obd_init_caches() and torn down by
 * obd_cleanup_caches(); obdo_cachep is exported to other modules. */
51 static struct kmem_cache *obd_device_cachep;
52 struct kmem_cache *obdo_cachep;
53 EXPORT_SYMBOL(obdo_cachep);
54 static struct kmem_cache *import_cachep;
/* Zombie lists: class_export_put() and class_import_put() pass objects to
 * obd_zombie_export_add()/obd_zombie_import_add() on the final put.
 * NOTE(review): presumably obd_zombie_impexp_lock guards both lists and the
 * objects are destroyed later from another context - confirm. */
56 static struct list_head obd_zombie_imports;
57 static struct list_head obd_zombie_exports;
58 static spinlock_t obd_zombie_impexp_lock;
/* Forward declarations for helpers referenced in this file. */
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64 const char *status, int locks);
/* Hook called to drop a ptlrpc_connection reference (see
 * class_export_destroy() and class_import_destroy()). It is only declared
 * and exported here; presumably another module assigns it - confirm. */
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
70 * support functions: we could use inter-module communication, but this
71 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC, the value the sanity checks in this file look for.
 */
73 static struct obd_device *obd_device_alloc(void)
75 struct obd_device *obd;
77 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Give an obd_device back to obd_device_cachep.
 * The magic is asserted first, a namespace that is still attached is
 * reported with CERROR, and obd_reference is finalised before the free.
 */
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look for a registered obd type whose typ_name is exactly \a name (strcmp).
 * obd_types is walked under obd_types_lock; the lock is released on a match
 * and again after a complete walk.
 */
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
114 EXPORT_SYMBOL(class_search_type);
/*
 * Look up the obd type called \a name and pin the module that owns it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the code can also ask the kernel to load
 * a module via request_module() and repeat the lookup; some type names are
 * mapped to a different module name first (LWP -> OSP, names starting with
 * the MDS name -> MDT).
 */
116 struct obd_type *class_get_type(const char *name)
118 struct obd_type *type = class_search_type(name);
120 #ifdef HAVE_MODULE_LOADING_SUPPORT
122 const char *modname = name;
124 if (strcmp(modname, "obdfilter") == 0)
127 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
128 modname = LUSTRE_OSP_NAME;
/* prefix match: anything beginning with the MDS name loads the MDT module */
130 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
131 modname = LUSTRE_MDT_NAME;
133 if (!request_module("%s", modname)) {
134 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
135 type = class_search_type(name);
137 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not checked here -
 * confirm that a failed module pin is acceptable. */
145 try_module_get(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): releases the module pin on the type's
 * owner with module_put(), under the type's obd_type_lock.
 */
151 void class_put_type(struct obd_type *type)
154 spin_lock(&type->obd_type_lock);
156 module_put(type->typ_dt_ops->o_owner);
157 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of a type name, asserted below. */
160 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 *
 * A name that is already registered is reported with CDEBUG. Otherwise the
 * function allocates the obd_type together with private copies of the dt/md
 * operation tables and of the name, registers a procfs directory when
 * CONFIG_PROC_FS is defined, initialises \a ldt through
 * lu_device_type_init() and links the type into obd_types under
 * obd_types_lock. The tail of the function releases whatever had been
 * allocated.
 */
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163 bool enable_proc, struct lprocfs_vars *vars,
164 const char *name, struct lu_device_type *ldt)
166 struct obd_type *type;
171 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
173 if (class_search_type(name)) {
174 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
179 OBD_ALLOC(type, sizeof(*type));
183 OBD_ALLOC_PTR(type->typ_dt_ops);
184 OBD_ALLOC_PTR(type->typ_md_ops);
185 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
187 if (type->typ_dt_ops == NULL ||
188 type->typ_md_ops == NULL ||
189 type->typ_name == NULL)
/* the type keeps its own copy of the caller's operation tables */
192 *(type->typ_dt_ops) = *dt_ops;
193 /* md_ops is optional */
195 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized as strlen(name) + 1 above, so strcpy() fits */
196 strcpy(type->typ_name, name);
197 spin_lock_init(&type->obd_type_lock);
199 #ifdef CONFIG_PROC_FS
201 type->typ_procroot = lprocfs_register(type->typ_name,
204 if (IS_ERR(type->typ_procroot)) {
205 rc = PTR_ERR(type->typ_procroot);
/* reset so the cleanup code below does not try to remove it */
206 type->typ_procroot = NULL;
213 rc = lu_device_type_init(ldt);
218 spin_lock(&obd_types_lock);
219 list_add(&type->typ_chain, &obd_types);
220 spin_unlock(&obd_types_lock);
/* cleanup: free only the pieces that were actually allocated */
225 if (type->typ_name != NULL) {
226 #ifdef CONFIG_PROC_FS
227 if (type->typ_procroot != NULL)
228 remove_proc_subtree(type->typ_name, proc_lustre_root);
230 OBD_FREE(type->typ_name, strlen(name) + 1);
232 if (type->typ_md_ops != NULL)
233 OBD_FREE_PTR(type->typ_md_ops);
234 if (type->typ_dt_ops != NULL)
235 OBD_FREE_PTR(type->typ_dt_ops);
236 OBD_FREE(type, sizeof(*type));
239 EXPORT_SYMBOL(class_register_type);
241 int class_unregister_type(const char *name)
243 struct obd_type *type = class_search_type(name);
247 CERROR("unknown obd type\n");
251 if (type->typ_refcnt) {
252 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
253 /* This is a bad situation, let's make the best of it */
254 /* Remove ops, but leave the name for debugging */
255 OBD_FREE_PTR(type->typ_dt_ops);
256 OBD_FREE_PTR(type->typ_md_ops);
260 /* we do not use type->typ_procroot as for compatibility purposes
261 * other modules can share names (i.e. lod can use lov entry). so
262 * we can't reference pointer as it can get invalided when another
263 * module removes the entry */
264 #ifdef CONFIG_PROC_FS
265 if (type->typ_procroot != NULL)
266 remove_proc_subtree(type->typ_name, proc_lustre_root);
267 if (type->typ_procsym != NULL)
268 lprocfs_remove(&type->typ_procsym);
271 lu_device_type_fini(type->typ_lu);
273 spin_lock(&obd_types_lock);
274 list_del(&type->typ_chain);
275 spin_unlock(&obd_types_lock);
276 OBD_FREE(type->typ_name, strlen(name) + 1);
277 if (type->typ_dt_ops != NULL)
278 OBD_FREE_PTR(type->typ_dt_ops);
279 if (type->typ_md_ops != NULL)
280 OBD_FREE_PTR(type->typ_md_ops);
281 OBD_FREE(type, sizeof(*type));
283 } /* class_unregister_type */
284 EXPORT_SYMBOL(class_unregister_type);
287 * Create a new obd device.
289 * Find an empty slot in ::obd_devs[], create a new obd device in it.
291 * \param[in] type_name obd device type string.
292 * \param[in] name obd device name.
294 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
/*
 * Allocate an obd_device of type \a type_name called \a name and place it in
 * a free obd_devs[] slot.
 *
 * Error results visible below: ERR_PTR(-EINVAL) for a name of MAX_OBD_NAME
 * bytes or more, ERR_PTR(-ENODEV) for an unknown type, ERR_PTR(-ENOMEM) when
 * the device cannot be allocated, ERR_PTR(-EEXIST) when a device with that
 * name already exists and ERR_PTR(-EOVERFLOW) when every slot is in use.
 * The error exits free the new device and drop the type reference.
 */
297 struct obd_device *class_newdev(const char *type_name, const char *name)
299 struct obd_device *result = NULL;
300 struct obd_device *newdev;
301 struct obd_type *type = NULL;
303 int new_obd_minor = 0;
306 if (strlen(name) >= MAX_OBD_NAME) {
307 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
308 RETURN(ERR_PTR(-EINVAL));
311 type = class_get_type(type_name);
313 CERROR("OBD: unknown type: %s\n", type_name);
314 RETURN(ERR_PTR(-ENODEV));
317 newdev = obd_device_alloc();
319 GOTO(out_type, result = ERR_PTR(-ENOMEM));
321 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* duplicate-name check and slot claim both happen under the write lock */
323 write_lock(&obd_dev_lock);
324 for (i = 0; i < class_devno_max(); i++) {
325 struct obd_device *obd = class_num2obd(i);
327 if (obd && (strcmp(name, obd->obd_name) == 0)) {
328 CERROR("Device %s already exists at %d, won't add\n",
/* a slot claimed earlier in this scan is verified and then given back */
331 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
332 "%p obd_magic %08x != %08x\n", result,
333 result->obd_magic, OBD_DEVICE_MAGIC);
334 LASSERTF(result->obd_minor == new_obd_minor,
335 "%p obd_minor %d != %d\n", result,
336 result->obd_minor, new_obd_minor);
338 obd_devs[result->obd_minor] = NULL;
339 result->obd_name[0]='\0';
341 result = ERR_PTR(-EEXIST);
/* first empty slot seen while nothing has been claimed yet */
344 if (!result && !obd) {
346 result->obd_minor = i;
348 result->obd_type = type;
/* name length was checked against MAX_OBD_NAME above */
349 strncpy(result->obd_name, name,
350 sizeof(result->obd_name) - 1);
351 obd_devs[i] = result;
354 write_unlock(&obd_dev_lock);
356 if (result == NULL && i >= class_devno_max()) {
357 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
359 GOTO(out, result = ERR_PTR(-EOVERFLOW));
365 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
366 result->obd_name, result);
370 obd_device_free(newdev);
372 class_put_type(type);
/*
 * Remove \a obd from obd_devs[] and free it.
 * The device must carry OBD_DEVICE_MAGIC, must be the entry stored at
 * obd_devs[obd->obd_minor] and must have a type. The type pointer is saved
 * before the free so that its reference can be dropped afterwards.
 */
376 void class_release_dev(struct obd_device *obd)
378 struct obd_type *obd_type = obd->obd_type;
380 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
381 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
382 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
383 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
384 LASSERT(obd_type != NULL);
386 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
387 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
/* clear the slot under the write lock before the memory goes away */
389 write_lock(&obd_dev_lock);
390 obd_devs[obd->obd_minor] = NULL;
391 write_unlock(&obd_dev_lock);
392 obd_device_free(obd);
394 class_put_type(obd_type);
/*
 * Look up a device by name in obd_devs[] under the obd_dev_lock read lock.
 * A name match is only honoured once obd_attached is set on the device.
 */
397 int class_name2dev(const char *name)
404 read_lock(&obd_dev_lock);
405 for (i = 0; i < class_devno_max(); i++) {
406 struct obd_device *obd = class_num2obd(i);
408 if (obd && strcmp(name, obd->obd_name) == 0) {
409 /* Make sure we finished attaching before we give
410 out any references */
411 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412 if (obd->obd_attached) {
413 read_unlock(&obd_dev_lock);
419 read_unlock(&obd_dev_lock);
424 struct obd_device *class_name2obd(const char *name)
426 int dev = class_name2dev(name);
428 if (dev < 0 || dev > class_devno_max())
430 return class_num2obd(dev);
432 EXPORT_SYMBOL(class_name2obd);
/*
 * Look up a device by uuid in obd_devs[] under the obd_dev_lock read lock,
 * comparing with obd_uuid_equals(). The read lock is released on a match and
 * after a complete scan.
 */
434 int class_uuid2dev(struct obd_uuid *uuid)
438 read_lock(&obd_dev_lock);
439 for (i = 0; i < class_devno_max(); i++) {
440 struct obd_device *obd = class_num2obd(i);
442 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
443 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
444 read_unlock(&obd_dev_lock);
448 read_unlock(&obd_dev_lock);
/*
 * Pointer-returning wrapper around class_uuid2dev(): the index it yields is
 * translated with class_num2obd().
 */
453 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
455 int dev = class_uuid2dev(uuid);
458 return class_num2obd(dev);
460 EXPORT_SYMBOL(class_uuid2obd);
463 * Get obd device from ::obd_devs[]
465 * \param num [in] array index
467 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
468 * otherwise return the obd device there.
/*
 * Fetch a device by table index. The index is checked against
 * class_devno_max(), and the device is asserted to carry OBD_DEVICE_MAGIC
 * and to have obd_minor equal to \a num.
 * NOTE(review): only the upper bound is tested on the line below; a negative
 * num is presumably filtered by callers - confirm.
 */
470 struct obd_device *class_num2obd(int num)
472 struct obd_device *obd = NULL;
474 if (num < class_devno_max()) {
479 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
480 "%p obd_magic %08x != %08x\n",
481 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
482 LASSERTF(obd->obd_minor == num,
483 "%p obd_minor %0d != %0d\n",
484 obd, obd->obd_minor, num);
491 * Get obd devices count. Device in any
493 * \retval obd device count
495 int get_devices_count(void)
497 int index, max_index = class_devno_max(), dev_count = 0;
499 read_lock(&obd_dev_lock);
500 for (index = 0; index <= max_index; index++) {
501 struct obd_device *obd = class_num2obd(index);
505 read_unlock(&obd_dev_lock);
509 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[]: index, a status chosen
 * from the stopping / set_up / attached flags (tested in that order), type
 * name, device name, uuid and current refcount. Runs under the obd_dev_lock
 * read lock.
 */
511 void class_obd_list(void)
516 read_lock(&obd_dev_lock);
517 for (i = 0; i < class_devno_max(); i++) {
518 struct obd_device *obd = class_num2obd(i);
522 if (obd->obd_stopping)
524 else if (obd->obd_set_up)
526 else if (obd->obd_attached)
530 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
531 i, status, obd->obd_type->typ_name,
532 obd->obd_name, obd->obd_uuid.uuid,
533 atomic_read(&obd->obd_refcount));
535 read_unlock(&obd_dev_lock);
539 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
540 specified, then only the client with that uuid is returned,
541 otherwise any client connected to the tgt is returned. */
/*
 * Scan obd_devs[] under the read lock for a device of type \a typ_name whose
 * client target uuid equals \a tgt_uuid. When \a grp_uuid is non-NULL the
 * device's own uuid must equal it as well.
 */
542 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
543 const char * typ_name,
544 struct obd_uuid *grp_uuid)
548 read_lock(&obd_dev_lock);
549 for (i = 0; i < class_devno_max(); i++) {
550 struct obd_device *obd = class_num2obd(i);
/* prefix match: only strlen(typ_name) bytes of the type name are compared */
554 if ((strncmp(obd->obd_type->typ_name, typ_name,
555 strlen(typ_name)) == 0)) {
556 if (obd_uuid_equals(tgt_uuid,
557 &obd->u.cli.cl_target_uuid) &&
558 ((grp_uuid)? obd_uuid_equals(grp_uuid,
559 &obd->obd_uuid) : 1)) {
560 read_unlock(&obd_dev_lock);
565 read_unlock(&obd_dev_lock);
569 EXPORT_SYMBOL(class_find_client_obd);
571 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
572 searching at *next, and if a device is found, the next index to look
573 at is saved in *next. If next is NULL, then the first matching device
574 will always be returned. */
/*
 * Resumable scan of obd_devs[] for devices whose uuid equals \a grp_uuid.
 * A *next value is only accepted as a starting index when it lies in
 * [0, class_devno_max()). The scan itself runs under the read lock.
 */
575 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
581 else if (*next >= 0 && *next < class_devno_max())
586 read_lock(&obd_dev_lock);
587 for (; i < class_devno_max(); i++) {
588 struct obd_device *obd = class_num2obd(i);
592 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
595 read_unlock(&obd_dev_lock);
599 read_unlock(&obd_dev_lock);
603 EXPORT_SYMBOL(class_devices_in_group);
606 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
607 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to the devices that
 * belong to filesystem \a fsname.
 *
 * A device is notified when it is set up and not stopping, its type is one
 * of mdc/osc/mdt/ost, and the first \a namelen bytes of its name equal
 * \a fsname. \a namelen must be positive.
 */
609 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
611 struct obd_device *obd;
615 LASSERT(namelen > 0);
617 read_lock(&obd_dev_lock);
618 for (i = 0; i < class_devno_max(); i++) {
619 obd = class_num2obd(i);
621 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
624 /* only notify mdc, osc, mdt, ost */
625 type = obd->obd_type->typ_name;
626 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
627 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
628 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
629 strcmp(type, LUSTRE_OST_NAME) != 0)
632 if (strncmp(obd->obd_name, fsname, namelen))
/* hold a device reference so the read lock can be dropped around the
 * obd_set_info_async() call; the lock is re-taken before the next slot */
635 class_incref(obd, __FUNCTION__, obd);
636 read_unlock(&obd_dev_lock);
637 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
638 sizeof(KEY_SPTLRPC_CONF),
639 KEY_SPTLRPC_CONF, 0, NULL, NULL);
641 class_decref(obd, __FUNCTION__, obd);
642 read_lock(&obd_dev_lock);
644 read_unlock(&obd_dev_lock);
647 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches(). The cache pointers
 * are reset to NULL after destruction, which the LASSERTs in
 * obd_init_caches() rely on.
 */
649 void obd_cleanup_caches(void)
652 if (obd_device_cachep) {
653 kmem_cache_destroy(obd_device_cachep);
654 obd_device_cachep = NULL;
657 kmem_cache_destroy(obdo_cachep);
661 kmem_cache_destroy(import_cachep);
662 import_cachep = NULL;
/*
 * Create the three slab caches used by this file ("ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache"). Each cache pointer must be NULL on
 * entry. A failed creation sets rc to -ENOMEM and jumps to the exit path,
 * which calls obd_cleanup_caches().
 */
668 int obd_init_caches(void)
673 LASSERT(obd_device_cachep == NULL);
674 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
675 sizeof(struct obd_device),
677 if (!obd_device_cachep)
678 GOTO(out, rc = -ENOMEM);
680 LASSERT(obdo_cachep == NULL);
681 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
684 GOTO(out, rc = -ENOMEM);
686 LASSERT(import_cachep == NULL);
687 import_cachep = kmem_cache_create("ll_import_cache",
688 sizeof(struct obd_import),
691 GOTO(out, rc = -ENOMEM);
695 obd_cleanup_caches();
699 /* map connection to client */
/*
 * Translate a connection handle into its export by looking the cookie up
 * with class_handle2object(). A NULL handle and the cookie value -1 ("want a
 * new connection") are logged as special cases before the lookup.
 */
700 struct obd_export *class_conn2export(struct lustre_handle *conn)
702 struct obd_export *export;
706 CDEBUG(D_CACHE, "looking for null handle\n");
710 if (conn->cookie == -1) { /* this means assign a new connection */
711 CDEBUG(D_CACHE, "want a new connection\n");
715 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
716 export = class_handle2object(conn->cookie, NULL);
719 EXPORT_SYMBOL(class_conn2export);
/* Accessor from an export to an obd_device; exported to other modules. */
721 struct obd_device *class_exp2obd(struct obd_export *exp)
727 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve a connection handle to the device behind its export. The export
 * returned by class_conn2export() is only needed to read exp_obd, so it is
 * released with class_export_put() straight away.
 */
729 struct obd_device *class_conn2obd(struct lustre_handle *conn)
731 struct obd_export *export;
732 export = class_conn2export(conn);
734 struct obd_device *obd = export->exp_obd;
735 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the device behind \a exp. */
741 struct obd_import *class_exp2cliimp(struct obd_export *exp)
743 struct obd_device *obd = exp->exp_obd;
746 return obd->u.cli.cl_import;
748 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that
 * class_conn2obd() finds for connection handle \a conn.
 */
750 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
752 struct obd_device *obd = class_conn2obd(conn);
755 return obd->u.cli.cl_import;
758 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero (asserted).
 * Drops the ptlrpc connection if there is one, checks that the reply, replay
 * and high-priority RPC lists are empty, calls obd_destroy_export(), releases
 * the "export" reference on the device and frees the export through
 * OBD_FREE_RCU.
 */
759 static void class_export_destroy(struct obd_export *exp)
761 struct obd_device *obd = exp->exp_obd;
764 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
765 LASSERT(obd != NULL);
767 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
768 exp->exp_client_uuid.uuid, obd->obd_name);
770 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
771 if (exp->exp_connection)
772 ptlrpc_put_connection_superhack(exp->exp_connection);
774 LASSERT(list_empty(&exp->exp_outstanding_replies));
775 LASSERT(list_empty(&exp->exp_uncommitted_replies));
776 LASSERT(list_empty(&exp->exp_req_replay_queue));
777 LASSERT(list_empty(&exp->exp_hp_rpcs));
778 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", export) in class_new_export() */
779 class_decref(obd, "export", exp);
781 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes an export reference. */
785 static void export_handle_addref(void *export)
787 class_export_get(export);
/* Handle operations passed to class_handle_hash() for every new export. */
790 static struct portals_handle_ops export_handle_ops = {
791 .hop_addref = export_handle_addref,
/* Take one reference on \a exp and log the resulting refcount. */
795 struct obd_export *class_export_get(struct obd_export *exp)
797 atomic_inc(&exp->exp_refcount);
798 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
799 atomic_read(&exp->exp_refcount));
802 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp. On the final put the export must still be
 * linked on an exp_obd_chain list (asserted); its nid statistics reference is
 * released and the export is handed to obd_zombie_export_add().
 */
804 void class_export_put(struct obd_export *exp)
806 LASSERT(exp != NULL);
807 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the "- 1" makes the log show the value the counter has after this put */
808 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
809 atomic_read(&exp->exp_refcount) - 1);
811 if (atomic_dec_and_test(&exp->exp_refcount)) {
812 LASSERT(!list_empty(&exp->exp_obd_chain));
813 CDEBUG(D_IOCTL, "final put %p/%s\n",
814 exp, exp->exp_client_uuid.uuid);
816 /* release nid stat reference */
817 lprocfs_exp_cleanup(exp);
819 obd_zombie_export_add(exp);
822 EXPORT_SYMBOL(class_export_put);
824 /* Creates a new export, adds it to the hash table, and returns a
825 * pointer to it. The refcount is 2: one for the hash reference, and
826 * one for the pointer returned by this function. */
/*
 * Allocate and initialise an export of \a obd for client \a cluuid.
 *
 * The refcount starts at 2. Unless \a cluuid equals the device's own uuid,
 * the export is inserted into the device's uuid hash with
 * cfs_hash_add_unique(). On success the export takes an "export" reference
 * on the device and is linked on obd_exports and obd_exports_timed.
 *
 * Failures visible below: ERR_PTR(-ENOMEM) when the allocation fails;
 * rc = -ENODEV when the device is stopping or the hash reference step fails;
 * rc = -EALREADY for a duplicate uuid. The error exits drop the hash
 * reference, unhash the handle and free the export.
 */
827 struct obd_export *class_new_export(struct obd_device *obd,
828 struct obd_uuid *cluuid)
830 struct obd_export *export;
831 struct cfs_hash *hash = NULL;
835 OBD_ALLOC_PTR(export);
837 return ERR_PTR(-ENOMEM);
839 export->exp_conn_cnt = 0;
840 export->exp_lock_hash = NULL;
841 export->exp_flock_hash = NULL;
842 atomic_set(&export->exp_refcount, 2);
843 atomic_set(&export->exp_rpc_count, 0);
844 atomic_set(&export->exp_cb_count, 0);
845 atomic_set(&export->exp_locks_count, 0);
846 #if LUSTRE_TRACKS_LOCK_EXP_REFS
847 INIT_LIST_HEAD(&export->exp_locks_list);
848 spin_lock_init(&export->exp_locks_list_guard);
850 atomic_set(&export->exp_replay_count, 0);
851 export->exp_obd = obd;
852 INIT_LIST_HEAD(&export->exp_outstanding_replies);
853 spin_lock_init(&export->exp_uncommitted_replies_lock);
854 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
855 INIT_LIST_HEAD(&export->exp_req_replay_queue);
856 INIT_LIST_HEAD(&export->exp_handle.h_link);
857 INIT_LIST_HEAD(&export->exp_hp_rpcs);
858 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* registers the handle; class_connect() later reads exp_handle.h_cookie */
859 class_handle_hash(&export->exp_handle, &export_handle_ops);
860 export->exp_last_request_time = cfs_time_current_sec();
861 spin_lock_init(&export->exp_lock);
862 spin_lock_init(&export->exp_rpc_lock);
863 INIT_HLIST_NODE(&export->exp_uuid_hash);
864 INIT_HLIST_NODE(&export->exp_nid_hash);
865 INIT_HLIST_NODE(&export->exp_gen_hash);
866 spin_lock_init(&export->exp_bl_list_lock);
867 INIT_LIST_HEAD(&export->exp_bl_list);
869 export->exp_sp_peer = LUSTRE_SP_ANY;
870 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
871 export->exp_client_uuid = *cluuid;
872 obd_init_export(export);
874 spin_lock(&obd->obd_dev_lock);
875 /* shouldn't happen, but might race */
876 if (obd->obd_stopping)
877 GOTO(exit_unlock, rc = -ENODEV);
879 hash = cfs_hash_getref(obd->obd_uuid_hash);
881 GOTO(exit_unlock, rc = -ENODEV);
/* the device lock is dropped while the uuid hash is updated */
882 spin_unlock(&obd->obd_dev_lock);
884 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
885 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
887 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
888 obd->obd_name, cluuid->uuid, rc);
889 GOTO(exit_err, rc = -EALREADY);
893 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* obd_stopping is tested again after the lock is re-taken */
894 spin_lock(&obd->obd_dev_lock);
895 if (obd->obd_stopping) {
896 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
897 GOTO(exit_unlock, rc = -ENODEV);
900 class_incref(obd, "export", export);
901 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
902 list_add_tail(&export->exp_obd_chain_timed,
903 &export->exp_obd->obd_exports_timed);
904 export->exp_obd->obd_num_exports++;
905 spin_unlock(&obd->obd_dev_lock);
906 cfs_hash_putref(hash);
910 spin_unlock(&obd->obd_dev_lock);
913 cfs_hash_putref(hash);
914 class_handle_unhash(&export->exp_handle);
915 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
916 obd_destroy_export(export);
917 OBD_FREE_PTR(export);
920 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its device: unhash the handle, remove the export from
 * the uuid hash if it is hashed, move it to obd_unlinked_exports, take it off
 * the timed list and decrement obd_num_exports (all under obd_dev_lock), then
 * drop one export reference.
 */
922 void class_unlink_export(struct obd_export *exp)
924 class_handle_unhash(&exp->exp_handle);
926 spin_lock(&exp->exp_obd->obd_dev_lock);
927 /* delete an uuid-export hashitem from hashtables */
928 if (!hlist_unhashed(&exp->exp_uuid_hash))
929 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
930 &exp->exp_client_uuid,
931 &exp->exp_uuid_hash);
933 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
934 list_del_init(&exp->exp_obd_chain_timed);
935 exp->exp_obd->obd_num_exports--;
936 spin_unlock(&exp->exp_obd->obd_dev_lock);
937 class_export_put(exp);
940 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero (asserted).
 * Releases the active connection and every entry on imp_conn_list, checks
 * that no security context is still attached, drops the "import" reference
 * on the device and frees the import through OBD_FREE_RCU.
 */
941 static void class_import_destroy(struct obd_import *imp)
945 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
946 imp->imp_obd->obd_name);
948 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
950 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop entries off the head until the connection list is empty */
952 while (!list_empty(&imp->imp_conn_list)) {
953 struct obd_import_conn *imp_conn;
955 imp_conn = list_entry(imp->imp_conn_list.next,
956 struct obd_import_conn, oic_item);
957 list_del_init(&imp_conn->oic_item);
958 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
959 OBD_FREE(imp_conn, sizeof(*imp_conn));
962 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", imp) in class_new_import() */
963 class_decref(imp->imp_obd, "import", imp);
964 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes an import reference. */
968 static void import_handle_addref(void *import)
970 class_import_get(import);
/* Handle operations passed to class_handle_hash() for every new import. */
973 static struct portals_handle_ops import_handle_ops = {
974 .hop_addref = import_handle_addref,
/* Take one reference on \a import and log the resulting refcount. */
978 struct obd_import *class_import_get(struct obd_import *import)
980 atomic_inc(&import->imp_refcount);
981 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
982 atomic_read(&import->imp_refcount),
983 import->imp_obd->obd_name);
986 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp. The import must not already be on the zombie
 * list (asserted). On the final put it is handed to obd_zombie_import_add().
 */
988 void class_import_put(struct obd_import *imp)
992 LASSERT(list_empty(&imp->imp_zombie_chain));
993 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the "- 1" makes the log show the value the counter has after this put */
995 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
996 atomic_read(&imp->imp_refcount) - 1,
997 imp->imp_obd->obd_name);
999 if (atomic_dec_and_test(&imp->imp_refcount)) {
1000 CDEBUG(D_INFO, "final put import %p\n", imp);
1001 obd_zombie_import_add(imp);
/* NOTE(review): the assertion below also runs after a final put has queued
 * imp on the zombie list - confirm imp cannot be freed before it is read. */
1004 /* catch possible import put race */
1005 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1008 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 */
1010 static void init_imp_at(struct imp_at *at) {
1012 at_init(&at->iat_net_latency, 0, 0);
1013 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1014 /* max service estimates are tracked on the server side, so
1015 don't use the AT history here, just use the last reported
1016 val. (But keep hist for proc histogram, worst_ever) */
1017 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise an import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with refcount 2, takes an
 * "import" reference on the device, registers its handle with
 * class_handle_hash() and defaults its message magic to LUSTRE_MSG_MAGIC_V2.
 */
1022 struct obd_import *class_new_import(struct obd_device *obd)
1024 struct obd_import *imp;
1026 OBD_ALLOC(imp, sizeof(*imp));
1030 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1031 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1032 INIT_LIST_HEAD(&imp->imp_replay_list);
1033 INIT_LIST_HEAD(&imp->imp_sending_list);
1034 INIT_LIST_HEAD(&imp->imp_delayed_list);
1035 INIT_LIST_HEAD(&imp->imp_committed_list);
/* the replay cursor starts at the head of the (empty) committed list */
1036 imp->imp_replay_cursor = &imp->imp_committed_list;
1037 spin_lock_init(&imp->imp_lock);
1038 imp->imp_last_success_conn = 0;
1039 imp->imp_state = LUSTRE_IMP_NEW;
/* released again by class_decref() in class_import_destroy() */
1040 imp->imp_obd = class_incref(obd, "import", imp);
1041 mutex_init(&imp->imp_sec_mutex);
1042 init_waitqueue_head(&imp->imp_recovery_waitq);
1044 atomic_set(&imp->imp_refcount, 2);
1045 atomic_set(&imp->imp_unregistering, 0);
1046 atomic_set(&imp->imp_inflight, 0);
1047 atomic_set(&imp->imp_replay_inflight, 0);
1048 atomic_set(&imp->imp_inval_count, 0);
1049 INIT_LIST_HEAD(&imp->imp_conn_list);
1050 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1051 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1052 init_imp_at(&imp->imp_at);
1054 /* the default magic is V2, will be used in connect RPC, and
1055 * then adjusted according to the flags in request/reply. */
1056 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1060 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing an import down: unhash its handle, bump imp_generation under
 * imp_lock and drop one reference. The pointer must be neither NULL nor
 * LP_POISON.
 */
1062 void class_destroy_import(struct obd_import *import)
1064 LASSERT(import != NULL);
1065 LASSERT(import != LP_POISON);
1067 class_handle_unhash(&import->imp_handle);
1069 spin_lock(&import->imp_lock);
1070 import->imp_generation++;
1071 spin_unlock(&import->imp_lock);
1072 class_import_put(import);
1074 EXPORT_SYMBOL(class_destroy_import);
1076 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking of lock -> export references (this code is built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set). Under exp_locks_list_guard the lock's
 * per-export reference count is incremented; on the 0 -> 1 transition the
 * lock is put on exp_locks_list and \a exp is recorded as its target. A lock
 * that already names a different export triggers a console warning.
 */
1078 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1080 spin_lock(&exp->exp_locks_list_guard);
1082 LASSERT(lock->l_exp_refs_nr >= 0);
1084 if (lock->l_exp_refs_target != NULL &&
1085 lock->l_exp_refs_target != exp) {
1086 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1087 exp, lock, lock->l_exp_refs_target);
/* post-increment: the branch is taken only for the first reference */
1089 if ((lock->l_exp_refs_nr ++) == 0) {
1090 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1091 lock->l_exp_refs_target = exp;
1093 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1094 lock, exp, lock->l_exp_refs_nr);
1095 spin_unlock(&exp->exp_locks_list_guard);
/*
 * Inverse of __class_export_add_lock_ref(): under exp_locks_list_guard the
 * lock's per-export reference count (which must be positive) is decremented;
 * when it reaches 0 the lock is taken off the export's list and its target
 * is cleared. A target that differs from \a exp triggers a console warning.
 */
1098 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1100 spin_lock(&exp->exp_locks_list_guard);
1101 LASSERT(lock->l_exp_refs_nr > 0);
1102 if (lock->l_exp_refs_target != exp) {
1103 LCONSOLE_WARN("lock %p, "
1104 "mismatching export pointers: %p, %p\n",
1105 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the branch is taken when the last reference goes away */
1107 if (-- lock->l_exp_refs_nr == 0) {
1108 list_del_init(&lock->l_exp_refs_link);
1109 lock->l_exp_refs_target = NULL;
1111 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1112 lock, exp, lock->l_exp_refs_nr);
1113 spin_unlock(&exp->exp_locks_list_guard);
1117 /* A connection defines an export context in which preallocation can
1118 be managed. This releases the export pointer reference, and returns
1119 the export handle, so the export refcount is 1 when this function returns. */
/*
 * Create an export of \a obd for \a cluuid with class_new_export() and
 * return its handle cookie in \a conn. All three arguments must be non-NULL.
 * An error from class_new_export() is passed back as PTR_ERR(). The pointer
 * reference obtained from class_new_export() is dropped before returning, so
 * the caller holds only the cookie.
 */
1121 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1122 struct obd_uuid *cluuid)
1124 struct obd_export *export;
1125 LASSERT(conn != NULL);
1126 LASSERT(obd != NULL);
1127 LASSERT(cluuid != NULL);
1130 export = class_new_export(obd, cluuid);
1132 RETURN(PTR_ERR(export));
1134 conn->cookie = export->exp_handle.h_cookie;
1135 class_export_put(export);
1137 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1138 cluuid->uuid, conn->cookie);
1141 EXPORT_SYMBOL(class_connect);
1143 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery bookkeeping of an export that is going away.
 *
 * Under obd_recovery_task_lock, while the device is recovering, an export
 * that is in recovery is taken out of it and obd_connected_clients is
 * decremented. obd_stale_clients is incremented for exports that did not
 * connect with OBD_CONNECT_LIGHTWEIGHT. Then, under exp_lock, pending
 * request-replay and lock-replay flags are cleared and the matching
 * per-device client counters decremented.
 */
1144 static void class_export_recovery_cleanup(struct obd_export *exp)
1146 struct obd_device *obd = exp->exp_obd;
1148 spin_lock(&obd->obd_recovery_task_lock);
1149 if (obd->obd_recovering) {
1150 if (exp->exp_in_recovery) {
1151 spin_lock(&exp->exp_lock);
1152 exp->exp_in_recovery = 0;
1153 spin_unlock(&exp->exp_lock);
1154 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1155 atomic_dec(&obd->obd_connected_clients);
1158 /* if called during recovery then should update
1159 * obd_stale_clients counter,
1160 * lightweight exports are not counted */
1161 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1162 exp->exp_obd->obd_stale_clients++;
1164 spin_unlock(&obd->obd_recovery_task_lock);
1166 spin_lock(&exp->exp_lock);
1167 /** Cleanup req replay fields */
1168 if (exp->exp_req_replay_needed) {
1169 exp->exp_req_replay_needed = 0;
1171 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1172 atomic_dec(&obd->obd_req_replay_clients);
1175 /** Cleanup lock replay data */
1176 if (exp->exp_lock_replay_needed) {
1177 exp->exp_lock_replay_needed = 0;
1179 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1180 atomic_dec(&obd->obd_lock_replay_clients);
1182 spin_unlock(&exp->exp_lock);
1185 /* This function removes 1-3 references from the export:
1186 * 1 - for the export pointer passed
1187 * and, if a disconnect is really needed,
1188 * 2 - removing from hash
1189 * 3 - in class_unlink_export
1190 * The export pointer passed to this function can be destroyed */
/*
 * Disconnect an export.
 *
 * exp_disconnected is tested and set under exp_lock so that the teardown
 * runs once; a repeated call takes the no_disconn exit. The first call
 * removes the export from the nid hash if it is hashed, runs
 * class_export_recovery_cleanup() and class_unlink_export(), and finally
 * drops an export reference. A NULL export is reported with CWARN.
 */
1191 int class_disconnect(struct obd_export *export)
1193 int already_disconnected;
1196 if (export == NULL) {
1197 CWARN("attempting to free NULL export %p\n", export);
1201 spin_lock(&export->exp_lock);
1202 already_disconnected = export->exp_disconnected;
1203 export->exp_disconnected = 1;
1204 spin_unlock(&export->exp_lock);
1206 /* class_cleanup(), abort_recovery(), and class_fail_export()
1207 * all end up in here, and if any of them race we shouldn't
1208 * call extra class_export_puts(). */
1209 if (already_disconnected) {
1210 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1211 GOTO(no_disconn, already_disconnected);
1214 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1215 export->exp_handle.h_cookie);
1217 if (!hlist_unhashed(&export->exp_nid_hash))
1218 cfs_hash_del(export->exp_obd->obd_nid_hash,
1219 &export->exp_connection->c_peer.nid,
1220 &export->exp_nid_hash);
1222 class_export_recovery_cleanup(export);
1223 class_unlink_export(export);
1225 class_export_put(export);
1228 EXPORT_SYMBOL(class_disconnect);
1230 /* Return non-zero for a fully connected export */
/*
 * An export counts as connected when exp_conn_cnt is positive and exp_failed
 * is not set; both fields are sampled under exp_lock.
 */
1231 int class_connected_export(struct obd_export *exp)
1236 spin_lock(&exp->exp_lock);
1237 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1238 spin_unlock(&exp->exp_lock);
1242 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, always working on the list head.
 *
 * Each export gets \a flags stored in exp_flags under exp_lock. An export
 * whose client uuid equals its device's own uuid is not disconnected: it is
 * taken off the list and the reference taken at the top of the loop is
 * dropped. Every other export is passed to obd_disconnect() with an extra
 * reference held for that call.
 */
1244 static void class_disconnect_export_list(struct list_head *list,
1245 enum obd_option flags)
1248 struct obd_export *exp;
1251 /* It's possible that an export may disconnect itself, but
1252 * nothing else will be added to this list. */
1253 while (!list_empty(list)) {
1254 exp = list_entry(list->next, struct obd_export,
1256 /* need for safe call CDEBUG after obd_disconnect */
1257 class_export_get(exp);
1259 spin_lock(&exp->exp_lock);
1260 exp->exp_flags = flags;
1261 spin_unlock(&exp->exp_lock);
1263 if (obd_uuid_equals(&exp->exp_client_uuid,
1264 &exp->exp_obd->obd_uuid)) {
1266 "exp %p export uuid == obd uuid, don't discon\n",
1268 /* Need to delete this now so we don't end up pointing
1269 * to work_list later when this export is cleaned up. */
1270 list_del_init(&exp->exp_obd_chain);
1271 class_export_put(exp);
1275 class_export_get(exp);
1276 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1277 "last request at "CFS_TIME_T"\n",
1278 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1279 exp, exp->exp_last_request_time);
1280 /* release one export reference anyway */
1281 rc = obd_disconnect(exp);
1283 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1284 obd_export_nid2str(exp), exp, rc);
1285 class_export_put(exp);
/* Disconnect all exports of @obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private work
 * list while obd_dev_lock is held, which leaves the two obd lists empty.  The
 * actual disconnects then run on the private list after the lock has been
 * dropped, using the flags returned by exp_flags_from_obd(obd).  When nothing
 * was spliced only a D_HA debug message is emitted. */
1290 void class_disconnect_exports(struct obd_device *obd)
1292 struct list_head work_list;
1295 /* Move all of the exports from obd_exports to a work list, en masse. */
1296 INIT_LIST_HEAD(&work_list);
1297 spin_lock(&obd->obd_dev_lock);
1298 list_splice_init(&obd->obd_exports, &work_list);
1299 list_splice_init(&obd->obd_delayed_exports, &work_list);
1300 spin_unlock(&obd->obd_dev_lock);
1302 if (!list_empty(&work_list)) {
1303 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1304 "disconnecting them\n", obd->obd_minor, obd);
1305 class_disconnect_export_list(&work_list,
1306 exp_flags_from_obd(obd));
1308 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1309 obd->obd_minor, obd);
1312 EXPORT_SYMBOL(class_disconnect_exports);
/* Select exports of @obd for eviction with the caller-supplied predicate
 * @test_export and disconnect them.
 *
 * obd->obd_exports is walked under obd_dev_lock.  Two guards single out the
 * self-export (client UUID equal to the obd UUID) and exports whose
 * ted_lr_idx is -1; the statements those guards execute are not visible in
 * this extract (presumably they skip the entry).  For the remaining exports
 * the test "exp_failed || test_export(exp)" is evaluated under exp_lock; on
 * the path where it is false the export gets exp_failed = 1, is moved to a
 * private work list and is logged.  After obd_dev_lock is released the work
 * list is passed to class_disconnect_export_list() with
 * exp_flags_from_obd(obd) | OBD_OPT_ABORT_RECOV.
 *
 * @test_export is invoked with both obd_dev_lock and exp_lock (spinlocks)
 * held.
 *
 * NOTE(review): the declaration and maintenance of 'evicted', reported by the
 * console warning, are not visible in this extract. */
1314 /* Remove exports that have not completed recovery.
1316 void class_disconnect_stale_exports(struct obd_device *obd,
1317 int (*test_export)(struct obd_export *))
1319 struct list_head work_list;
1320 struct obd_export *exp, *n;
1324 INIT_LIST_HEAD(&work_list);
1325 spin_lock(&obd->obd_dev_lock);
1326 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1328 /* don't count self-export as client */
1329 if (obd_uuid_equals(&exp->exp_client_uuid,
1330 &exp->exp_obd->obd_uuid))
1333 /* don't evict clients which have no slot in last_rcvd
1334 * (e.g. lightweight connection) */
1335 if (exp->exp_target_data.ted_lr_idx == -1)
1338 spin_lock(&exp->exp_lock);
1339 if (exp->exp_failed || test_export(exp)) {
1340 spin_unlock(&exp->exp_lock);
1343 exp->exp_failed = 1;
1344 spin_unlock(&exp->exp_lock);
1346 list_move(&exp->exp_obd_chain, &work_list);
1348 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1349 obd->obd_name, exp->exp_client_uuid.uuid,
1350 exp->exp_connection == NULL ? "<unknown>" :
1351 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1352 print_export_data(exp, "EVICTING", 0);
1354 spin_unlock(&obd->obd_dev_lock);
1357 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1358 obd->obd_name, evicted);
1360 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1361 OBD_OPT_ABORT_RECOV);
1364 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark @exp as failed and disconnect it.
 *
 * exp_failed is read and set in one critical section under exp_lock, so only
 * the first caller sees already_failed == 0; later callers log the "skipping"
 * message.  The first caller optionally dumps the debug log (when
 * obd_dump_on_timeout is set), takes two references, calls obd_disconnect()
 * and logs the outcome before dropping the remaining reference.
 *
 * NOTE(review): the statement ending the already_failed branch and the
 * condition guarding the CERROR are not visible in this extract. */
1366 void class_fail_export(struct obd_export *exp)
1368 int rc, already_failed;
1370 spin_lock(&exp->exp_lock);
1371 already_failed = exp->exp_failed;
1372 exp->exp_failed = 1;
1373 spin_unlock(&exp->exp_lock);
1375 if (already_failed) {
1376 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1377 exp, exp->exp_client_uuid.uuid);
1381 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1382 exp, exp->exp_client_uuid.uuid);
1384 if (obd_dump_on_timeout)
1385 libcfs_debug_dumplog();
1387 /* need for safe call CDEBUG after obd_disconnect */
1388 class_export_get(exp);
1390 /* Most callers into obd_disconnect are removing their own reference
1391 * (request, for example) in addition to the one from the hash table.
1392 * We don't have such a reference here, so make one. */
1393 class_export_get(exp);
1394 rc = obd_disconnect(exp);
1396 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1398 CDEBUG(D_HA, "disconnected export %p/%s\n",
1399 exp, exp->exp_client_uuid.uuid);
1400 class_export_put(exp);
1402 EXPORT_SYMBOL(class_fail_export);
/* Return the peer NID of @exp's connection as a string, produced by
 * libcfs_nid2str().
 * NOTE(review): the value returned when exp_connection is NULL is not visible
 * in this extract. */
1404 char *obd_export_nid2str(struct obd_export *exp)
1406 if (exp->exp_connection != NULL)
1407 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1411 EXPORT_SYMBOL(obd_export_nid2str);
/* Evict the export(s) of @obd whose peer NID matches the string @nid.
 *
 * Returns exports_evicted; this is 0 when @obd is already stopping.
 *
 * A reference on obd->obd_nid_hash is taken while obd_dev_lock is held and is
 * released with cfs_hash_putref() at the end, so the hash is used without the
 * lock.  An export found by cfs_hash_lookup() is asserted to carry the
 * requested NID and not to be the obd's self-export, announced on the
 * console, failed through class_fail_export() and then released with
 * class_export_put() (presumably dropping the reference obtained by the
 * lookup -- TODO confirm).
 *
 * NOTE(review): the looping construct around the lookup and the update of
 * exports_evicted are not visible in this extract.
 * NOTE(review): the (char *) cast discards the const qualifier of @nid when
 * calling libcfs_str2nid(). */
1413 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1415 struct cfs_hash *nid_hash;
1416 struct obd_export *doomed_exp = NULL;
1417 int exports_evicted = 0;
1419 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1421 spin_lock(&obd->obd_dev_lock);
1422 /* umount has run already, so evict thread should leave
1423 * its task to umount thread now */
1424 if (obd->obd_stopping) {
1425 spin_unlock(&obd->obd_dev_lock);
1426 return exports_evicted;
1428 nid_hash = obd->obd_nid_hash;
1429 cfs_hash_getref(nid_hash);
1430 spin_unlock(&obd->obd_dev_lock);
1433 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1434 if (doomed_exp == NULL)
1437 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1438 "nid %s found, wanted nid %s, requested nid %s\n",
1439 obd_export_nid2str(doomed_exp),
1440 libcfs_nid2str(nid_key), nid);
1441 LASSERTF(doomed_exp != obd->obd_self_export,
1442 "self-export is hashed by NID?\n");
1444 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1445 "request\n", obd->obd_name,
1446 obd_uuid2str(&doomed_exp->exp_client_uuid),
1447 obd_export_nid2str(doomed_exp));
1448 class_fail_export(doomed_exp);
1449 class_export_put(doomed_exp);
1452 cfs_hash_putref(nid_hash);
1454 if (!exports_evicted)
1455 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1456 obd->obd_name, nid);
1457 return exports_evicted;
1459 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of @obd whose client UUID equals the string @uuid.
 *
 * Returns exports_evicted; this is 0 when @obd is stopping or when @uuid
 * names the obd itself ("can't evict myself").
 *
 * A reference on obd->obd_uuid_hash is taken under obd_dev_lock and released
 * on every later exit path that is visible here.  A matching export is failed
 * through class_fail_export() and released with class_export_put(); when the
 * lookup finds nothing an error is logged instead.
 *
 * NOTE(review): the update of exports_evicted is not visible in this extract.
 * NOTE(review): the CWARN text spells "adminstrative"; it is a runtime string
 * and is left untouched here. */
1461 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1463 struct cfs_hash *uuid_hash;
1464 struct obd_export *doomed_exp = NULL;
1465 struct obd_uuid doomed_uuid;
1466 int exports_evicted = 0;
1468 spin_lock(&obd->obd_dev_lock);
1469 if (obd->obd_stopping) {
1470 spin_unlock(&obd->obd_dev_lock);
1471 return exports_evicted;
1473 uuid_hash = obd->obd_uuid_hash;
1474 cfs_hash_getref(uuid_hash);
1475 spin_unlock(&obd->obd_dev_lock);
1477 obd_str2uuid(&doomed_uuid, uuid);
1478 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1479 CERROR("%s: can't evict myself\n", obd->obd_name);
1480 cfs_hash_putref(uuid_hash);
1481 return exports_evicted;
1484 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1486 if (doomed_exp == NULL) {
1487 CERROR("%s: can't disconnect %s: no exports found\n",
1488 obd->obd_name, uuid);
1490 CWARN("%s: evicting %s at adminstrative request\n",
1491 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1492 class_fail_export(doomed_exp);
1493 class_export_put(doomed_exp);
1496 cfs_hash_putref(uuid_hash);
1498 return exports_evicted;
/* Optional hook, present only when LUSTRE_TRACKS_LOCK_EXP_REFS is set.  It is
 * NULL by default; print_export_data() calls it for an export when its
 * 'locks' argument is non-zero and the hook is non-NULL. */
1501 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1502 void (*class_export_dump_hook)(struct obd_export*) = NULL;
/* Emit one D_HA debug line describing @exp, labelled with @status.
 *
 * The line carries the obd name, the export pointer, client UUID and NID
 * string, the refcount / rpc / callback / lock counters, the disconnected,
 * delayed and failed flags, the outstanding-reply summary (nreplies,
 * first_reply) and exp_last_committed.  The outstanding-replies list is
 * walked under exp_lock.
 *
 * NOTE(review): the third parameter (used below as 'locks'), the declaration
 * of 'nreplies' and the body of the reply loop are not visible in this
 * extract. */
1505 static void print_export_data(struct obd_export *exp, const char *status,
1508 struct ptlrpc_reply_state *rs;
1509 struct ptlrpc_reply_state *first_reply = NULL;
1512 spin_lock(&exp->exp_lock);
1513 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1519 spin_unlock(&exp->exp_lock);
1521 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1522 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1523 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1524 atomic_read(&exp->exp_rpc_count),
1525 atomic_read(&exp->exp_cb_count),
1526 atomic_read(&exp->exp_locks_count),
1527 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1528 nreplies, first_reply, nreplies > 3 ? "..." : "",
1529 exp->exp_last_committed);
1530 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1531 if (locks && class_export_dump_hook != NULL)
1532 class_export_dump_hook(exp);
/* Log every export related to @obd through print_export_data(), tagging each
 * line with the list it was found on: "ACTIVE" (obd_exports), "UNLINKED"
 * (obd_unlinked_exports), "DELAYED" (obd_delayed_exports) and "ZOMBIE" (the
 * global obd_zombie_exports list, which is not filtered by @obd here).
 *
 * The three per-obd lists are walked under obd_dev_lock and the zombie list
 * under obd_zombie_impexp_lock; print_export_data() additionally takes each
 * export's exp_lock inside those locks.  @locks is forwarded unchanged. */
1536 void dump_exports(struct obd_device *obd, int locks)
1538 struct obd_export *exp;
1540 spin_lock(&obd->obd_dev_lock);
1541 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1542 print_export_data(exp, "ACTIVE", locks);
1543 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1544 print_export_data(exp, "UNLINKED", locks);
1545 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1546 print_export_data(exp, "DELAYED", locks);
1547 spin_unlock(&obd->obd_dev_lock);
1548 spin_lock(&obd_zombie_impexp_lock);
1549 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1550 print_export_data(exp, "ZOMBIE", locks);
1551 spin_unlock(&obd_zombie_impexp_lock);
/* Block until obd->obd_unlinked_exports is empty.
 *
 * The caller must already have emptied obd->obd_exports (asserted).  The
 * list is re-checked under obd_dev_lock; between checks the lock is dropped
 * and the task sleeps uninterruptibly for cfs_time_seconds(waited).  Once
 * 'waited' exceeds 5 and is a power of two, a console warning with the obd
 * refcount is printed and all exports are dumped.
 *
 * NOTE(review): the declaration, initial value and update of 'waited' are not
 * visible in this extract. */
1554 void obd_exports_barrier(struct obd_device *obd)
1557 LASSERT(list_empty(&obd->obd_exports));
1558 spin_lock(&obd->obd_dev_lock);
1559 while (!list_empty(&obd->obd_unlinked_exports)) {
1560 spin_unlock(&obd->obd_dev_lock);
1561 set_current_state(TASK_UNINTERRUPTIBLE);
1562 schedule_timeout(cfs_time_seconds(waited));
1563 if (waited > 5 && IS_PO2(waited)) {
1564 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1565 "more than %d seconds. "
1566 "The obd refcount = %d. Is it stuck?\n",
1567 obd->obd_name, waited,
1568 atomic_read(&obd->obd_refcount));
1569 dump_exports(obd, 1);
1572 spin_lock(&obd->obd_dev_lock);
1574 spin_unlock(&obd->obd_dev_lock);
1576 EXPORT_SYMBOL(obd_exports_barrier);
1578 /* Total amount of zombies to be destroyed */
1579 static int zombies_count = 0;
1582 * kill zombie imports and exports
/* Drain the zombie lists.
 *
 * Each pass detaches, under obd_zombie_impexp_lock, the first entry of
 * obd_zombie_imports and the first entry of obd_zombie_exports (when
 * present).  The detached objects are destroyed after the lock has been
 * released, via class_import_destroy() and class_export_destroy().  The loop
 * repeats until a pass finds neither an import nor an export.
 *
 * NOTE(review): after each destroy the lock is retaken around a statement
 * that is not visible in this extract (presumably the zombies_count
 * bookkeeping -- TODO confirm); the per-pass reset of 'import' and 'export'
 * is not visible either. */
1584 void obd_zombie_impexp_cull(void)
1586 struct obd_import *import;
1587 struct obd_export *export;
1591 spin_lock(&obd_zombie_impexp_lock);
1594 if (!list_empty(&obd_zombie_imports)) {
1595 import = list_entry(obd_zombie_imports.next,
1598 list_del_init(&import->imp_zombie_chain);
1602 if (!list_empty(&obd_zombie_exports)) {
1603 export = list_entry(obd_zombie_exports.next,
1606 list_del_init(&export->exp_obd_chain);
1609 spin_unlock(&obd_zombie_impexp_lock);
1611 if (import != NULL) {
1612 class_import_destroy(import);
1613 spin_lock(&obd_zombie_impexp_lock);
1615 spin_unlock(&obd_zombie_impexp_lock);
1618 if (export != NULL) {
1619 class_export_destroy(export);
1620 spin_lock(&obd_zombie_impexp_lock);
1622 spin_unlock(&obd_zombie_impexp_lock);
1626 } while (import != NULL || export != NULL);
/* State shared between the zombie thread and its controllers:
 *  - obd_zombie_start: completed by the thread once it runs; awaited by
 *    obd_zombie_impexp_init().
 *  - obd_zombie_stop:  completed by the thread on exit; awaited by
 *    obd_zombie_impexp_stop().
 *  - obd_zombie_flags: bit field holding the stop request.
 *  - obd_zombie_waitq: wait queue used both by the thread (waiting for work)
 *    and by obd_zombie_barrier() (waiting for idle).
 *  - obd_zombie_pid:   pid of the thread, compared in obd_zombie_barrier(). */
1630 static struct completion obd_zombie_start;
1631 static struct completion obd_zombie_stop;
1632 static unsigned long obd_zombie_flags;
1633 static wait_queue_head_t obd_zombie_waitq;
1634 static pid_t obd_zombie_pid;
/* NOTE(review): OBD_ZOMBIE_STOP is passed to set_bit()/test_bit() in this
 * file, i.e. it is used as a bit number rather than as a mask. */
1637 OBD_ZOMBIE_STOP = 0x0001,
1641 * check for work for kill zombie import/export thread.
/* Computes, under obd_zombie_impexp_lock, rc = "zombies_count is 0 and the
 * OBD_ZOMBIE_STOP bit is clear", i.e. rc is non-zero when the thread has
 * nothing to do.  The thread sleeps until the negation of this value holds.
 * @arg is not used by the visible code.
 * NOTE(review): the declaration and return of 'rc' are not visible in this
 * extract. */
1643 static int obd_zombie_impexp_check(void *arg)
1647 spin_lock(&obd_zombie_impexp_lock);
1648 rc = (zombies_count == 0) &&
1649 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1650 spin_unlock(&obd_zombie_impexp_lock);
1656 * Add export to the obd_zombe thread and notify it.
/* @exp must currently be linked through exp_obd_chain (asserted).  It is
 * unlinked under its obd's obd_dev_lock, then inserted at the head of
 * obd_zombie_exports under obd_zombie_impexp_lock, and finally the waiters on
 * obd_zombie_waitq are woken.
 * NOTE(review): one statement inside the second critical section is not
 * visible in this extract (presumably the zombies_count increment -- TODO
 * confirm). */
1658 static void obd_zombie_export_add(struct obd_export *exp) {
1659 spin_lock(&exp->exp_obd->obd_dev_lock);
1660 LASSERT(!list_empty(&exp->exp_obd_chain));
1661 list_del_init(&exp->exp_obd_chain);
1662 spin_unlock(&exp->exp_obd->obd_dev_lock);
1663 spin_lock(&obd_zombie_impexp_lock);
1665 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1666 spin_unlock(&obd_zombie_impexp_lock);
1668 obd_zombie_impexp_notify();
1672 * Add import to the obd_zombe thread and notify it.
/* Preconditions (asserted): imp_sec and imp_rq_pool are NULL, and @imp is not
 * already on a zombie chain.  The import is inserted at the head of
 * obd_zombie_imports under obd_zombie_impexp_lock and the waiters on
 * obd_zombie_waitq are then woken.
 * NOTE(review): one statement inside the critical section is not visible in
 * this extract (presumably the zombies_count increment -- TODO confirm). */
1674 static void obd_zombie_import_add(struct obd_import *imp) {
1675 LASSERT(imp->imp_sec == NULL);
1676 LASSERT(imp->imp_rq_pool == NULL);
1677 spin_lock(&obd_zombie_impexp_lock);
1678 LASSERT(list_empty(&imp->imp_zombie_chain));
1680 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1681 spin_unlock(&obd_zombie_impexp_lock);
1683 obd_zombie_impexp_notify();
1687 * notify import/export destroy thread about new zombie.
/* Wakes every task sleeping on obd_zombie_waitq (wake_up_all), because the
 * zombie thread and obd_zombie_barrier() callers share that queue. */
1689 static void obd_zombie_impexp_notify(void)
1692 * Make sure obd_zomebie_impexp_thread get this notification.
1693 * It is possible this signal only get by obd_zombie_barrier, and
1694 * barrier gulps this notification and sleeps away and hangs ensues
1696 wake_up_all(&obd_zombie_waitq);
1700 * check whether obd_zombie is idle
/* Computes rc = (zombies_count == 0) under obd_zombie_impexp_lock.  Asserts
 * that no stop has been requested (OBD_ZOMBIE_STOP bit clear).
 * NOTE(review): the declaration and return of 'rc' are not visible in this
 * extract. */
1702 static int obd_zombie_is_idle(void)
1706 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1707 spin_lock(&obd_zombie_impexp_lock);
1708 rc = (zombies_count == 0);
1709 spin_unlock(&obd_zombie_impexp_lock);
1714 * wait when obd_zombie import/export queues become empty
/* Sleeps on obd_zombie_waitq until obd_zombie_is_idle() holds.  A call made
 * from the zombie thread itself is detected by comparing pids.
 * NOTE(review): the statement executed for that self-call case is not visible
 * in this extract (presumably an early return -- TODO confirm). */
1716 void obd_zombie_barrier(void)
1718 struct l_wait_info lwi = { 0 };
1720 if (obd_zombie_pid == current_pid())
1721 /* don't wait for myself */
1723 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1725 EXPORT_SYMBOL(obd_zombie_barrier);
1729 * destroy zombie export/import thread.
/* Thread body: signals obd_zombie_start, records its pid, then loops until
 * the OBD_ZOMBIE_STOP bit is set.  Each iteration sleeps until
 * obd_zombie_impexp_check() returns 0, culls the zombie lists and wakes the
 * wait queue.  On exit it completes obd_zombie_stop.
 *
 * NOTE(review): obd_zombie_pid is assigned after complete(&obd_zombie_start),
 * so a task released by that completion can run before the pid is recorded.
 * NOTE(review): the return statement is not visible in this extract. */
1731 static int obd_zombie_impexp_thread(void *unused)
1733 unshare_fs_struct();
1734 complete(&obd_zombie_start);
1736 obd_zombie_pid = current_pid();
1738 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1739 struct l_wait_info lwi = { 0 };
1741 l_wait_event(obd_zombie_waitq,
1742 !obd_zombie_impexp_check(NULL), &lwi);
1743 obd_zombie_impexp_cull();
1746 * Notify obd_zombie_barrier callers that queues
1749 wake_up(&obd_zombie_waitq);
1752 complete(&obd_zombie_stop);
1759 * start destroy zombie import/export thread
/* Initializes the zombie lists, their lock, both completions and the wait
 * queue, then starts the thread "obd_zombid" with kthread_run().  An error
 * path returns PTR_ERR(task); otherwise the function waits until the thread
 * has signalled obd_zombie_start.
 * NOTE(review): the condition guarding the error return and the success
 * return value are not visible in this extract. */
1761 int obd_zombie_impexp_init(void)
1763 struct task_struct *task;
1765 INIT_LIST_HEAD(&obd_zombie_imports);
1767 INIT_LIST_HEAD(&obd_zombie_exports);
1768 spin_lock_init(&obd_zombie_impexp_lock);
1769 init_completion(&obd_zombie_start);
1770 init_completion(&obd_zombie_stop);
1771 init_waitqueue_head(&obd_zombie_waitq);
1774 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1776 RETURN(PTR_ERR(task));
1778 wait_for_completion(&obd_zombie_start);
1782 * stop destroy zombie import/export thread
/* Sets the OBD_ZOMBIE_STOP bit, wakes the wait queue so the thread notices
 * the request, and blocks until the thread has completed obd_zombie_stop. */
1784 void obd_zombie_impexp_stop(void)
1786 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1787 obd_zombie_impexp_notify();
1788 wait_for_completion(&obd_zombie_stop);
1791 /***** Kernel-userspace comm helpers *******/
1793 /* Get length of entire message, including header */
/* @payload_len: length of the payload alone.
 * Returns sizeof(struct kuc_hdr) + @payload_len, converted to int. */
1794 int kuc_len(int payload_len)
1796 return sizeof(struct kuc_hdr) + payload_len;
1798 EXPORT_SYMBOL(kuc_len);
/* The header is located by stepping back exactly one struct kuc_hdr from @p;
 * its kuc_magic field is asserted to equal KUC_MAGIC.
 * NOTE(review): the return statement is not visible in this extract. */
1800 /* Get a pointer to kuc header, given a ptr to the payload
1801 * @param p Pointer to payload area
1802 * @returns Pointer to kuc header
1804 struct kuc_hdr * kuc_ptr(void *p)
1806 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1807 LASSERT(lh->kuc_magic == KUC_MAGIC);
1810 EXPORT_SYMBOL(kuc_ptr);
/* Reads the struct kuc_hdr-sized region immediately before @p and compares
 * its kuc_magic field with KUC_MAGIC, so that region must be readable for any
 * @p passed in.
 * NOTE(review): the values returned for the match and no-match cases are not
 * visible in this extract. */
1812 /* Test if payload is part of kuc message
1813 * @param p Pointer to payload area
1816 int kuc_ispayload(void *p)
1818 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1820 if (kh->kuc_magic == KUC_MAGIC)
1825 EXPORT_SYMBOL(kuc_ispayload);
/* @payload_len: payload size; the total size is kuc_len(@payload_len).
 * @transport, @type: stored in the header as kuc_transport and kuc_msgtype.
 *
 * The header also records KUC_MAGIC and the total length (kuc_msglen, header
 * included).  The returned pointer is the address just past the header
 * (lh + 1).  An error path returns ERR_PTR(-ENOMEM), not NULL.
 * NOTE(review): the allocation call and the check that selects the error path
 * are not visible in this extract. */
1827 /* Alloc space for a message, and fill in header
1828 * @return Pointer to payload area
1830 void *kuc_alloc(int payload_len, int transport, int type)
1833 int len = kuc_len(payload_len);
1837 return ERR_PTR(-ENOMEM);
1839 lh->kuc_magic = KUC_MAGIC;
1840 lh->kuc_transport = transport;
1841 lh->kuc_msgtype = type;
1842 lh->kuc_msglen = len;
1844 return (void *)(lh + 1);
1846 EXPORT_SYMBOL(kuc_alloc);
/* Takes pointer to payload area.
 *
 * Frees a message given the address @p of its payload.  kuc_ptr() recovers
 * the header that precedes the payload (asserting its magic), and the size
 * handed to OBD_FREE is kuc_len(@payload_len), i.e. header plus payload.
 *
 * Deliberately not declared 'inline': the function is exported, so an
 * out-of-line definition with external linkage has to be emitted whichever
 * inline semantics (gnu89 or C99/C11) the compiler applies. */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
/* On-stack record of one task waiting for a request slot.
 *  - orsw_entry: links the waiter on cli->cl_loi_read_list while it waits.
 *  - orsw_waitq: private wait queue the waiter sleeps on.
 * NOTE(review): a further member, orsw_signaled, is used by
 * obd_get_request_slot() but its declaration is not visible in this
 * extract. */
1856 struct obd_request_slot_waiter {
1857 struct list_head orsw_entry;
1858 wait_queue_head_t orsw_waitq;
/* Wait condition for obd_get_request_slot(): evaluates, under
 * cl_loi_list_lock, whether @orsw is no longer linked on a list.  The code
 * that grants a slot unlinks the waiter with list_del_init() before waking
 * it, so an empty orsw_entry means a slot has been handed over.
 * NOTE(review): the declaration and return of 'avail' are not visible in this
 * extract. */
1862 static bool obd_request_slot_avail(struct client_obd *cli,
1863 struct obd_request_slot_waiter *orsw)
1867 spin_lock(&cli->cl_loi_list_lock);
1868 avail = !!list_empty(&orsw->orsw_entry);
1869 spin_unlock(&cli->cl_loi_list_lock);
1875 * For network flow control, the RPC sponsor needs to acquire a credit
1876 * before sending the RPC. The credits count for a connection is defined
1877 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1878 * the subsequent RPC sponsors need to wait until others released their
1879 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
/* Acquire one request slot on @cli.
 *
 * Fast path: while cl_r_in_flight is below cl_max_rpcs_in_flight the counter
 * is incremented under cl_loi_list_lock.  Otherwise an on-stack waiter is
 * queued at the tail of cl_loi_read_list and the task sleeps (LWI_INTR wait
 * info) until obd_request_slot_avail() holds or a second condition that is
 * truncated in this extract.
 *
 * After the wait the lock is retaken for the clean-up below.  When the waiter
 * was not flagged as signaled, an already-unlinked entry means a slot had
 * been counted for this waiter, so cl_r_in_flight is decremented again;
 * otherwise the entry is removed from the list.
 *
 * NOTE(review): the declaration of 'rc', the return statements and the
 * conditions enclosing the clean-up are not visible in this extract. */
1881 int obd_get_request_slot(struct client_obd *cli)
1883 struct obd_request_slot_waiter orsw;
1884 struct l_wait_info lwi;
1887 spin_lock(&cli->cl_loi_list_lock);
1888 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1889 cli->cl_r_in_flight++;
1890 spin_unlock(&cli->cl_loi_list_lock);
1894 init_waitqueue_head(&orsw.orsw_waitq);
1895 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1896 orsw.orsw_signaled = false;
1897 spin_unlock(&cli->cl_loi_list_lock);
1899 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1900 rc = l_wait_event(orsw.orsw_waitq,
1901 obd_request_slot_avail(cli, &orsw) ||
1905 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1906 * freed but other (such as obd_put_request_slot) is using it. */
1907 spin_lock(&cli->cl_loi_list_lock);
1909 if (!orsw.orsw_signaled) {
1910 if (list_empty(&orsw.orsw_entry))
1911 cli->cl_r_in_flight--;
1913 list_del(&orsw.orsw_entry);
1917 if (orsw.orsw_signaled) {
1918 LASSERT(list_empty(&orsw.orsw_entry));
1922 spin_unlock(&cli->cl_loi_list_lock);
1926 EXPORT_SYMBOL(obd_get_request_slot);
/* Release one request slot on @cli.
 *
 * cl_r_in_flight is decremented under cl_loi_list_lock.  If a waiter is
 * queued and the count is now below cl_max_rpcs_in_flight, the slot is handed
 * straight to the first waiter: it is unlinked with list_del_init() (which
 * makes obd_request_slot_avail() true for it), the slot is counted in flight
 * on its behalf, and it is woken, all before the lock is dropped. */
1928 void obd_put_request_slot(struct client_obd *cli)
1930 struct obd_request_slot_waiter *orsw;
1932 spin_lock(&cli->cl_loi_list_lock);
1933 cli->cl_r_in_flight--;
1935 /* If there is free slot, wakeup the first waiter. */
1936 if (!list_empty(&cli->cl_loi_read_list) &&
1937 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1938 orsw = list_entry(cli->cl_loi_read_list.next,
1939 struct obd_request_slot_waiter, orsw_entry);
1940 list_del_init(&orsw->orsw_entry);
1941 cli->cl_r_in_flight++;
1942 wake_up(&orsw->orsw_waitq);
1944 spin_unlock(&cli->cl_loi_list_lock);
1946 EXPORT_SYMBOL(obd_put_request_slot);
/* Return the current cl_max_rpcs_in_flight limit of @cli.  The field is read
 * without taking cl_loi_list_lock. */
1948 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1950 return cli->cl_max_rpcs_in_flight;
1952 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/* Change the cl_max_rpcs_in_flight limit of @cli to @max.
 *
 * @max is checked against the range 1..OBD_MAX_RIF_MAX.  For an obd whose
 * type name is LUSTRE_MDC_NAME the modify-RPC limit is kept strictly below
 * the new value: when @max is not above cl_max_mod_rpcs_in_flight, that limit
 * is first lowered to @max - 1 through obd_set_max_mod_rpcs_in_flight().
 *
 * The new limit is stored under cl_loi_list_lock.  For each step of 'diff',
 * while waiters are queued on cl_loi_read_list, the first waiter is unlinked,
 * a slot is counted in flight on its behalf and it is woken.
 *
 * NOTE(review): the declarations of old/diff/i/typ_name/rc, the computation
 * of 'diff', the return statements and the condition guarding the CERROR are
 * not visible in this extract.
 * NOTE(review): the CERROR format string below does not end in a newline,
 * unlike the other messages in this file. */
1954 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1956 struct obd_request_slot_waiter *orsw;
1963 if (max > OBD_MAX_RIF_MAX || max < 1)
1966 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
1967 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
1968 /* adjust max_mod_rpcs_in_flight to ensure it is always
1969 * strictly lower that max_rpcs_in_flight */
1971 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
1972 "because it must be higher than "
1973 "max_mod_rpcs_in_flight value",
1974 cli->cl_import->imp_obd->obd_name);
1977 if (max <= cli->cl_max_mod_rpcs_in_flight) {
1978 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
1984 spin_lock(&cli->cl_loi_list_lock);
1985 old = cli->cl_max_rpcs_in_flight;
1986 cli->cl_max_rpcs_in_flight = max;
1989 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1990 for (i = 0; i < diff; i++) {
1991 if (list_empty(&cli->cl_loi_read_list))
1994 orsw = list_entry(cli->cl_loi_read_list.next,
1995 struct obd_request_slot_waiter, orsw_entry);
1996 list_del_init(&orsw->orsw_entry);
1997 cli->cl_r_in_flight++;
1998 wake_up(&orsw->orsw_waitq);
2000 spin_unlock(&cli->cl_loi_list_lock);
2004 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return the current cl_max_mod_rpcs_in_flight limit of @cli.  The field is
 * read without taking cl_mod_rpcs_lock. */
2006 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2008 return cli->cl_max_mod_rpcs_in_flight;
2010 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/* Change the cl_max_mod_rpcs_in_flight limit of @cli to @max.
 *
 * Checks, in order: @max within 1..OBD_MAX_RIF_MAX; @max strictly below
 * cl_max_rpcs_in_flight; @max not above 'maxmodrpcs', which is taken from
 * ocd_maxmodrpcs in the import's connect data when OBD_CONNECT_MULTIMODRPCS
 * is set.  The new limit is stored under cl_mod_rpcs_lock, and
 * cl_mod_rpcs_waitq is woken when the limit grew.
 *
 * NOTE(review): the declarations of maxmodrpcs/prev, the value used for
 * 'maxmodrpcs' when the connect flag is absent, and all return statements are
 * not visible in this extract. */
2012 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2014 struct obd_connect_data *ocd;
2018 if (max > OBD_MAX_RIF_MAX || max < 1)
2021 /* cannot exceed or equal max_rpcs_in_flight */
2022 if (max >= cli->cl_max_rpcs_in_flight) {
2023 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2024 "higher or equal to max_rpcs_in_flight value (%u)\n",
2025 cli->cl_import->imp_obd->obd_name,
2026 max, cli->cl_max_rpcs_in_flight);
2030 /* cannot exceed max modify RPCs in flight supported by the server */
2031 ocd = &cli->cl_import->imp_connect_data;
2032 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2033 maxmodrpcs = ocd->ocd_maxmodrpcs;
2036 if (max > maxmodrpcs) {
2037 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2038 "higher than max_mod_rpcs_per_client value (%hu) "
2039 "returned by the server at connection\n",
2040 cli->cl_import->imp_obd->obd_name,
2045 spin_lock(&cli->cl_mod_rpcs_lock);
2047 prev = cli->cl_max_mod_rpcs_in_flight;
2048 cli->cl_max_mod_rpcs_in_flight = max;
2050 /* wakeup waiters if limit has been increased */
2051 if (cli->cl_max_mod_rpcs_in_flight > prev)
2052 wake_up(&cli->cl_mod_rpcs_waitq);
2054 spin_unlock(&cli->cl_mod_rpcs_lock);
2058 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Integer percentage of a relative to b, truncated; evaluates to 0 when b is
 * zero.  Both arguments are parenthesized so that expression arguments expand
 * with the intended precedence.  Note that b is evaluated twice. */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
/* Print the modify-RPC statistics of @cli into @seq.
 *
 * Output, produced while cl_mod_rpcs_lock is held: a snapshot timestamp, the
 * current cl_mod_rpcs_in_flight value, a table header, and one row per bucket
 * of cl_mod_rpcs_hist giving the bucket index, its count, its percentage of
 * the histogram total and the cumulative percentage.  The row loop has an
 * exit test for the point where the cumulative count equals the total.
 *
 * NOTE(review): the declarations of 'now' and 'i', the update of 'mod_cum'
 * and the return value are not visible in this extract.
 * NOTE(review): tv_usec is printed with "%lu" and no zero padding, so for
 * example 5 usec is shown as ".5". */
2062 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2063 struct seq_file *seq)
2066 unsigned long mod_tot = 0, mod_cum;
2069 do_gettimeofday(&now);
2071 spin_lock(&cli->cl_mod_rpcs_lock);
2073 seq_printf(seq, "snapshot_time: %lu.%lu (secs.usecs)\n",
2074 now.tv_sec, now.tv_usec);
2075 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2076 cli->cl_mod_rpcs_in_flight);
2078 seq_printf(seq, "\n\t\t\tmodify\n");
2079 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2081 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2084 for (i = 0; i < OBD_HIST_MAX; i++) {
2085 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2087 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2088 i, mod, pct(mod, mod_tot),
2089 pct(mod_cum, mod_tot));
2090 if (mod_cum == mod_tot)
2094 spin_unlock(&cli->cl_mod_rpcs_lock);
2098 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
/* Slot-availability test: 'avail' is true when cl_mod_rpcs_in_flight is below
 * cl_max_mod_rpcs_in_flight, or when close_req is set and no close RPC is
 * currently counted in cl_close_rpcs_in_flight.  Takes no lock itself; the
 * callers in this file hold cl_mod_rpcs_lock around it.
 * NOTE(review): the second parameter (used below as 'close_req') and the
 * declaration and return of 'avail' are not visible in this extract. */
2102 /* The number of modify RPCs sent in parallel is limited
2103 * because the server has a finite number of slots per client to
2104 * store request result and ensure reply reconstruction when needed.
2105 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2106 * that takes into account server limit and cl_max_rpcs_in_flight
2108 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2109 * one close request is allowed above the maximum.
2111 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2116 /* A slot is available if
2117 * - number of modify RPCs in flight is less than the max
2118 * - it's a close RPC and no other close request is in flight
2120 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2121 (close_req && cli->cl_close_rpcs_in_flight == 0);
/* Locking wrapper: evaluates obd_mod_rpc_slot_avail_locked() with
 * cl_mod_rpcs_lock held.  Used as the wait condition in
 * obd_get_mod_rpc_slot().
 * NOTE(review): the second parameter and the declaration and return of
 * 'avail' are not visible in this extract. */
2126 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2131 spin_lock(&cli->cl_mod_rpcs_lock);
2132 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2133 spin_unlock(&cli->cl_mod_rpcs_lock);
/* Implementation notes for obd_get_mod_rpc_slot():
 *  - Intents IT_GETATTR, IT_LOOKUP, IT_LAYOUT and IT_READDIR are singled out
 *    before any slot accounting; MDS_CLOSE is singled out as the close case.
 *  - With cl_mod_rpcs_lock held, an available slot is taken by incrementing
 *    cl_mod_rpcs_in_flight, tallying the new value in cl_mod_rpcs_hist and
 *    claiming the first zero bit of cl_mod_tag_bitmap.
 *  - When no slot is available the lock is dropped, a D_RPCTRACE message is
 *    logged and the task waits on cl_mod_rpcs_waitq until
 *    obd_mod_rpc_slot_avail() holds.
 * NOTE(review): the statements taken for the read-only and close cases, the
 * retry construct around the wait, and the return statements are not visible
 * in this extract.
 * NOTE(review): the bitmap bit is set by test_and_set_bit() inside LASSERT(),
 * so the update depends on LASSERT() always evaluating its argument. */
2137 /* Get a modify RPC slot from the obd client @cli according
2138 * to the kind of operation @opc that is going to be sent
2139 * and the intent @it of the operation if it applies.
2140 * If the maximum number of modify RPCs in flight is reached
2141 * the thread is put to sleep.
2142 * Returns the tag to be set in the request message. Tag 0
2143 * is reserved for non-modifying requests.
2145 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2146 struct lookup_intent *it)
2148 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2149 bool close_req = false;
2152 /* read-only metadata RPCs don't consume a slot on MDT
2153 * for reply reconstruction
2155 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2156 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2159 if (opc == MDS_CLOSE)
2163 spin_lock(&cli->cl_mod_rpcs_lock);
2164 max = cli->cl_max_mod_rpcs_in_flight;
2165 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2166 /* there is a slot available */
2167 cli->cl_mod_rpcs_in_flight++;
2169 cli->cl_close_rpcs_in_flight++;
2170 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2171 cli->cl_mod_rpcs_in_flight);
2172 /* find a free tag */
2173 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2175 LASSERT(i < OBD_MAX_RIF_MAX);
2176 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2177 spin_unlock(&cli->cl_mod_rpcs_lock);
2178 /* tag 0 is reserved for non-modify RPCs */
2181 spin_unlock(&cli->cl_mod_rpcs_lock);
2183 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2184 "opc %u, max %hu\n",
2185 cli->cl_import->imp_obd->obd_name, opc, max);
2187 l_wait_event(cli->cl_mod_rpcs_waitq,
2188 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2191 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
/* Implementation notes for obd_put_mod_rpc_slot():
 *  - The same intents (IT_GETATTR, IT_LOOKUP, IT_LAYOUT, IT_READDIR) and the
 *    MDS_CLOSE opcode are singled out as in obd_get_mod_rpc_slot().
 *  - With cl_mod_rpcs_lock held, cl_mod_rpcs_in_flight is decremented and bit
 *    (@tag - 1) of cl_mod_tag_bitmap is cleared; the bit is asserted to have
 *    been set.  cl_mod_rpcs_waitq is woken after the lock is released.
 * NOTE(review): the statements taken for the read-only and close cases and
 * the condition guarding the cl_close_rpcs_in_flight decrement are not
 * visible in this extract.
 * NOTE(review): @tag is __u16, so "tag - 1" is computed as int and is -1 for
 * tag 0; whether the range assertion rejects that depends on the type of
 * OBD_MAX_RIF_MAX -- TODO confirm.
 * NOTE(review): the bitmap bit is cleared by test_and_clear_bit() inside
 * LASSERT(), so the update depends on LASSERT() always evaluating its
 * argument. */
2193 /* Put a modify RPC slot from the obd client @cli according
2194 * to the kind of operation @opc that has been sent and the
2195 * intent @it of the operation if it applies.
2197 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2198 struct lookup_intent *it, __u16 tag)
2200 bool close_req = false;
2202 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2203 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2206 if (opc == MDS_CLOSE)
2209 spin_lock(&cli->cl_mod_rpcs_lock);
2210 cli->cl_mod_rpcs_in_flight--;
2212 cli->cl_close_rpcs_in_flight--;
2213 /* release the tag in the bitmap */
2214 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2215 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2216 spin_unlock(&cli->cl_mod_rpcs_lock);
2217 wake_up(&cli->cl_mod_rpcs_waitq);
2219 EXPORT_SYMBOL(obd_put_mod_rpc_slot);