1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
80 EXPORT_SYMBOL(obd_device_alloc);
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 lu_ref_fini(&obd->obd_reference);
93 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
119 const char *modname = name;
120 if (!request_module(modname)) {
121 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122 type = class_search_type(name);
124 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130 spin_lock(&type->obd_type_lock);
132 try_module_get(type->typ_dt_ops->o_owner);
133 spin_unlock(&type->obd_type_lock);
138 void class_put_type(struct obd_type *type)
141 spin_lock(&type->obd_type_lock);
143 module_put(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
147 #define CLASS_MAX_NAME 1024
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150 struct lprocfs_vars *vars, const char *name,
151 struct lu_device_type *ldt)
153 struct obd_type *type;
158 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC_PTR(type->typ_dt_ops);
171 OBD_ALLOC_PTR(type->typ_md_ops);
172 OBD_ALLOC(type->typ_name, strlen(name) + 1);
174 if (type->typ_dt_ops == NULL ||
175 type->typ_md_ops == NULL ||
176 type->typ_name == NULL)
179 *(type->typ_dt_ops) = *dt_ops;
180 /* md_ops is optional */
182 *(type->typ_md_ops) = *md_ops;
183 strcpy(type->typ_name, name);
184 spin_lock_init(&type->obd_type_lock);
187 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
189 if (IS_ERR(type->typ_procroot)) {
190 rc = PTR_ERR(type->typ_procroot);
191 type->typ_procroot = NULL;
197 rc = lu_device_type_init(ldt);
202 spin_lock(&obd_types_lock);
203 list_add(&type->typ_chain, &obd_types);
204 spin_unlock(&obd_types_lock);
209 if (type->typ_name != NULL)
210 OBD_FREE(type->typ_name, strlen(name) + 1);
211 if (type->typ_md_ops != NULL)
212 OBD_FREE_PTR(type->typ_md_ops);
213 if (type->typ_dt_ops != NULL)
214 OBD_FREE_PTR(type->typ_dt_ops);
215 OBD_FREE(type, sizeof(*type));
219 int class_unregister_type(const char *name)
221 struct obd_type *type = class_search_type(name);
225 CERROR("unknown obd type\n");
229 if (type->typ_refcnt) {
230 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231 /* This is a bad situation, let's make the best of it */
232 /* Remove ops, but leave the name for debugging */
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_procroot) {
239 lprocfs_remove(&type->typ_procroot);
243 lu_device_type_fini(type->typ_lu);
245 spin_lock(&obd_types_lock);
246 list_del(&type->typ_chain);
247 spin_unlock(&obd_types_lock);
248 OBD_FREE(type->typ_name, strlen(name) + 1);
249 if (type->typ_dt_ops != NULL)
250 OBD_FREE_PTR(type->typ_dt_ops);
251 if (type->typ_md_ops != NULL)
252 OBD_FREE_PTR(type->typ_md_ops);
253 OBD_FREE(type, sizeof(*type));
255 } /* class_unregister_type */
258 * Create a new obd device.
260 * Find an empty slot in ::obd_devs[], create a new obd device in it.
262 * \param typename [in] obd device type string.
263 * \param name [in] obd device name.
265 * \retval NULL if create fails, otherwise return the obd device
268 struct obd_device *class_newdev(const char *type_name, const char *name)
270 struct obd_device *result = NULL;
271 struct obd_device *newdev;
272 struct obd_type *type = NULL;
274 int new_obd_minor = 0;
276 if (strlen(name) >= MAX_OBD_NAME) {
277 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278 RETURN(ERR_PTR(-EINVAL));
281 type = class_get_type(type_name);
283 CERROR("OBD: unknown type: %s\n", type_name);
284 RETURN(ERR_PTR(-ENODEV));
287 newdev = obd_device_alloc();
288 if (newdev == NULL) {
289 class_put_type(type);
290 RETURN(ERR_PTR(-ENOMEM));
292 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
294 spin_lock(&obd_dev_lock);
295 for (i = 0; i < class_devno_max(); i++) {
296 struct obd_device *obd = class_num2obd(i);
297 if (obd && obd->obd_name &&
298 (strcmp(name, obd->obd_name) == 0)) {
299 CERROR("Device %s already exists, won't add\n", name);
301 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302 "%p obd_magic %08x != %08x\n", result,
303 result->obd_magic, OBD_DEVICE_MAGIC);
304 LASSERTF(result->obd_minor == new_obd_minor,
305 "%p obd_minor %d != %d\n", result,
306 result->obd_minor, new_obd_minor);
308 obd_devs[result->obd_minor] = NULL;
309 result->obd_name[0]='\0';
311 result = ERR_PTR(-EEXIST);
314 if (!result && !obd) {
316 result->obd_minor = i;
318 result->obd_type = type;
319 strncpy(result->obd_name, name,
320 sizeof(result->obd_name) - 1);
321 obd_devs[i] = result;
324 spin_unlock(&obd_dev_lock);
326 if (result == NULL && i >= class_devno_max()) {
327 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
329 result = ERR_PTR(-EOVERFLOW);
332 if (IS_ERR(result)) {
333 obd_device_free(newdev);
334 class_put_type(type);
336 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337 result->obd_name, result);
342 void class_release_dev(struct obd_device *obd)
344 struct obd_type *obd_type = obd->obd_type;
346 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350 LASSERT(obd_type != NULL);
352 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353 obd->obd_name,obd->obd_type->typ_name);
355 spin_lock(&obd_dev_lock);
356 obd_devs[obd->obd_minor] = NULL;
357 spin_unlock(&obd_dev_lock);
358 obd_device_free(obd);
360 class_put_type(obd_type);
363 int class_name2dev(const char *name)
370 spin_lock(&obd_dev_lock);
371 for (i = 0; i < class_devno_max(); i++) {
372 struct obd_device *obd = class_num2obd(i);
373 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374 /* Make sure we finished attaching before we give
375 out any references */
376 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377 if (obd->obd_attached) {
378 spin_unlock(&obd_dev_lock);
384 spin_unlock(&obd_dev_lock);
389 struct obd_device *class_name2obd(const char *name)
391 int dev = class_name2dev(name);
393 if (dev < 0 || dev > class_devno_max())
395 return class_num2obd(dev);
398 int class_uuid2dev(struct obd_uuid *uuid)
402 spin_lock(&obd_dev_lock);
403 for (i = 0; i < class_devno_max(); i++) {
404 struct obd_device *obd = class_num2obd(i);
405 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407 spin_unlock(&obd_dev_lock);
411 spin_unlock(&obd_dev_lock);
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
418 int dev = class_uuid2dev(uuid);
421 return class_num2obd(dev);
425 * Get obd device from ::obd_devs[]
427 * \param num [in] array index
429 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430 * otherwise return the obd device there.
432 struct obd_device *class_num2obd(int num)
434 struct obd_device *obd = NULL;
436 if (num < class_devno_max()) {
441 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442 "%p obd_magic %08x != %08x\n",
443 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444 LASSERTF(obd->obd_minor == num,
445 "%p obd_minor %0d != %0d\n",
446 obd, obd->obd_minor, num);
452 void class_obd_list(void)
457 spin_lock(&obd_dev_lock);
458 for (i = 0; i < class_devno_max(); i++) {
459 struct obd_device *obd = class_num2obd(i);
462 if (obd->obd_stopping)
464 else if (obd->obd_set_up)
466 else if (obd->obd_attached)
470 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471 i, status, obd->obd_type->typ_name,
472 obd->obd_name, obd->obd_uuid.uuid,
473 atomic_read(&obd->obd_refcount));
475 spin_unlock(&obd_dev_lock);
479 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
480 specified, then only the client with that uuid is returned,
481 otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483 const char * typ_name,
484 struct obd_uuid *grp_uuid)
488 spin_lock(&obd_dev_lock);
489 for (i = 0; i < class_devno_max(); i++) {
490 struct obd_device *obd = class_num2obd(i);
493 if ((strncmp(obd->obd_type->typ_name, typ_name,
494 strlen(typ_name)) == 0)) {
495 if (obd_uuid_equals(tgt_uuid,
496 &obd->u.cli.cl_target_uuid) &&
497 ((grp_uuid)? obd_uuid_equals(grp_uuid,
498 &obd->obd_uuid) : 1)) {
499 spin_unlock(&obd_dev_lock);
504 spin_unlock(&obd_dev_lock);
509 /* Iterate the obd_device list looking devices have grp_uuid. Start
510 searching at *next, and if a device is found, the next index to look
511 at is saved in *next. If next is NULL, then the first matching device
512 will always be returned. */
513 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
519 else if (*next >= 0 && *next < class_devno_max())
524 spin_lock(&obd_dev_lock);
525 for (; i < class_devno_max(); i++) {
526 struct obd_device *obd = class_num2obd(i);
529 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
532 spin_unlock(&obd_dev_lock);
536 spin_unlock(&obd_dev_lock);
542 * to notify sptlrpc log for @fsname has changed, let every relevant OBD
543 * adjust sptlrpc settings accordingly.
545 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
547 struct obd_device *obd;
551 LASSERT(namelen > 0);
553 spin_lock(&obd_dev_lock);
554 for (i = 0; i < class_devno_max(); i++) {
555 obd = class_num2obd(i);
557 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
560 /* only notify mdc, osc, mdt, ost */
561 type = obd->obd_type->typ_name;
562 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
563 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
564 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
565 strcmp(type, LUSTRE_OST_NAME) != 0)
568 if (strncmp(obd->obd_name, fsname, namelen))
571 class_incref(obd, __FUNCTION__, obd);
572 spin_unlock(&obd_dev_lock);
573 rc2 = obd_set_info_async(obd->obd_self_export,
574 sizeof(KEY_SPTLRPC_CONF),
575 KEY_SPTLRPC_CONF, 0, NULL, NULL);
577 class_decref(obd, __FUNCTION__, obd);
578 spin_lock(&obd_dev_lock);
580 spin_unlock(&obd_dev_lock);
583 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
585 void obd_cleanup_caches(void)
590 if (obd_device_cachep) {
591 rc = cfs_mem_cache_destroy(obd_device_cachep);
592 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
593 obd_device_cachep = NULL;
596 rc = cfs_mem_cache_destroy(obdo_cachep);
597 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
601 rc = cfs_mem_cache_destroy(import_cachep);
602 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
603 import_cachep = NULL;
606 rc = cfs_mem_cache_destroy(capa_cachep);
607 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
613 int obd_init_caches(void)
617 LASSERT(obd_device_cachep == NULL);
618 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
619 sizeof(struct obd_device),
621 if (!obd_device_cachep)
624 LASSERT(obdo_cachep == NULL);
625 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
630 LASSERT(import_cachep == NULL);
631 import_cachep = cfs_mem_cache_create("ll_import_cache",
632 sizeof(struct obd_import),
637 LASSERT(capa_cachep == NULL);
638 capa_cachep = cfs_mem_cache_create("capa_cache",
639 sizeof(struct obd_capa), 0, 0);
645 obd_cleanup_caches();
650 /* map connection to client */
651 struct obd_export *class_conn2export(struct lustre_handle *conn)
653 struct obd_export *export;
657 CDEBUG(D_CACHE, "looking for null handle\n");
661 if (conn->cookie == -1) { /* this means assign a new connection */
662 CDEBUG(D_CACHE, "want a new connection\n");
666 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
667 export = class_handle2object(conn->cookie);
671 struct obd_device *class_exp2obd(struct obd_export *exp)
678 struct obd_device *class_conn2obd(struct lustre_handle *conn)
680 struct obd_export *export;
681 export = class_conn2export(conn);
683 struct obd_device *obd = export->exp_obd;
684 class_export_put(export);
690 struct obd_import *class_exp2cliimp(struct obd_export *exp)
692 struct obd_device *obd = exp->exp_obd;
695 return obd->u.cli.cl_import;
698 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
700 struct obd_device *obd = class_conn2obd(conn);
703 return obd->u.cli.cl_import;
706 /* Export management functions */
707 static void export_handle_addref(void *export)
709 class_export_get(export);
712 struct obd_export *class_export_get(struct obd_export *exp)
714 atomic_inc(&exp->exp_refcount);
715 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
716 atomic_read(&exp->exp_refcount));
719 EXPORT_SYMBOL(class_export_get);
721 void class_export_put(struct obd_export *exp)
723 LASSERT(exp != NULL);
724 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
725 atomic_read(&exp->exp_refcount) - 1);
726 LASSERT(atomic_read(&exp->exp_refcount) > 0);
727 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
729 if (atomic_dec_and_test(&exp->exp_refcount)) {
730 LASSERT (list_empty(&exp->exp_obd_chain));
732 CDEBUG(D_IOCTL, "final put %p/%s\n",
733 exp, exp->exp_client_uuid.uuid);
735 spin_lock(&obd_zombie_impexp_lock);
736 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
737 spin_unlock(&obd_zombie_impexp_lock);
739 if (obd_zombie_impexp_notify != NULL)
740 obd_zombie_impexp_notify();
743 EXPORT_SYMBOL(class_export_put);
745 static void class_export_destroy(struct obd_export *exp)
747 struct obd_device *obd = exp->exp_obd;
750 LASSERT (atomic_read(&exp->exp_refcount) == 0);
752 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
753 exp->exp_client_uuid.uuid, obd->obd_name);
755 LASSERT(obd != NULL);
757 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
758 if (exp->exp_connection)
759 ptlrpc_put_connection_superhack(exp->exp_connection);
761 LASSERT(list_empty(&exp->exp_outstanding_replies));
762 LASSERT(list_empty(&exp->exp_req_replay_queue));
763 LASSERT(list_empty(&exp->exp_queued_rpc));
764 obd_destroy_export(exp);
765 class_decref(obd, "export", exp);
767 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
771 /* Creates a new export, adds it to the hash table, and returns a
772 * pointer to it. The refcount is 2: one for the hash reference, and
773 * one for the pointer returned by this function. */
774 struct obd_export *class_new_export(struct obd_device *obd,
775 struct obd_uuid *cluuid)
777 struct obd_export *export;
780 OBD_ALLOC_PTR(export);
782 return ERR_PTR(-ENOMEM);
784 export->exp_conn_cnt = 0;
785 export->exp_lock_hash = NULL;
786 atomic_set(&export->exp_refcount, 2);
787 atomic_set(&export->exp_rpc_count, 0);
788 export->exp_obd = obd;
789 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
790 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
791 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
792 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
793 class_handle_hash(&export->exp_handle, export_handle_addref);
794 export->exp_last_request_time = cfs_time_current_sec();
795 spin_lock_init(&export->exp_lock);
796 INIT_HLIST_NODE(&export->exp_uuid_hash);
797 INIT_HLIST_NODE(&export->exp_nid_hash);
799 export->exp_sp_peer = LUSTRE_SP_ANY;
800 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
801 export->exp_client_uuid = *cluuid;
802 obd_init_export(export);
804 spin_lock(&obd->obd_dev_lock);
805 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
806 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
807 &export->exp_uuid_hash);
809 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
810 obd->obd_name, cluuid->uuid, rc);
811 spin_unlock(&obd->obd_dev_lock);
812 class_handle_unhash(&export->exp_handle);
813 OBD_FREE_PTR(export);
814 return ERR_PTR(-EALREADY);
818 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
819 class_incref(obd, "export", export);
820 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
821 list_add_tail(&export->exp_obd_chain_timed,
822 &export->exp_obd->obd_exports_timed);
823 export->exp_obd->obd_num_exports++;
824 spin_unlock(&obd->obd_dev_lock);
828 EXPORT_SYMBOL(class_new_export);
830 void class_unlink_export(struct obd_export *exp)
832 class_handle_unhash(&exp->exp_handle);
834 spin_lock(&exp->exp_obd->obd_dev_lock);
835 /* delete an uuid-export hashitem from hashtables */
836 if (!hlist_unhashed(&exp->exp_uuid_hash))
837 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
838 &exp->exp_client_uuid,
839 &exp->exp_uuid_hash);
841 list_del_init(&exp->exp_obd_chain);
842 list_del_init(&exp->exp_obd_chain_timed);
843 exp->exp_obd->obd_num_exports--;
844 spin_unlock(&exp->exp_obd->obd_dev_lock);
846 class_export_put(exp);
848 EXPORT_SYMBOL(class_unlink_export);
850 /* Import management functions */
851 static void import_handle_addref(void *import)
853 class_import_get(import);
856 struct obd_import *class_import_get(struct obd_import *import)
858 LASSERT(atomic_read(&import->imp_refcount) >= 0);
859 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
860 atomic_inc(&import->imp_refcount);
861 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
862 atomic_read(&import->imp_refcount),
863 import->imp_obd->obd_name);
866 EXPORT_SYMBOL(class_import_get);
868 void class_import_put(struct obd_import *import)
872 LASSERT(atomic_read(&import->imp_refcount) > 0);
873 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
874 LASSERT(list_empty(&import->imp_zombie_chain));
876 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
877 atomic_read(&import->imp_refcount) - 1,
878 import->imp_obd->obd_name);
880 if (atomic_dec_and_test(&import->imp_refcount)) {
881 CDEBUG(D_INFO, "final put import %p\n", import);
882 spin_lock(&obd_zombie_impexp_lock);
883 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
884 spin_unlock(&obd_zombie_impexp_lock);
886 if (obd_zombie_impexp_notify != NULL)
887 obd_zombie_impexp_notify();
892 EXPORT_SYMBOL(class_import_put);
894 void class_import_destroy(struct obd_import *import)
898 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
899 import->imp_obd->obd_name);
901 LASSERT(atomic_read(&import->imp_refcount) == 0);
903 ptlrpc_put_connection_superhack(import->imp_connection);
905 while (!list_empty(&import->imp_conn_list)) {
906 struct obd_import_conn *imp_conn;
908 imp_conn = list_entry(import->imp_conn_list.next,
909 struct obd_import_conn, oic_item);
910 list_del(&imp_conn->oic_item);
911 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
912 OBD_FREE(imp_conn, sizeof(*imp_conn));
915 LASSERT(import->imp_sec == NULL);
916 class_decref(import->imp_obd, "import", import);
917 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
921 static void init_imp_at(struct imp_at *at) {
923 at_init(&at->iat_net_latency, 0, 0);
924 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
925 /* max service estimates are tracked on the server side, so
926 don't use the AT history here, just use the last reported
927 val. (But keep hist for proc histogram, worst_ever) */
928 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
933 struct obd_import *class_new_import(struct obd_device *obd)
935 struct obd_import *imp;
937 OBD_ALLOC(imp, sizeof(*imp));
941 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
942 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
943 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
944 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
945 spin_lock_init(&imp->imp_lock);
946 imp->imp_last_success_conn = 0;
947 imp->imp_state = LUSTRE_IMP_NEW;
948 imp->imp_obd = class_incref(obd, "import", imp);
949 sema_init(&imp->imp_sec_mutex, 1);
950 cfs_waitq_init(&imp->imp_recovery_waitq);
952 atomic_set(&imp->imp_refcount, 2);
953 atomic_set(&imp->imp_unregistering, 0);
954 atomic_set(&imp->imp_inflight, 0);
955 atomic_set(&imp->imp_replay_inflight, 0);
956 atomic_set(&imp->imp_inval_count, 0);
957 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
958 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
959 class_handle_hash(&imp->imp_handle, import_handle_addref);
960 init_imp_at(&imp->imp_at);
962 /* the default magic is V2, will be used in connect RPC, and
963 * then adjusted according to the flags in request/reply. */
964 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
968 EXPORT_SYMBOL(class_new_import);
970 void class_destroy_import(struct obd_import *import)
972 LASSERT(import != NULL);
973 LASSERT(import != LP_POISON);
975 class_handle_unhash(&import->imp_handle);
977 spin_lock(&import->imp_lock);
978 import->imp_generation++;
979 spin_unlock(&import->imp_lock);
980 class_import_put(import);
982 EXPORT_SYMBOL(class_destroy_import);
984 /* A connection defines an export context in which preallocation can
985 be managed. This releases the export pointer reference, and returns
986 the export handle, so the export refcount is 1 when this function
988 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
989 struct obd_uuid *cluuid)
991 struct obd_export *export;
992 LASSERT(conn != NULL);
993 LASSERT(obd != NULL);
994 LASSERT(cluuid != NULL);
997 export = class_new_export(obd, cluuid);
999 RETURN(PTR_ERR(export));
1001 conn->cookie = export->exp_handle.h_cookie;
1002 class_export_put(export);
1004 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1005 cluuid->uuid, conn->cookie);
1008 EXPORT_SYMBOL(class_connect);
1010 /* if export is involved in recovery then clean up related things */
1011 void class_export_recovery_cleanup(struct obd_export *exp)
1013 struct obd_device *obd = exp->exp_obd;
1015 spin_lock_bh(&obd->obd_processing_task_lock);
1016 if (obd->obd_recovering && exp->exp_in_recovery) {
1017 spin_lock(&exp->exp_lock);
1018 exp->exp_in_recovery = 0;
1019 spin_unlock(&exp->exp_lock);
1020 obd->obd_connected_clients--;
1021 /* each connected client is counted as recoverable */
1022 obd->obd_recoverable_clients--;
1023 if (exp->exp_req_replay_needed) {
1024 spin_lock(&exp->exp_lock);
1025 exp->exp_req_replay_needed = 0;
1026 spin_unlock(&exp->exp_lock);
1027 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1028 atomic_dec(&obd->obd_req_replay_clients);
1030 if (exp->exp_lock_replay_needed) {
1031 spin_lock(&exp->exp_lock);
1032 exp->exp_lock_replay_needed = 0;
1033 spin_unlock(&exp->exp_lock);
1034 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1035 atomic_dec(&obd->obd_lock_replay_clients);
1038 spin_unlock_bh(&obd->obd_processing_task_lock);
1041 /* This function removes two references from the export: one for the
1042 * hash entry and one for the export pointer passed in. The export
1043 * pointer passed to this function is destroyed should not be used
1045 int class_disconnect(struct obd_export *export)
1047 int already_disconnected;
1050 if (export == NULL) {
1052 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1056 spin_lock(&export->exp_lock);
1057 already_disconnected = export->exp_disconnected;
1058 export->exp_disconnected = 1;
1060 if (!hlist_unhashed(&export->exp_nid_hash))
1061 lustre_hash_del(export->exp_obd->obd_nid_hash,
1062 &export->exp_connection->c_peer.nid,
1063 &export->exp_nid_hash);
1065 spin_unlock(&export->exp_lock);
1067 /* class_cleanup(), abort_recovery(), and class_fail_export()
1068 * all end up in here, and if any of them race we shouldn't
1069 * call extra class_export_puts(). */
1070 if (already_disconnected)
1073 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1074 export->exp_handle.h_cookie);
1076 class_export_recovery_cleanup(export);
1077 class_unlink_export(export);
1078 class_export_put(export);
1082 static void class_disconnect_export_list(struct list_head *list, int flags)
1085 struct lustre_handle fake_conn;
1086 struct obd_export *fake_exp, *exp;
1089 /* It's possible that an export may disconnect itself, but
1090 * nothing else will be added to this list. */
1091 while (!list_empty(list)) {
1092 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1093 class_export_get(exp);
1095 spin_lock(&exp->exp_lock);
1096 exp->exp_flags = flags;
1097 spin_unlock(&exp->exp_lock);
1099 if (obd_uuid_equals(&exp->exp_client_uuid,
1100 &exp->exp_obd->obd_uuid)) {
1102 "exp %p export uuid == obd uuid, don't discon\n",
1104 /* Need to delete this now so we don't end up pointing
1105 * to work_list later when this export is cleaned up. */
1106 list_del_init(&exp->exp_obd_chain);
1107 class_export_put(exp);
1111 fake_conn.cookie = exp->exp_handle.h_cookie;
1112 fake_exp = class_conn2export(&fake_conn);
1114 class_export_put(exp);
1118 spin_lock(&fake_exp->exp_lock);
1119 fake_exp->exp_flags = flags;
1120 spin_unlock(&fake_exp->exp_lock);
1122 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1123 "last request at "CFS_TIME_T"\n",
1124 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1125 exp, exp->exp_last_request_time);
1126 rc = obd_disconnect(fake_exp);
1127 class_export_put(exp);
1132 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1134 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1135 (obd->obd_force ? OBD_OPT_FORCE : 0));
1138 void class_disconnect_exports(struct obd_device *obd)
1140 struct list_head work_list;
1143 /* Move all of the exports from obd_exports to a work list, en masse. */
1144 spin_lock(&obd->obd_dev_lock);
1145 list_add(&work_list, &obd->obd_exports);
1146 list_del_init(&obd->obd_exports);
1147 spin_unlock(&obd->obd_dev_lock);
1149 if (!list_empty(&work_list)) {
1150 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1151 "disconnecting them\n", obd->obd_minor, obd);
1152 class_disconnect_export_list(&work_list,
1153 get_exp_flags_from_obd(obd));
1155 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1156 obd->obd_minor, obd);
1159 EXPORT_SYMBOL(class_disconnect_exports);
1161 /* Remove exports that have not completed recovery.
1163 int class_disconnect_stale_exports(struct obd_device *obd,
1164 int (*test_export)(struct obd_export *))
1166 struct list_head work_list;
1167 struct list_head *pos, *n;
1168 struct obd_export *exp;
1172 CFS_INIT_LIST_HEAD(&work_list);
1173 spin_lock(&obd->obd_dev_lock);
1174 list_for_each_safe(pos, n, &obd->obd_exports) {
1175 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1176 if (test_export(exp))
1179 list_del(&exp->exp_obd_chain);
1180 list_add(&exp->exp_obd_chain, &work_list);
1181 /* don't count self-export as client */
1182 if (obd_uuid_equals(&exp->exp_client_uuid,
1183 &exp->exp_obd->obd_uuid))
1187 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1188 obd->obd_name, exp->exp_client_uuid.uuid,
1189 exp->exp_connection == NULL ? "<unknown>" :
1190 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1192 spin_unlock(&obd->obd_dev_lock);
1194 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1195 obd->obd_name, cnt);
1196 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1199 EXPORT_SYMBOL(class_disconnect_stale_exports);
1201 void class_fail_export(struct obd_export *exp)
1203 int rc, already_failed;
1205 spin_lock(&exp->exp_lock);
1206 already_failed = exp->exp_failed;
1207 exp->exp_failed = 1;
1208 spin_unlock(&exp->exp_lock);
1210 if (already_failed) {
1211 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1212 exp, exp->exp_client_uuid.uuid);
1216 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1217 exp, exp->exp_client_uuid.uuid);
1219 if (obd_dump_on_timeout)
1220 libcfs_debug_dumplog();
1222 /* Most callers into obd_disconnect are removing their own reference
1223 * (request, for example) in addition to the one from the hash table.
1224 * We don't have such a reference here, so make one. */
1225 class_export_get(exp);
1226 rc = obd_disconnect(exp);
1228 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1230 CDEBUG(D_HA, "disconnected export %p/%s\n",
1231 exp, exp->exp_client_uuid.uuid);
1233 EXPORT_SYMBOL(class_fail_export);
1235 char *obd_export_nid2str(struct obd_export *exp)
1237 if (exp->exp_connection != NULL)
1238 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1242 EXPORT_SYMBOL(obd_export_nid2str);
1244 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1246 struct obd_export *doomed_exp = NULL;
1247 int exports_evicted = 0;
1249 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1252 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1253 if (doomed_exp == NULL)
1256 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1257 "nid %s found, wanted nid %s, requested nid %s\n",
1258 obd_export_nid2str(doomed_exp),
1259 libcfs_nid2str(nid_key), nid);
1260 LASSERTF(doomed_exp != obd->obd_self_export,
1261 "self-export is hashed by NID?\n");
1263 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1264 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1266 class_fail_export(doomed_exp);
1267 class_export_put(doomed_exp);
1270 if (!exports_evicted)
1271 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1272 obd->obd_name, nid);
1273 return exports_evicted;
1275 EXPORT_SYMBOL(obd_export_evict_by_nid);
1277 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1279 struct obd_export *doomed_exp = NULL;
1280 struct obd_uuid doomed_uuid;
1281 int exports_evicted = 0;
1283 obd_str2uuid(&doomed_uuid, uuid);
1284 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1285 CERROR("%s: can't evict myself\n", obd->obd_name);
1286 return exports_evicted;
1289 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1291 if (doomed_exp == NULL) {
1292 CERROR("%s: can't disconnect %s: no exports found\n",
1293 obd->obd_name, uuid);
1295 CWARN("%s: evicting %s at adminstrative request\n",
1296 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1297 class_fail_export(doomed_exp);
1298 class_export_put(doomed_exp);
1302 return exports_evicted;
1304 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1307 * kill zombie imports and exports
1309 void obd_zombie_impexp_cull(void)
1311 struct obd_import *import;
1312 struct obd_export *export;
1316 spin_lock (&obd_zombie_impexp_lock);
1319 if (!list_empty(&obd_zombie_imports)) {
1320 import = list_entry(obd_zombie_imports.next,
1323 list_del(&import->imp_zombie_chain);
1327 if (!list_empty(&obd_zombie_exports)) {
1328 export = list_entry(obd_zombie_exports.next,
1331 list_del_init(&export->exp_obd_chain);
1334 spin_unlock(&obd_zombie_impexp_lock);
1337 class_import_destroy(import);
1340 class_export_destroy(export);
1342 } while (import != NULL || export != NULL);
1346 static struct completion obd_zombie_start;
1347 static struct completion obd_zombie_stop;
1348 static unsigned long obd_zombie_flags;
1349 static cfs_waitq_t obd_zombie_waitq;
1356 * check for work for kill zombie import/export thread.
1358 static int obd_zombie_impexp_check(void *arg)
1362 spin_lock(&obd_zombie_impexp_lock);
1363 rc = list_empty(&obd_zombie_imports) &&
1364 list_empty(&obd_zombie_exports) &&
1365 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1367 spin_unlock(&obd_zombie_impexp_lock);
1373 * notify import/export destroy thread about new zombie.
1375 static void obd_zombie_impexp_notify(void)
1377 cfs_waitq_signal(&obd_zombie_waitq);
1381 * check whether obd_zombie is idle
1383 static int obd_zombie_is_idle(void)
1387 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1388 spin_lock(&obd_zombie_impexp_lock);
1389 rc = list_empty(&obd_zombie_imports) &&
1390 list_empty(&obd_zombie_exports);
1391 spin_unlock(&obd_zombie_impexp_lock);
1396 * wait when obd_zombie import/export queues become empty
1398 void obd_zombie_barrier(void)
1400 struct l_wait_info lwi = { 0 };
1402 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1404 EXPORT_SYMBOL(obd_zombie_barrier);
1409 * destroy zombie export/import thread.
1411 static int obd_zombie_impexp_thread(void *unused)
1415 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1416 complete(&obd_zombie_start);
1420 complete(&obd_zombie_start);
1422 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1423 struct l_wait_info lwi = { 0 };
1425 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1427 obd_zombie_impexp_cull();
1428 /* Notify obd_zombie_barrier callers that queues may be empty */
1429 cfs_waitq_signal(&obd_zombie_waitq);
1432 complete(&obd_zombie_stop);
1437 #else /* ! KERNEL */
1439 static atomic_t zombie_recur = ATOMIC_INIT(0);
1440 static void *obd_zombie_impexp_work_cb;
1441 static void *obd_zombie_impexp_idle_cb;
1443 int obd_zombie_impexp_kill(void *arg)
1447 if (atomic_inc_return(&zombie_recur) == 1) {
1448 obd_zombie_impexp_cull();
1451 atomic_dec(&zombie_recur);
1458 * start destroy zombie import/export thread
1460 int obd_zombie_impexp_init(void)
1464 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1465 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1466 spin_lock_init(&obd_zombie_impexp_lock);
1467 init_completion(&obd_zombie_start);
1468 init_completion(&obd_zombie_stop);
1469 cfs_waitq_init(&obd_zombie_waitq);
1472 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1476 wait_for_completion(&obd_zombie_start);
1479 obd_zombie_impexp_work_cb =
1480 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1481 &obd_zombie_impexp_kill, NULL);
1483 obd_zombie_impexp_idle_cb =
1484 liblustre_register_idle_callback("obd_zombi_impexp_check",
1485 &obd_zombie_impexp_check, NULL);
1492 * stop destroy zombie import/export thread
1494 void obd_zombie_impexp_stop(void)
1496 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1497 obd_zombie_impexp_notify();
1499 wait_for_completion(&obd_zombie_stop);
1501 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1502 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);