1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50 #include <lustre_export.h>
/* Registered obd types; class_search_type() walks this list while holding
 * obd_types_lock. */
52 extern struct list_head obd_types;
53 spinlock_t obd_types_lock;
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
55 cfs_mem_cache_t *obd_device_cachep;
56 cfs_mem_cache_t *obdo_cachep;
57 EXPORT_SYMBOL(obdo_cachep);
58 cfs_mem_cache_t *import_cachep;
/* Imports and exports whose last reference was dropped are queued on these
 * lists under obd_zombie_impexp_lock, after which
 * obd_zombie_impexp_notify() is called. */
60 struct list_head obd_zombie_imports;
61 struct list_head obd_zombie_exports;
62 spinlock_t obd_zombie_impexp_lock;
63 static void obd_zombie_impexp_notify(void);
/* Function pointer this file calls to release a ptlrpc connection. */
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 * support functions: we could use inter-module communication, but this
69 * is more portable to other OS's
71 static struct obd_device *obd_device_alloc(void)
73 struct obd_device *obd;
75 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
77 obd->obd_magic = OBD_DEVICE_MAGIC;
81 EXPORT_SYMBOL(obd_device_alloc);
/* Return an obd_device to obd_device_cachep.
 * Asserts that the magic is intact and reports a namespace that is still
 * attached to the device before freeing.
 * NOTE(review): this listing skips source lines inside this function
 * (including the tail of the CERROR format string and whatever follows the
 * CERROR) -- confirm against the full file. */
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic "
87 "%08x != %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up "
91 obd, obd->obd_namespace, obd->obd_force);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 EXPORT_SYMBOL(obd_device_free);
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
/* Look up the obd type \a name and pin its owning module.
 * The visible code falls back to request_module() and a second
 * class_search_type() when a module has to be loaded, and calls
 * try_module_get() under type->obd_type_lock.
 * NOTE(review): this listing skips several source lines of this function,
 * so the guarding conditions, any preprocessor conditionals, the refcount
 * update and the return statement are not visible -- confirm against the
 * full file. */
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
121 const char *modname = name;
/* A request for the MDT type loads the MDS module instead. */
122 if (strcmp(modname, LUSTRE_MDT_NAME) == 0)
123 modname = LUSTRE_MDS_NAME;
/* request_module() returning 0 is treated as success: search again. */
124 if (!request_module("%s", modname)) {
125 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 type = class_search_type(name);
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134 spin_lock(&type->obd_type_lock);
136 try_module_get(type->typ_ops->o_owner);
137 spin_unlock(&type->obd_type_lock);
/* Release the module reference of an obd type under type->obd_type_lock.
 * NOTE(review): two source lines are missing from this listing; presumably
 * the typ_refcnt decrement that pairs with class_get_type() is one of
 * them -- confirm. */
142 void class_put_type(struct obd_type *type)
145 spin_lock(&type->obd_type_lock);
147 module_put(type->typ_ops->o_owner);
148 spin_unlock(&type->obd_type_lock);
/* Register a new obd type.
 * Visible steps: reject an already-registered name, allocate the type plus
 * private copies of the ops table and the name, register a procfs
 * directory under proc_lustre_root, then link the type onto ::obd_types
 * under obd_types_lock.  The tail of the function frees whatever was
 * allocated.
 * NOTE(review): the rest of the parameter list, the local declarations,
 * the return statements and the failure label are not visible in this
 * listing -- confirm against the full file. */
151 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *vars,
154 struct obd_type *type;
158 LASSERT(strnlen(name, 1024) < 1024); /* sanity check */
/* Refuse to register the same name twice. */
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC(type->typ_ops, sizeof(*type->typ_ops));
171 OBD_ALLOC(type->typ_name, strlen(name) + 1);
172 if (type->typ_ops == NULL || type->typ_name == NULL)
/* The type keeps its own copy of the ops table and of the name. */
175 *(type->typ_ops) = *ops;
176 strcpy(type->typ_name, name);
177 spin_lock_init(&type->obd_type_lock);
180 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
/* lprocfs_register() reports failure through an ERR_PTR value. */
182 if (IS_ERR(type->typ_procroot)) {
183 rc = PTR_ERR(type->typ_procroot);
184 type->typ_procroot = NULL;
189 spin_lock(&obd_types_lock);
190 list_add(&type->typ_chain, &obd_types);
191 spin_unlock(&obd_types_lock);
/* Cleanup: free the partially constructed type. */
196 if (type->typ_name != NULL)
197 OBD_FREE(type->typ_name, strlen(name) + 1);
198 if (type->typ_ops != NULL)
199 OBD_FREE (type->typ_ops, sizeof (*type->typ_ops));
200 OBD_FREE(type, sizeof(*type));
/* Unregister the obd type called \a name.
 * A type that still has references only loses its ops table; otherwise the
 * procfs entry is removed, the type is unlinked from ::obd_types under
 * obd_types_lock and its name, ops and descriptor are freed.
 * NOTE(review): the NULL check on the lookup and the return statements are
 * not visible in this listing -- confirm against the full file. */
204 int class_unregister_type(const char *name)
206 struct obd_type *type = class_search_type(name);
210 CERROR("unknown obd type\n");
214 if (type->typ_refcnt) {
215 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
216 /* This is a bad situation, let's make the best of it */
217 /* Remove ops, but leave the name for debugging */
218 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
222 if (type->typ_procroot)
223 lprocfs_remove(&type->typ_procroot);
225 spin_lock(&obd_types_lock);
226 list_del(&type->typ_chain);
227 spin_unlock(&obd_types_lock);
/* The free size is derived from the caller's \a name, the string used for
 * the lookup above. */
228 OBD_FREE(type->typ_name, strlen(name) + 1);
229 if (type->typ_ops != NULL)
230 OBD_FREE(type->typ_ops, sizeof(*type->typ_ops));
231 OBD_FREE(type, sizeof(*type));
233 } /* class_unregister_type */
236 * Create a new obd device.
238 * Find an empty slot in ::obd_devs[], create a new obd device in it.
240 * \param type_name [in] obd device type string.
241 * \param name [in] obd device name.
243 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
/* Allocate an obd device of type \a type_name, give it \a name and install
 * it in a free ::obd_devs[] slot.  Failures visible here are returned as
 * ERR_PTR values (-EINVAL, -ENODEV, -ENOMEM, -EEXIST, -EOVERFLOW).
 * NOTE(review): several short source lines (assignments to result, loop
 * exits, the final return) are missing from this listing -- confirm
 * against the full file. */
246 struct obd_device *class_newdev(const char *type_name, const char *name)
248 struct obd_device *result = NULL;
249 struct obd_device *newdev;
250 struct obd_type *type = NULL;
252 int new_obd_minor = 0;
/* Reject names that would not fit into obd_name. */
254 if (strlen(name) >= MAX_OBD_NAME) {
255 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
256 RETURN(ERR_PTR(-EINVAL));
/* Takes a type reference; the failure paths below give it back with
 * class_put_type(). */
259 type = class_get_type(type_name);
261 CERROR("OBD: unknown type: %s\n", type_name);
262 RETURN(ERR_PTR(-ENODEV));
265 newdev = obd_device_alloc();
266 if (newdev == NULL) {
267 class_put_type(type);
268 RETURN(ERR_PTR(-ENOMEM));
270 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* Scan every slot under obd_dev_lock: detect a duplicate name and claim a
 * free slot. */
272 spin_lock(&obd_dev_lock);
273 for (i = 0; i < class_devno_max(); i++) {
274 struct obd_device *obd = class_num2obd(i);
275 if (obd && obd->obd_name && (strcmp(name, obd->obd_name) == 0)){
276 CERROR("Device %s already exists, won't add\n", name);
/* Undo a slot that was already claimed before the duplicate was found. */
278 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
279 "%p obd_magic %08x != %08x\n", result,
280 result->obd_magic, OBD_DEVICE_MAGIC);
281 LASSERTF(result->obd_minor == new_obd_minor,
282 "%p obd_minor %d != %d\n", result,
283 result->obd_minor, new_obd_minor);
285 obd_devs[result->obd_minor] = NULL;
286 result->obd_name[0]='\0';
288 result = ERR_PTR(-EEXIST);
/* Empty slot and nothing claimed yet: take it. */
291 if (!result && !obd) {
293 result->obd_minor = i;
295 result->obd_type = type;
296 strncpy(result->obd_name, name,
297 sizeof(result->obd_name) - 1);
298 obd_devs[i] = result;
301 spin_unlock(&obd_dev_lock);
/* Neither a duplicate nor a free slot was found: the table is full. */
303 if (result == NULL && i >= class_devno_max()) {
304 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
306 result = ERR_PTR(-EOVERFLOW);
/* On error, release the unused device and the type reference. */
309 if (IS_ERR(result)) {
310 obd_device_free(newdev);
311 class_put_type(type);
313 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
314 result->obd_name, result);
319 void class_release_dev(struct obd_device *obd)
321 struct obd_type *obd_type = obd->obd_type;
323 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
324 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
325 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
326 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
327 LASSERT(obd_type != NULL);
329 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
330 obd->obd_name,obd->obd_type->typ_name);
332 spin_lock(&obd_dev_lock);
333 obd_devs[obd->obd_minor] = NULL;
334 spin_unlock(&obd_dev_lock);
335 obd_device_free(obd);
337 class_put_type(obd_type);
/* Search ::obd_devs[] under obd_dev_lock for a device called \a name.
 * A match is only handed out once the device has obd_attached set.
 * NOTE(review): the local declarations, any argument check and all return
 * statements are missing from this listing; class_name2obd() treats a
 * negative result as "not found" -- confirm the exact values. */
340 int class_name2dev(const char *name)
347 spin_lock(&obd_dev_lock);
348 for (i = 0; i < class_devno_max(); i++) {
349 struct obd_device *obd = class_num2obd(i);
350 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
351 /* Make sure we finished attaching before we give
352 out any references */
353 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
354 if (obd->obd_attached) {
355 spin_unlock(&obd_dev_lock);
361 spin_unlock(&obd_dev_lock);
366 struct obd_device *class_name2obd(const char *name)
368 int dev = class_name2dev(name);
370 if (dev < 0 || dev > class_devno_max())
372 return class_num2obd(dev);
375 int class_uuid2dev(struct obd_uuid *uuid)
379 spin_lock(&obd_dev_lock);
380 for (i = 0; i < class_devno_max(); i++) {
381 struct obd_device *obd = class_num2obd(i);
382 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
383 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
384 spin_unlock(&obd_dev_lock);
388 spin_unlock(&obd_dev_lock);
/* Look up an obd device by uuid: resolve the index with class_uuid2dev()
 * and fetch the device with class_num2obd().
 * NOTE(review): two source lines between the lookup and the return are
 * missing from this listing; presumably they reject a failed lookup --
 * confirm. */
393 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
395 int dev = class_uuid2dev(uuid);
398 return class_num2obd(dev);
402 * Get obd device from ::obd_devs[]
404 * \param num [in] array index
406 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
407 * otherwise return the obd device there.
409 struct obd_device *class_num2obd(int num)
411 struct obd_device *obd = NULL;
413 if (num < class_devno_max()) {
418 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
419 "%p obd_magic %08x != %08x\n",
420 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
421 LASSERTF(obd->obd_minor == num,
422 "%p obd_minor %0d != %0d\n",
423 obd, obd->obd_minor, num);
/* Print one console line per obd device: index, status, type name, device
 * name, uuid and reference count.  The table is walked under obd_dev_lock.
 * The status is chosen from obd_stopping, obd_set_up and obd_attached, in
 * that order of precedence.
 * NOTE(review): the status strings, the local declarations and the check
 * for empty slots are missing from this listing -- confirm against the
 * full file. */
429 void class_obd_list(void)
434 spin_lock(&obd_dev_lock);
435 for (i = 0; i < class_devno_max(); i++) {
436 struct obd_device *obd = class_num2obd(i);
439 if (obd->obd_stopping)
441 else if (obd->obd_set_up)
443 else if (obd->obd_attached)
447 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
448 i, status, obd->obd_type->typ_name,
449 obd->obd_name, obd->obd_uuid.uuid,
450 atomic_read(&obd->obd_refcount));
452 spin_unlock(&obd_dev_lock);
456 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
457 specified, then only the client with that uuid is returned,
458 otherwise any client connected to the tgt is returned. */
/* Scan ::obd_devs[] under obd_dev_lock for a device that matches
 * \a typ_name, \a tgt_uuid and, when given, \a grp_uuid.
 * The type comparison is a strncmp() over strlen(typ_name) bytes, so a type
 * name that merely starts with \a typ_name also matches.
 * NOTE(review): the skip of empty slots and the return statements are
 * missing from this listing -- confirm against the full file. */
459 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
460 const char * typ_name,
461 struct obd_uuid *grp_uuid)
465 spin_lock(&obd_dev_lock);
466 for (i = 0; i < class_devno_max(); i++) {
467 struct obd_device *obd = class_num2obd(i);
470 if ((strncmp(obd->obd_type->typ_name, typ_name,
471 strlen(typ_name)) == 0)) {
/* The target uuid must match; grp_uuid is only compared when non-NULL. */
472 if (obd_uuid_equals(tgt_uuid,
473 &obd->u.cli.cl_target_uuid) &&
474 ((grp_uuid)? obd_uuid_equals(grp_uuid,
475 &obd->obd_uuid) : 1)) {
476 spin_unlock(&obd_dev_lock);
481 spin_unlock(&obd_dev_lock);
/* Find a client of \a tgt_uuid without being given its type: the visible
 * code tries LUSTRE_MDC_NAME (with no group uuid) and then
 * LUSTRE_OSC_NAME.
 * NOTE(review): the condition between the two lookups, the last argument
 * of the second call and the return are missing from this listing --
 * confirm. */
486 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
487 struct obd_uuid *grp_uuid)
489 struct obd_device *obd;
491 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
493 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
498 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
499 searching at *next, and if a device is found, the next index to look
500 at is saved in *next. If next is NULL, then the first matching device
501 will always be returned. */
/* Walk ::obd_devs[] under obd_dev_lock, starting at an index derived from
 * \a next, and compare each device uuid with \a grp_uuid.
 * NOTE(review): the handling of a NULL or out-of-range \a next, the update
 * of *next and the return statements are missing from this listing --
 * confirm against the full file. */
502 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
508 else if (*next >= 0 && *next < class_devno_max())
513 spin_lock(&obd_dev_lock);
514 for (; i < class_devno_max(); i++) {
515 struct obd_device *obd = class_num2obd(i);
518 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
521 spin_unlock(&obd_dev_lock);
525 spin_unlock(&obd_dev_lock);
531 void obd_cleanup_caches(void)
536 if (obd_device_cachep) {
537 rc = cfs_mem_cache_destroy(obd_device_cachep);
538 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
539 obd_device_cachep = NULL;
542 rc = cfs_mem_cache_destroy(obdo_cachep);
543 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
547 rc = cfs_mem_cache_destroy(import_cachep);
548 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
549 import_cachep = NULL;
/* Create the three slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache" and "ll_import_cache".  Each pointer is asserted to be
 * NULL before its cache is created; the tail of the function calls
 * obd_cleanup_caches() to undo a partial setup.
 * NOTE(review): the remaining cfs_mem_cache_create() arguments, the failure
 * jumps and the return values are missing from this listing -- confirm. */
554 int obd_init_caches(void)
558 LASSERT(obd_device_cachep == NULL);
559 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
560 sizeof(struct obd_device),
562 if (!obd_device_cachep)
565 LASSERT(obdo_cachep == NULL);
566 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
571 LASSERT(import_cachep == NULL);
572 import_cachep = cfs_mem_cache_create("ll_import_cache",
573 sizeof(struct obd_import),
580 obd_cleanup_caches();
585 /* map connection to client */
/* Resolves conn->cookie to an export through class_handle2object().
 * A cookie of -1 is logged as a request for a new connection.
 * NOTE(review): the NULL-handle test and the return statements are missing
 * from this listing; class_conn2obd() drops a reference with
 * class_export_put() after calling this, so the lookup presumably returns
 * a referenced export -- confirm. */
586 struct obd_export *class_conn2export(struct lustre_handle *conn)
588 struct obd_export *export;
592 CDEBUG(D_CACHE, "looking for null handle\n");
596 if (conn->cookie == -1) { /* this means assign a new connection */
597 CDEBUG(D_CACHE, "want a new connection\n");
601 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
602 export = class_handle2object(conn->cookie);
/* Map an export to an obd device.
 * NOTE(review): the body of this function is not visible in this listing --
 * confirm its behaviour against the full file. */
606 struct obd_device *class_exp2obd(struct obd_export *exp)
/* Map a connection handle to the obd device of its export.
 * The export reference obtained through class_conn2export() is dropped
 * again once exp_obd has been read.
 * NOTE(review): the NULL check on the export and the return statements are
 * missing from this listing -- confirm. */
613 struct obd_device *class_conn2obd(struct lustre_handle *conn)
615 struct obd_export *export;
616 export = class_conn2export(conn);
618 struct obd_device *obd = export->exp_obd;
619 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the export's obd device.
 * NOTE(review): two source lines before the return are missing from this
 * listing; presumably a NULL check on obd -- confirm. */
625 struct obd_import *class_exp2cliimp(struct obd_export *exp)
627 struct obd_device *obd = exp->exp_obd;
630 return obd->u.cli.cl_import;
/* Return the client import (u.cli.cl_import) of the obd device behind a
 * connection handle, resolved with class_conn2obd().
 * NOTE(review): two source lines before the return are missing from this
 * listing; presumably a NULL check on obd -- confirm. */
633 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
635 struct obd_device *obd = class_conn2obd(conn);
638 return obd->u.cli.cl_import;
641 /* Export management functions */
/* Addref callback handed to class_handle_hash() for exports: takes one
 * reference on the export behind the handle. */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}
647 /* called from mds_commit_cb() in context of journal commit callback
648 * and cannot call any blocking functions. */
649 void __class_export_put(struct obd_export *exp)
651 if (atomic_dec_and_test(&exp->exp_refcount)) {
652 LASSERT (list_empty(&exp->exp_obd_chain));
654 CDEBUG(D_IOCTL, "final put %p/%s\n",
655 exp, exp->exp_client_uuid.uuid);
657 spin_lock(&obd_zombie_impexp_lock);
658 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
659 spin_unlock(&obd_zombie_impexp_lock);
661 obd_zombie_impexp_notify();
664 EXPORT_SYMBOL(__class_export_put);
/* Tear down an export whose reference count has reached zero: release its
 * connection (if any), assert that all of its request/reply lists are
 * empty, call obd_destroy_export() and free the export through
 * OBD_FREE_RCU().
 * NOTE(review): the source lines following the free are missing from this
 * listing; anything done with obd after the free is not visible --
 * confirm against the full file. */
666 void class_export_destroy(struct obd_export *exp)
668 struct obd_device *obd = exp->exp_obd;
670 LASSERT (atomic_read(&exp->exp_refcount) == 0);
672 CDEBUG(D_IOCTL, "destroying export %p/%s\n", exp,
673 exp->exp_client_uuid.uuid);
675 LASSERT(obd != NULL);
677 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
678 if (exp->exp_connection)
679 ptlrpc_put_connection_superhack(exp->exp_connection);
681 LASSERT(list_empty(&exp->exp_outstanding_replies));
682 LASSERT(list_empty(&exp->exp_uncommitted_replies));
683 LASSERT(list_empty(&exp->exp_req_replay_queue));
684 LASSERT(list_empty(&exp->exp_queued_rpc));
685 obd_destroy_export(exp);
687 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
691 /* Creates a new export, adds it to the hash table, and returns a
692 * pointer to it. The refcount is 2: one for the hash reference, and
693 * one for the pointer returned by this function. */
/* Failures visible here are returned as ERR_PTR(-ENOMEM) and
 * ERR_PTR(-EALREADY), the latter for a duplicate client uuid.
 * NOTE(review): some short source lines (the allocation check, the test on
 * rc, a line between the LASSERT and the list_add, and the final return)
 * are missing from this listing -- confirm against the full file. */
694 struct obd_export *class_new_export(struct obd_device *obd,
695 struct obd_uuid *cluuid)
697 struct obd_export *export;
700 OBD_ALLOC(export, sizeof(*export));
702 return ERR_PTR(-ENOMEM);
704 export->exp_conn_cnt = 0;
705 export->exp_lock_hash = NULL;
706 atomic_set(&export->exp_refcount, 2);
707 atomic_set(&export->exp_rpc_count, 0);
708 export->exp_obd = obd;
709 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
710 spin_lock_init(&export->exp_uncommitted_replies_lock);
711 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
712 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
713 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
/* Publish the handle so the export can be found by cookie. */
715 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
716 class_handle_hash(&export->exp_handle, export_handle_addref);
717 export->exp_last_request_time = cfs_time_current_sec();
718 spin_lock_init(&export->exp_lock);
719 INIT_HLIST_NODE(&export->exp_uuid_hash);
720 INIT_HLIST_NODE(&export->exp_nid_hash);
722 export->exp_client_uuid = *cluuid;
723 obd_init_export(export);
/* An export whose client uuid equals the obd's own uuid is not added to
 * the uuid hash. */
725 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
726 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
727 &export->exp_uuid_hash);
729 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
730 obd->obd_name, cluuid->uuid, rc);
731 class_handle_unhash(&export->exp_handle);
732 OBD_FREE_PTR(export);
733 return ERR_PTR(-EALREADY);
/* Link the export onto the device's export lists under obd_dev_lock. */
737 spin_lock(&obd->obd_dev_lock);
738 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
740 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
741 list_add_tail(&export->exp_obd_chain_timed,
742 &export->exp_obd->obd_exports_timed);
743 export->exp_obd->obd_num_exports++;
744 spin_unlock(&obd->obd_dev_lock);
748 EXPORT_SYMBOL(class_new_export);
750 void class_unlink_export(struct obd_export *exp)
752 class_handle_unhash(&exp->exp_handle);
754 spin_lock(&exp->exp_obd->obd_dev_lock);
755 /* delete an uuid-export hashitem from hashtables */
756 if (!hlist_unhashed(&exp->exp_uuid_hash))
757 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
758 &exp->exp_client_uuid,
759 &exp->exp_uuid_hash);
761 list_del_init(&exp->exp_obd_chain);
762 list_del_init(&exp->exp_obd_chain_timed);
763 exp->exp_obd->obd_num_exports--;
764 spin_unlock(&exp->exp_obd->obd_dev_lock);
765 /* Keep these counter valid always */
766 spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
767 if (exp->exp_delayed)
768 exp->exp_obd->obd_delayed_clients--;
769 else if (exp->exp_replay_needed)
770 exp->exp_obd->obd_recoverable_clients--;
771 spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
772 class_export_put(exp);
774 EXPORT_SYMBOL(class_unlink_export);
776 /* Import management functions */
/* Addref callback handed to class_handle_hash() for imports: takes one
 * reference on the import behind the handle. */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
782 struct obd_import *class_import_get(struct obd_import *import)
784 LASSERT(atomic_read(&import->imp_refcount) >= 0);
785 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
786 atomic_inc(&import->imp_refcount);
787 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
788 atomic_read(&import->imp_refcount),
789 import->imp_obd->obd_name);
792 EXPORT_SYMBOL(class_import_get);
794 void class_import_put(struct obd_import *import)
798 LASSERT(atomic_read(&import->imp_refcount) > 0);
799 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
800 LASSERT(list_empty(&import->imp_zombie_chain));
802 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
803 atomic_read(&import->imp_refcount) - 1,
804 import->imp_obd->obd_name);
806 if (atomic_dec_and_test(&import->imp_refcount)) {
807 CDEBUG(D_INFO, "final put import %p\n", import);
808 spin_lock(&obd_zombie_impexp_lock);
809 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
810 spin_unlock(&obd_zombie_impexp_lock);
812 obd_zombie_impexp_notify();
817 EXPORT_SYMBOL(class_import_put);
819 void class_import_destroy(struct obd_import *import)
823 CDEBUG(D_IOCTL, "destroying import %p\n", import);
825 LASSERT(atomic_read(&import->imp_refcount) == 0);
827 ptlrpc_put_connection_superhack(import->imp_connection);
829 while (!list_empty(&import->imp_conn_list)) {
830 struct obd_import_conn *imp_conn;
832 imp_conn = list_entry(import->imp_conn_list.next,
833 struct obd_import_conn, oic_item);
834 list_del(&imp_conn->oic_item);
835 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
836 OBD_FREE(imp_conn, sizeof(*imp_conn));
839 class_decref(import->imp_obd);
840 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
/* Initialise the adaptive-timeout state of an import: the network latency
 * estimate and one service estimate per portal, each seeded with
 * INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the declaration of i and the last argument of the second
 * at_init() call are missing from this listing -- confirm. */
844 static void init_imp_at(struct imp_at *at) {
846 at_init(&at->iat_net_latency, 0, 0);
847 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
848 /* max service estimates are tracked on the server side, so
849 don't use the AT history here, just use the last reported
850 val. (But keep hist for proc histogram, worst_ever) */
851 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Allocate and initialise an import for \a obd.
 * The import takes a reference on the device (class_incref), starts in
 * state LUSTRE_IMP_NEW with a reference count of 2, and is published in
 * the handle hash with import_handle_addref as its addref callback.
 * NOTE(review): the allocation check, the preprocessor else/endif lines
 * and the return statement are missing from this listing -- confirm
 * against the full file. */
856 struct obd_import *class_new_import(struct obd_device *obd)
858 struct obd_import *imp;
860 OBD_ALLOC(imp, sizeof(*imp));
864 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
865 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
866 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
867 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
868 spin_lock_init(&imp->imp_lock);
869 imp->imp_last_success_conn = 0;
870 imp->imp_state = LUSTRE_IMP_NEW;
871 imp->imp_obd = class_incref(obd);
872 cfs_waitq_init(&imp->imp_recovery_waitq);
874 atomic_set(&imp->imp_refcount, 2);
875 atomic_set(&imp->imp_unregistering, 0);
876 atomic_set(&imp->imp_inflight, 0);
877 atomic_set(&imp->imp_replay_inflight, 0);
878 atomic_set(&imp->imp_inval_count, 0);
879 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
880 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
881 class_handle_hash(&imp->imp_handle, import_handle_addref);
882 init_imp_at(&imp->imp_at);
884 /* b1_8 supports both v1 & v2. but HEAD only supports v2.
887 #define HAVE_DEFAULT_V2_CONNECT 1
888 #ifdef HAVE_DEFAULT_V2_CONNECT
889 /* the default magic is V2, will be used in connect RPC, and
890 * then adjusted according to the flags in request/reply. */
891 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
893 /* the default magic is V1, will be used in connect RPC, and
894 * then adjusted according to the flags in request/reply. */
895 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V1;
900 EXPORT_SYMBOL(class_new_import);
902 void class_destroy_import(struct obd_import *import)
904 LASSERT(import != NULL);
905 LASSERT(import != LP_POISON);
907 class_handle_unhash(&import->imp_handle);
909 spin_lock(&import->imp_lock);
910 import->imp_generation++;
911 spin_unlock(&import->imp_lock);
913 class_import_put(import);
915 EXPORT_SYMBOL(class_destroy_import);
917 /* A connection defines an export context in which preallocation can
918 be managed. This releases the export pointer reference, and returns
919 the export handle, so the export refcount is 1 when this function returns. */
/* Create a new export for client \a cluuid on \a obd, store its handle
 * cookie in \a conn and drop the pointer reference obtained from
 * class_new_export().  An error from class_new_export() is returned
 * through PTR_ERR().
 * NOTE(review): the test that guards the error return and the final
 * return are missing from this listing -- confirm. */
921 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
922 struct obd_uuid *cluuid)
924 struct obd_export *export;
925 LASSERT(conn != NULL);
926 LASSERT(obd != NULL);
927 LASSERT(cluuid != NULL);
930 export = class_new_export(obd, cluuid);
932 RETURN(PTR_ERR(export));
934 conn->cookie = export->exp_handle.h_cookie;
935 class_export_put(export);
937 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
938 cluuid->uuid, conn->cookie);
941 EXPORT_SYMBOL(class_connect);
943 /* This function removes 1-3 references from the export:
944 * 1 - for the export pointer passed in
945 * and, if a disconnect is really needed:
946 * 2 - removing from hash
947 * 3 - in class_unlink_export
948 * The export pointer passed to this function can be destroyed */
/* Disconnect an export.
 * exp_disconnected is tested and set under exp_lock, so only the first
 * caller removes the export from the nid hash and calls
 * class_unlink_export(); the final class_export_put() drops the caller's
 * own reference.
 * NOTE(review): the no_disconn label, the handling of a NULL export and
 * the return statements are missing from this listing -- confirm. */
949 int class_disconnect(struct obd_export *export)
951 int already_disconnected;
954 if (export == NULL) {
956 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
960 spin_lock(&export->exp_lock);
961 already_disconnected = export->exp_disconnected;
962 export->exp_disconnected = 1;
963 spin_unlock(&export->exp_lock);
966 /* class_cleanup(), abort_recovery(), and class_fail_export()
967 * all end up in here, and if any of them race we shouldn't
968 * call extra class_export_puts(). */
969 if (already_disconnected)
970 GOTO(no_disconn, already_disconnected);
972 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
973 export->exp_handle.h_cookie);
/* Only exports that were added to the nid hash are removed from it. */
976 if (!hlist_unhashed(&export->exp_nid_hash))
977 lustre_hash_del(export->exp_obd->obd_nid_hash,
978 &export->exp_connection->c_peer.nid,
979 &export->exp_nid_hash);
981 class_unlink_export(export);
984 class_export_put(export);
988 /* Return non-zero for a fully connected export */
/* "Connected" is computed as exp_conn_cnt > 0, read under exp_lock.
 * NOTE(review): the declaration of connected, any NULL check on exp and
 * the return statements are missing from this listing -- confirm. */
989 int class_connected_export(struct obd_export *exp)
993 spin_lock(&exp->exp_lock);
994 connected = (exp->exp_conn_cnt > 0);
995 spin_unlock(&exp->exp_lock);
1000 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export on \a list, which is linked through
 * exp_obd_chain.  Each export gets \a flags stored in exp_flags under
 * exp_lock.  An export whose client uuid equals its obd's uuid is only
 * unlinked from the list; every other export goes through
 * obd_disconnect().
 * NOTE(review): the declaration of rc, the CDEBUG opening of the
 * self-export message and the loop-continue line are missing from this
 * listing -- confirm against the full file. */
1002 static void class_disconnect_export_list(struct list_head *list,
1003 enum obd_option flags)
1006 struct obd_export *exp;
1009 /* It's possible that an export may disconnect itself, but
1010 * nothing else will be added to this list. */
1011 while (!list_empty(list)) {
1012 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1013 /* need for safe call CDEBUG after obd_disconnect */
1014 class_export_get(exp);
1016 spin_lock(&exp->exp_lock);
1017 exp->exp_flags = flags;
1018 spin_unlock(&exp->exp_lock);
1020 if (obd_uuid_equals(&exp->exp_client_uuid,
1021 &exp->exp_obd->obd_uuid)) {
1023 "exp %p export uuid == obd uuid, don't discon\n",
1025 /* Need to delete this now so we don't end up pointing
1026 * to work_list later when this export is cleaned up. */
1027 list_del_init(&exp->exp_obd_chain);
1028 class_export_put(exp);
/* Second reference: obd_disconnect() consumes one (see below), the first
 * keeps exp valid for the CDEBUG that follows. */
1032 class_export_get(exp);
1033 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1034 "last request at %ld\n",
1035 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1036 exp, exp->exp_last_request_time);
1038 /* release one export reference anyway */
1039 rc = obd_disconnect(exp);
1040 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1041 obd_export_nid2str(exp), exp, rc);
1042 class_export_put(exp);
1047 void class_disconnect_exports(struct obd_device *obd)
1049 struct list_head work_list;
1052 /* Move all of the exports from obd_exports to a work list, en masse. */
1053 CFS_INIT_LIST_HEAD(&work_list);
1054 spin_lock(&obd->obd_dev_lock);
1055 list_splice_init(&obd->obd_delayed_exports, &work_list);
1056 list_splice_init(&obd->obd_exports, &work_list);
1057 spin_unlock(&obd->obd_dev_lock);
1059 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1060 "disconnecting them\n", obd->obd_minor, obd);
1061 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd));
1064 EXPORT_SYMBOL(class_disconnect_exports);
1066 /* Remove exports that have not completed recovery. */
1067 void class_disconnect_stale_exports(struct obd_device *obd,
1068 enum obd_option flags)
1070 struct list_head work_list;
1071 struct list_head *pos, *n;
1072 struct obd_export *exp;
1075 CFS_INIT_LIST_HEAD(&work_list);
1076 spin_lock(&obd->obd_dev_lock);
1077 list_for_each_safe(pos, n, &obd->obd_exports) {
1078 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1079 if (exp->exp_replay_needed) {
1080 list_move(&exp->exp_obd_chain, &work_list);
1081 obd->obd_stale_clients++;
1084 spin_unlock(&obd->obd_dev_lock);
1086 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1087 obd->obd_name, obd->obd_stale_clients);
1088 class_disconnect_export_list(&work_list, flags);
1091 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Disconnect delayed exports that exp_expired() reports as older than
 * obd->u.obt.obt_stale_export_age.  Candidates are moved from
 * obd_delayed_exports to a private list under obd_dev_lock and then handed
 * to class_disconnect_export_list().
 * NOTE(review): the declaration and counting of cnt, the list member
 * argument of the iterator and any early return are missing from this
 * listing -- confirm against the full file. */
1093 void class_disconnect_expired_exports(struct obd_device *obd)
1095 struct list_head expired_list;
1096 struct obd_export *exp, *n;
1100 CFS_INIT_LIST_HEAD(&expired_list);
1101 spin_lock(&obd->obd_dev_lock);
1102 list_for_each_entry_safe(exp, n, &obd->obd_delayed_exports,
1104 if (exp_expired(exp, obd->u.obt.obt_stale_export_age)) {
1105 list_move(&exp->exp_obd_chain, &expired_list);
1109 spin_unlock(&obd->obd_dev_lock);
1114 CDEBUG(D_INFO, "%s: disconnecting %d expired exports\n",
1115 obd->obd_name, cnt);
1116 class_disconnect_export_list(&expired_list, exp_flags_from_obd(obd));
1120 EXPORT_SYMBOL(class_disconnect_expired_exports);
1122 void class_set_export_delayed(struct obd_export *exp)
1124 struct obd_device *obd = class_exp2obd(exp);
1126 LASSERT(!exp->exp_delayed);
1127 spin_lock(&exp->exp_lock);
1128 exp->exp_delayed = 1;
1129 spin_unlock(&exp->exp_lock);
1131 /* no need to ping delayed exports */
1132 spin_lock(&obd->obd_dev_lock);
1133 list_del_init(&exp->exp_obd_chain_timed);
1134 list_move_tail(&exp->exp_obd_chain, &obd->obd_delayed_exports);
1135 spin_unlock(&obd->obd_dev_lock);
1137 LASSERT(obd->obd_recoverable_clients > 0);
1139 spin_lock_bh(&obd->obd_processing_task_lock);
1140 obd->obd_delayed_clients++;
1141 obd->obd_recoverable_clients--;
1142 spin_unlock_bh(&obd->obd_processing_task_lock);
1144 CDEBUG(D_HA, "%s: set client %s as delayed\n",
1145 obd->obd_name, exp->exp_client_uuid.uuid);
1147 EXPORT_SYMBOL(class_set_export_delayed);
1150 * Manage exports that have not completed recovery.
/* Sort the exports of \a obd that still need replay into two private
 * lists under obd_dev_lock: one to evict and one to mark as delayed.
 * When HAVE_DELAYED_RECOVERY is not defined, all delayed exports are
 * evicted as well.  Delayed exports get a fresh exp_last_request_time;
 * evicted ones are passed to class_disconnect_export_list().
 * NOTE(review): the declaration and counting of delayed, the loop-continue
 * lines and the closing preprocessor line are missing from this listing --
 * confirm against the full file. */
1152 void class_handle_stale_exports(struct obd_device *obd)
1154 struct list_head delay_list, evict_list;
1155 struct obd_export *exp, *n;
1159 CFS_INIT_LIST_HEAD(&delay_list);
1160 CFS_INIT_LIST_HEAD(&evict_list);
1161 spin_lock(&obd->obd_dev_lock);
1162 list_for_each_entry_safe(exp, n, &obd->obd_exports, exp_obd_chain) {
1163 LASSERT(!exp->exp_delayed);
1164 /* clients finished recovery */
1165 if (!exp->exp_replay_needed)
1167 /* connected non-vbr clients are evicted */
1168 if (exp->exp_in_recovery && !exp_connect_vbr(exp)) {
1169 obd->obd_stale_clients++;
1170 list_move_tail(&exp->exp_obd_chain, &evict_list);
1173 if (obd->obd_version_recov || !exp->exp_in_recovery) {
1174 list_move_tail(&exp->exp_obd_chain, &delay_list);
1178 #ifndef HAVE_DELAYED_RECOVERY
1179 /* delayed recovery is turned off, evict all delayed exports */
1180 list_splice_init(&delay_list, &evict_list);
1181 list_splice_init(&obd->obd_delayed_exports, &evict_list);
1182 obd->obd_stale_clients += delayed;
1184 spin_unlock(&obd->obd_dev_lock);
/* class_set_export_delayed() moves each export off delay_list, hence the
 * assertion after the loop. */
1186 list_for_each_entry_safe(exp, n, &delay_list, exp_obd_chain) {
1187 class_set_export_delayed(exp);
1188 exp->exp_last_request_time = cfs_time_current_sec();
1190 LASSERT(list_empty(&delay_list));
1192 /* evict clients without VBR support */
1193 class_disconnect_export_list(&evict_list, exp_flags_from_obd(obd));
1197 EXPORT_SYMBOL(class_handle_stale_exports);
/* Allocate and initialise an obd_io_group: nothing pending, reference
 * count 1, empty callback-context list.
 * NOTE(review): the allocation check, the store through \a oig_out and the
 * return statements are missing from this listing -- confirm. */
1199 int oig_init(struct obd_io_group **oig_out)
1201 struct obd_io_group *oig;
1204 OBD_ALLOC(oig, sizeof(*oig));
1208 spin_lock_init(&oig->oig_lock);
1210 oig->oig_pending = 0;
1211 atomic_set(&oig->oig_refcount, 1);
1212 cfs_waitq_init(&oig->oig_waitq);
1213 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1218 EXPORT_SYMBOL(oig_init);
1220 static inline void oig_grab(struct obd_io_group *oig)
1222 atomic_inc(&oig->oig_refcount);
1225 void oig_release(struct obd_io_group *oig)
1227 if (atomic_dec_and_test(&oig->oig_refcount))
1228 OBD_FREE(oig, sizeof(*oig));
1230 EXPORT_SYMBOL(oig_release);
/* Queue a callback context on the tail of the io group's oig_occ_list
 * under oig_lock.
 * NOTE(review): most of this function's body (the local declarations, the
 * lines between taking the lock and the list_add_tail, and the return) is
 * missing from this listing -- confirm against the full file. */
1232 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1235 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1236 spin_lock(&oig->oig_lock);
1242 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1244 spin_unlock(&oig->oig_lock);
1249 EXPORT_SYMBOL(oig_add_one);
/* Record the completion of one callback context with result \a rc.
 * Under oig_lock the context is unlinked and oig_pending is decremented;
 * when nothing is left pending the group's wait queue is selected and
 * signalled after the lock is dropped.
 * NOTE(review): the declaration of old_rc, the statement that records a
 * first non-zero rc, the last CDEBUG argument, the test before the signal
 * and any reference drop are missing from this listing -- confirm. */
1251 void oig_complete_one(struct obd_io_group *oig,
1252 struct oig_callback_context *occ, int rc)
1254 cfs_waitq_t *wake = NULL;
1257 spin_lock(&oig->oig_lock);
1260 list_del_init(&occ->occ_oig_item);
1262 old_rc = oig->oig_rc;
1263 if (oig->oig_rc == 0 && rc != 0)
1266 if (--oig->oig_pending <= 0)
1267 wake = &oig->oig_waitq;
1269 spin_unlock(&oig->oig_lock);
/* The values logged here are read without the lock, as the message says. */
1271 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1272 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1275 cfs_waitq_signal(wake);
1278 EXPORT_SYMBOL(oig_complete_one);
/* Wait condition used by oig_wait(): tests oig_pending <= 0 under
 * oig_lock.
 * NOTE(review): the result variable and the return statement are missing
 * from this listing -- confirm. */
1280 static int oig_done(struct obd_io_group *oig)
1283 spin_lock(&oig->oig_lock);
1284 if (oig->oig_pending <= 0)
1286 spin_unlock(&oig->oig_lock);
/* Interrupt handler installed by oig_wait() through LWI_INTR.
 * Marks each callback context on oig_occ_list as interrupted and calls its
 * occ_interrupted() hook with oig_lock dropped.
 * NOTE(review): the restart label, the skip of already-interrupted entries
 * and the jump back after re-taking the lock are missing from this
 * listing -- confirm against the full file. */
1290 static void interrupted_oig(void *data)
1292 struct obd_io_group *oig = data;
1293 struct oig_callback_context *occ;
1295 spin_lock(&oig->oig_lock);
1296 /* We need to restart the processing each time we drop the lock, as
1297 * it is possible other threads called oig_complete_one() to remove
1298 * an entry elsewhere in the list while we dropped lock. We need to
1299 * drop the lock because osc_ap_completion() calls oig_complete_one()
1300 * which re-gets this lock ;-) as well as a lock ordering issue. */
1302 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1303 if (occ->interrupted)
1305 occ->interrupted = 1;
1306 spin_unlock(&oig->oig_lock);
1307 occ->occ_interrupted(occ);
1308 spin_lock(&oig->oig_lock);
1311 spin_unlock(&oig->oig_lock);
/* Block until the I/O group @oig has no pending contexts.
 *
 * The first wait is set up with LWI_INTR() so that interrupted_oig() is
 * the interrupt handler.  The wait is repeated for as long as
 * l_wait_event() yields -EINTR; before each retry the wait info is
 * replaced by a zeroed one.  On exit oig_pending is asserted to be zero.
 * NOTE(review): presumably the return value is oig->oig_rc -- confirm. */
1314 int oig_wait(struct obd_io_group *oig)
1316 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1319 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1322 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
/* no other result is expected from the wait */
1323 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1324 /* we can't continue until the oig has emptied and stopped
1325 * referencing state that the caller will free upon return */
1327 lwi = (struct l_wait_info){ 0, };
1328 } while (rc == -EINTR);
1330 LASSERTF(oig->oig_pending == 0,
1331 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1334 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1337 EXPORT_SYMBOL(oig_wait);
/* Mark the export @exp as failed and disconnect it.
 *
 * exp_failed is tested and set under exp_lock, so only the first caller
 * for a given export sees it as not yet failed; later callers log
 * "skipping".  The outcome of obd_disconnect() is only logged. */
1339 void class_fail_export(struct obd_export *exp)
1341 int rc, already_failed;
/* read the old flag and set the new one in a single critical section */
1343 spin_lock(&exp->exp_lock);
1344 already_failed = exp->exp_failed;
1345 exp->exp_failed = 1;
1346 spin_unlock(&exp->exp_lock);
1348 if (already_failed) {
1349 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1350 exp, exp->exp_client_uuid.uuid);
1354 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1355 exp, exp->exp_client_uuid.uuid);
/* optionally dump the debug log when obd_dump_on_timeout is set */
1357 if (obd_dump_on_timeout)
1358 libcfs_debug_dumplog();
1360 /* Most callers into obd_disconnect are removing their own reference
1361 * (request, for example) in addition to the one from the hash table.
1362 * We don't have such a reference here, so make one. */
1363 class_export_get(exp);
1364 rc = obd_disconnect(exp);
1366 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1368 CDEBUG(D_HA, "disconnected export %p/%s\n",
1369 exp, exp->exp_client_uuid.uuid);
1371 EXPORT_SYMBOL(class_fail_export);
/* Return the string form of the peer NID of @exp's connection, as
 * produced by libcfs_nid2str().
 * NOTE(review): the value returned when exp_connection is NULL is not
 * established here -- confirm. */
1373 char *obd_export_nid2str(struct obd_export *exp)
1375 if (exp->exp_connection != NULL)
1376 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1380 EXPORT_SYMBOL(obd_export_nid2str);
/* Evict exports of @obd that belong to the client NID given as the string
 * @nid.
 *
 * The string is converted with libcfs_str2nid() and used as the key into
 * obd->obd_nid_hash.  A matching export is failed with class_fail_export()
 * and then released with class_export_put().
 *
 * Returns exports_evicted, the count of evicted exports.
 * NOTE(review): presumably the lookup is repeated until the hash holds no
 * more exports for this NID -- confirm. */
1382 int obd_export_evict_by_nid(struct obd_device *obd, char *nid)
1384 struct obd_export *doomed_exp = NULL;
1385 int exports_evicted = 0;
1387 lnet_nid_t nid_key = libcfs_str2nid(nid);
1390 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1392 if (doomed_exp == NULL)
/* sanity check: the export found must really belong to the wanted NID */
1395 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1396 "nid %s found, wanted nid %s, requested nid %s\n",
1397 obd_export_nid2str(doomed_exp),
1398 libcfs_nid2str(nid_key), nid);
1401 CDEBUG(D_HA, "%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1402 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1404 class_fail_export(doomed_exp);
1405 class_export_put(doomed_exp);
1408 if (!exports_evicted)
1409 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1410 obd->obd_name, nid);
1411 return exports_evicted;
1413 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of @obd whose client UUID equals the string @uuid.
 *
 * A request naming @obd's own UUID is rejected with an error message.
 * Otherwise the UUID is looked up in obd->obd_uuid_hash; a hit is failed
 * with class_fail_export() and released with class_export_put(), a miss
 * is reported with CERROR().
 *
 * Returns exports_evicted (0 when the request was rejected). */
1415 int obd_export_evict_by_uuid(struct obd_device *obd, char *uuid)
1417 struct obd_export *doomed_exp = NULL;
1418 struct obd_uuid doomed_uuid;
1419 int exports_evicted = 0;
1421 obd_str2uuid(&doomed_uuid, uuid);
/* never evict the device's own UUID */
1422 if(obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1423 CERROR("%s: can't evict myself\n", obd->obd_name);
1424 return exports_evicted;
1427 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1429 if (doomed_exp == NULL) {
1430 CERROR("%s: can't disconnect %s: no exports found\n",
1431 obd->obd_name, uuid);
1433 CWARN("%s: evicting %s at adminstrative request\n",
1434 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1435 class_fail_export(doomed_exp);
1436 class_export_put(doomed_exp);
1440 return exports_evicted;
1442 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/* Destroy the queued zombie imports and exports.
 *
 * Each pass takes obd_zombie_impexp_lock, detaches the first entry of
 * obd_zombie_imports and the first entry of obd_zombie_exports (when the
 * respective list is not empty), drops the lock and only then calls
 * class_import_destroy() / class_export_destroy().  Passes repeat while
 * either an import or an export was found. */
1444 void obd_zombie_impexp_cull(void)
1446 struct obd_import *import;
1447 struct obd_export *export;
1450 spin_lock (&obd_zombie_impexp_lock);
/* pick the head of the import list, if there is one */
1453 if (!list_empty(&obd_zombie_imports)) {
1454 import = list_entry(obd_zombie_imports.next,
1457 list_del(&import->imp_zombie_chain);
/* pick the head of the export list, if there is one */
1461 if (!list_empty(&obd_zombie_exports)) {
1462 export = list_entry(obd_zombie_exports.next,
1465 list_del_init(&export->exp_obd_chain);
/* the destroy calls below run without the lock held */
1468 spin_unlock(&obd_zombie_impexp_lock);
1471 class_import_destroy(import);
1474 class_export_destroy(export);
1476 } while (import != NULL || export != NULL);
/* State shared between the zombie thread and its init/stop/barrier
 * callers. */
/* completed by the zombie thread during startup; awaited by
 * obd_zombie_impexp_init() */
1479 static struct completion obd_zombie_start;
/* completed by the zombie thread on exit; awaited by
 * obd_zombie_impexp_stop() */
1480 static struct completion obd_zombie_stop;
/* bit flags; the OBD_ZOMBIE_STOP bit asks the thread to terminate */
1481 static unsigned long obd_zombie_flags;
/* signalled when there is work for the thread and, by the thread, after
 * each cull pass */
1482 static cfs_waitq_t obd_zombie_waitq;
/* pid recorded by the zombie thread; obd_zombie_barrier() compares it with
 * the caller's pid */
1483 static pid_t obd_zombie_pid;
/* Compute, under obd_zombie_impexp_lock, whether the zombie thread has
 * nothing to do: both zombie lists are empty and OBD_ZOMBIE_STOP is not
 * set.  The zombie thread waits on the negation of this check, and the
 * function is also registered as a liblustre idle callback.
 * @arg is not referenced by the statements shown.
 * NOTE(review): presumably the computed value is what is returned --
 * confirm. */
1489 int obd_zombi_impexp_check(void *arg)
1493 spin_lock(&obd_zombie_impexp_lock);
1494 rc = list_empty(&obd_zombie_imports) &&
1495 list_empty(&obd_zombie_exports) &&
1496 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1498 spin_unlock(&obd_zombie_impexp_lock);
/* Signal obd_zombie_waitq, the waitqueue on which the zombie thread and
 * obd_zombie_barrier() callers sleep. */
1503 static void obd_zombie_impexp_notify(void)
1505 cfs_waitq_signal(&obd_zombie_waitq);
1509 * check whether obd_zombie is idle
/* Wait condition for obd_zombie_barrier(): evaluates, under
 * obd_zombie_impexp_lock, whether both zombie lists are empty.  Must not
 * be called once OBD_ZOMBIE_STOP has been set (asserted).
 * NOTE(review): presumably the computed value is what is returned --
 * confirm. */
1511 static int obd_zombie_is_idle(void)
1515 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1516 spin_lock(&obd_zombie_impexp_lock);
1517 rc = list_empty(&obd_zombie_imports) &&
1518 list_empty(&obd_zombie_exports);
1519 spin_unlock(&obd_zombie_impexp_lock);
1524 * wait until the obd_zombie import/export queues become empty
/* Sleep on obd_zombie_waitq until obd_zombie_is_idle() holds.  The check
 * against obd_zombie_pid exists so that the zombie thread does not wait
 * for itself. */
1526 void obd_zombie_barrier(void)
/* zeroed wait info: no timeout or interrupt handler is configured here */
1528 struct l_wait_info lwi = { 0 };
1530 if (obd_zombie_pid == cfs_curproc_pid())
1531 /* don't wait for myself */
1533 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1535 EXPORT_SYMBOL(obd_zombie_barrier);
/* Main function of the zombie thread started by obd_zombie_impexp_init().
 *
 * After daemonizing as "obd_zombid" it completes obd_zombie_start (this
 * is also done when daemonizing fails, so the waiter in
 * obd_zombie_impexp_init() is released either way) and records its pid.
 * It then loops until OBD_ZOMBIE_STOP is set: sleep until
 * obd_zombi_impexp_check() reports work or a stop request, cull the
 * zombie lists, and signal the waitqueue.  On leaving the loop it
 * completes obd_zombie_stop.
 * @unused is not referenced by the statements shown. */
1539 static int obd_zombie_impexp_thread(void *unused)
1543 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1544 complete(&obd_zombie_start);
1548 complete(&obd_zombie_start);
/* lets obd_zombie_barrier() recognise calls made from this thread */
1550 obd_zombie_pid = cfs_curproc_pid();
1552 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1553 struct l_wait_info lwi = { 0 };
1555 l_wait_event(obd_zombie_waitq, !obd_zombi_impexp_check(NULL), &lwi);
1557 obd_zombie_impexp_cull();
1560 * Notify obd_zombie_barrier callers that queues
1563 cfs_waitq_signal(&obd_zombie_waitq);
1566 complete(&obd_zombie_stop);
1571 #else /* ! KERNEL */
/* counter that obd_zombi_impexp_kill() uses to let only the outermost
 * invocation run the cull */
1573 static atomic_t zombi_recur = ATOMIC_INIT(0);
/* handle from liblustre_register_wait_callback(); passed to
 * liblustre_deregister_wait_callback() on stop */
1574 static void *obd_zombi_impexp_work_cb;
/* handle from liblustre_register_idle_callback(); passed to
 * liblustre_deregister_idle_callback() on stop */
1575 static void *obd_zombi_impexp_idle_cb;
/* Wait callback registered with liblustre by obd_zombie_impexp_init().
 *
 * zombi_recur is incremented on entry and decremented again afterwards;
 * obd_zombie_impexp_cull() is run only when the increment yields 1, i.e.
 * when no other invocation is in progress.
 * @arg is not referenced by the statements shown.
 * NOTE(review): the return value is not established here -- confirm. */
1577 int obd_zombi_impexp_kill(void *arg)
1581 if (atomic_inc_return(&zombi_recur) == 1) {
1582 obd_zombie_impexp_cull();
1585 atomic_dec(&zombi_recur);
/* Set up the zombie import/export machinery: initialise both zombie
 * lists, their lock, the start/stop completions and the waitqueue.
 *
 * The function then starts obd_zombie_impexp_thread() and waits for
 * obd_zombie_start, and registers obd_zombi_impexp_kill() and
 * obd_zombi_impexp_check() as liblustre wait/idle callbacks.
 * NOTE(review): presumably the thread start and the liblustre
 * registration are alternative (kernel vs. non-kernel) build branches,
 * and the return value reports the setup result -- confirm. */
1591 int obd_zombie_impexp_init(void)
1595 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1596 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1597 spin_lock_init(&obd_zombie_impexp_lock);
1598 init_completion(&obd_zombie_start);
1599 init_completion(&obd_zombie_stop);
1600 cfs_waitq_init(&obd_zombie_waitq);
1604 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
/* the thread completes obd_zombie_start during its startup */
1608 wait_for_completion(&obd_zombie_start);
1611 obd_zombi_impexp_work_cb =
1612 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1613 &obd_zombi_impexp_kill, NULL);
1615 obd_zombi_impexp_idle_cb =
1616 liblustre_register_idle_callback("obd_zombi_impexp_check",
1617 &obd_zombi_impexp_check, NULL);
/* Shut the zombie machinery down: set OBD_ZOMBIE_STOP, wake whoever is
 * sleeping on the zombie waitqueue, then wait for obd_zombie_stop and
 * deregister the liblustre wait/idle callbacks.
 * NOTE(review): presumably the completion wait and the deregistration are
 * alternative (kernel vs. non-kernel) build branches -- confirm. */
1624 void obd_zombie_impexp_stop(void)
1626 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1627 obd_zombie_impexp_notify();
1629 wait_for_completion(&obd_zombie_stop);
1631 liblustre_deregister_wait_callback(obd_zombi_impexp_work_cb);
1632 liblustre_deregister_idle_callback(obd_zombi_impexp_idle_cb);