1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
/* Registry of obd types; only declared here, the definition lives elsewhere. */
51 extern struct list_head obd_types;
/* Taken around every walk or modification of obd_types in this file. */
52 spinlock_t obd_types_lock;
/* Slab caches set up by obd_init_caches() and torn down by
 * obd_cleanup_caches(). */
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
/* Imports and exports whose last reference was dropped are queued on these
 * lists under obd_zombie_impexp_lock (see __class_export_put() and
 * class_import_put()); obd_zombie_impexp_notify() is called after queuing. */
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
/* Function pointer called by the export and import destroy paths to release
 * a ptlrpc connection. It is never assigned in the visible part of this
 * file; presumably another module installs it - TODO confirm. */
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
/* Allocate one obd_device from the obd_device_cachep slab cache and stamp
 * its obd_magic field with OBD_DEVICE_MAGIC.
 * NOTE(review): this listing drops some source lines; the allocation-failure
 * check and the return statement are not visible, so NULL handling cannot be
 * confirmed from here. */
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
/* NOTE(review): the function is declared static yet passed to EXPORT_SYMBOL;
 * the two are contradictory - confirm which one is intended. */
80 EXPORT_SYMBOL(obd_device_alloc);
/* Give an obd_device back to the obd_device_cachep slab cache.
 * The magic value is asserted first, and a device that still has an
 * obd_namespace attached is reported with CERROR before being freed.
 * NOTE(review): the remainder of the obd_namespace branch is not visible in
 * this listing. */
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* NOTE(review): static function passed to EXPORT_SYMBOL - confirm intent. */
94 EXPORT_SYMBOL(obd_device_free);
/* Look for a registered obd type whose typ_name equals name.
 * The obd_types list is walked under obd_types_lock; the lock is released
 * both on the match path and after an unsuccessful walk. No reference is
 * taken on the type in the visible lines.
 * NOTE(review): the return statements are not visible in this listing. */
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
/* exact, case-sensitive comparison */
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
/* Find an obd type by name and pin the module that provides it.
 * The visible lines show a second lookup after request_module() reports
 * success (returns 0) and a console error when the module cannot be loaded;
 * the condition that selects this fallback is not visible in this listing.
 * The module reference is then taken while holding type->obd_type_lock. */
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
/* the module name requested is the type name itself */
119 const char *modname = name;
120 if (!request_module(modname)) {
121 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122 type = class_search_type(name);
124 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is discarded, so a module
 * that is already unloading would not be detected here. */
132 try_module_get(type->typ_dt_ops->o_owner);
133 spin_unlock(&type->obd_type_lock);
/* Drop the module reference on the owner of the type dt ops, under
 * type->obd_type_lock. This mirrors the try_module_get() call in
 * class_get_type().
 * NOTE(review): other statements of this function are not visible here. */
138 void class_put_type(struct obd_type *type)
141 spin_lock(&type->obd_type_lock);
143 module_put(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of a type name, enforced by the
 * LASSERT at the top of class_register_type(). */
147 #define CLASS_MAX_NAME 1024
/* Register a new obd type under name.
 *
 * A fresh obd_type is allocated together with private copies of the dt ops,
 * the md ops and the name; a procfs directory is registered under
 * proc_lustre_root; the lu device type init hook is called; finally the type
 * is linked into obd_types under obd_types_lock. The tail of the function
 * releases whatever was allocated.
 *
 * NOTE(review): several guards (NULL checks on type, md_ops and ldt, the goto
 * or return statements, the declaration of rc) are not visible in this
 * listing, so the exact control flow cannot be confirmed from here. */
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150 struct lprocfs_vars *vars, const char *name,
151 struct lu_device_type *ldt)
153 struct obd_type *type;
158 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* NOTE(review): this lookup and the list_add further down each take
 * obd_types_lock separately, so two concurrent registrations of the same
 * name are not excluded by the visible code. */
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC_PTR(type->typ_dt_ops);
171 OBD_ALLOC_PTR(type->typ_md_ops);
/* +1 for the terminating NUL written by strcpy() below */
172 OBD_ALLOC(type->typ_name, strlen(name) + 1);
174 if (type->typ_dt_ops == NULL ||
175 type->typ_md_ops == NULL ||
176 type->typ_name == NULL)
/* the ops tables are copied by value, the caller keeps its own */
179 *(type->typ_dt_ops) = *dt_ops;
180 /* md_ops is optional */
182 *(type->typ_md_ops) = *md_ops;
183 strcpy(type->typ_name, name);
184 spin_lock_init(&type->obd_type_lock);
187 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
189 if (IS_ERR(type->typ_procroot)) {
190 rc = PTR_ERR(type->typ_procroot);
/* cleared so that later code does not treat the error pointer as valid */
191 type->typ_procroot = NULL;
197 rc = ldt->ldt_ops->ldto_init(ldt);
202 spin_lock(&obd_types_lock);
203 list_add(&type->typ_chain, &obd_types);
204 spin_unlock(&obd_types_lock);
/* release path: sizes must match the allocations above */
209 if (type->typ_name != NULL)
210 OBD_FREE(type->typ_name, strlen(name) + 1);
211 if (type->typ_md_ops != NULL)
212 OBD_FREE_PTR(type->typ_md_ops);
213 if (type->typ_dt_ops != NULL)
214 OBD_FREE_PTR(type->typ_dt_ops);
215 OBD_FREE(type, sizeof(*type));
/* Unregister the obd type called name and free its resources.
 *
 * A type that still has a non-zero typ_refcnt is reported and only its ops
 * tables are freed (the name is kept for debugging). Otherwise the procfs
 * directory is removed, the lu device type fini hook is called, the type is
 * unlinked from obd_types under obd_types_lock and all of its memory is
 * released.
 *
 * NOTE(review): the return statements and some guards are not visible in
 * this listing. */
219 int class_unregister_type(const char *name)
221 struct obd_type *type = class_search_type(name);
225 CERROR("unknown obd type\n");
229 if (type->typ_refcnt) {
230 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231 /* This is a bad situation, let's make the best of it */
232 /* Remove ops, but leave the name for debugging */
/* NOTE(review): the pointers are not reset to NULL in the visible lines; if
 * this branch did not leave the function, the frees further down would
 * release the same memory twice - confirm the branch returns. */
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_procroot) {
239 lprocfs_remove(&type->typ_procroot);
243 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
245 spin_lock(&obd_types_lock);
246 list_del(&type->typ_chain);
247 spin_unlock(&obd_types_lock);
/* name compared equal to typ_name in class_search_type(), so its length is
 * the length that was allocated */
248 OBD_FREE(type->typ_name, strlen(name) + 1);
249 if (type->typ_dt_ops != NULL)
250 OBD_FREE_PTR(type->typ_dt_ops);
251 if (type->typ_md_ops != NULL)
252 OBD_FREE_PTR(type->typ_md_ops);
253 OBD_FREE(type, sizeof(*type));
255 } /* class_unregister_type */
258 * Create a new obd device.
260 * Find an empty slot in ::obd_devs[], create a new obd device in it.
262 * \param type_name [in] obd device type string.
263 * \param name [in] obd device name.
265 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
/* Create a device of type type_name called name and install it in a free
 * slot of obd_devs[].
 *
 * Failures visible here are reported as ERR_PTR values: -EINVAL for a name
 * of MAX_OBD_NAME bytes or more, -ENODEV for an unknown type, -ENOMEM when
 * the device cannot be allocated, -EEXIST for a duplicate name and -EOVERFLOW
 * when no slot is free. On the error paths after allocation the new device
 * is freed and the type reference is dropped.
 *
 * NOTE(review): some lines (declaration of i, the assignment of result, a
 * few guards and the final return) are not visible in this listing. */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
270 struct obd_device *result = NULL;
271 struct obd_device *newdev;
272 struct obd_type *type = NULL;
274 int new_obd_minor = 0;
276 if (strlen(name) >= MAX_OBD_NAME) {
277 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278 RETURN(ERR_PTR(-EINVAL));
/* takes a reference on the type; dropped again on every failure below */
281 type = class_get_type(type_name);
283 CERROR("OBD: unknown type: %s\n", type_name);
284 RETURN(ERR_PTR(-ENODEV));
287 newdev = obd_device_alloc();
288 if (newdev == NULL) {
289 class_put_type(type);
290 RETURN(ERR_PTR(-ENOMEM));
292 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* The whole table is scanned under obd_dev_lock: every occupied slot is
 * checked for a name clash, and a slot is claimed only while result is
 * still NULL and the slot is empty. */
294 spin_lock(&obd_dev_lock);
295 for (i = 0; i < class_devno_max(); i++) {
296 struct obd_device *obd = class_num2obd(i);
297 if (obd && obd->obd_name &&
298 (strcmp(name, obd->obd_name) == 0)) {
299 CERROR("Device %s already exists, won't add\n", name);
/* a slot claimed earlier in this scan is rolled back before the error is
 * recorded */
301 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302 "%p obd_magic %08x != %08x\n", result,
303 result->obd_magic, OBD_DEVICE_MAGIC);
304 LASSERTF(result->obd_minor == new_obd_minor,
305 "%p obd_minor %d != %d\n", result,
306 result->obd_minor, new_obd_minor);
308 obd_devs[result->obd_minor] = NULL;
309 result->obd_name[0]='\0';
311 result = ERR_PTR(-EEXIST);
314 if (!result && !obd) {
316 result->obd_minor = i;
318 result->obd_type = type;
/* NOTE(review): strncpy() with size - 1 leaves termination to the last byte
 * of obd_name already being zero - TODO confirm the slab allocation zeroes
 * the device. */
319 strncpy(result->obd_name, name,
320 sizeof(result->obd_name) - 1);
321 obd_devs[i] = result;
324 spin_unlock(&obd_dev_lock);
326 if (result == NULL && i >= class_devno_max()) {
327 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
329 result = ERR_PTR(-EOVERFLOW);
332 if (IS_ERR(result)) {
333 obd_device_free(newdev);
334 class_put_type(type);
336 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337 result->obd_name, result);
/* Remove obd from obd_devs[], free it and drop the reference it held on its
 * obd type.
 * The type pointer is saved up front because the device memory is returned
 * to the slab before class_put_type() is called. */
342 void class_release_dev(struct obd_device *obd)
344 struct obd_type *obd_type = obd->obd_type;
346 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* the device must still occupy the slot named by its minor number */
348 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350 LASSERT(obd_type != NULL);
352 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353 obd->obd_name,obd->obd_type->typ_name);
/* only the slot update is done under obd_dev_lock */
355 spin_lock(&obd_dev_lock);
356 obd_devs[obd->obd_minor] = NULL;
357 spin_unlock(&obd_dev_lock);
358 obd_device_free(obd);
360 class_put_type(obd_type);
/* Scan obd_devs[] under obd_dev_lock for a device called name.
 * A matching device is only accepted once its obd_attached flag is set; the
 * lock is released before leaving on both paths.
 * NOTE(review): the return statements (and thus the value reported for a
 * miss) are not visible in this listing. */
363 int class_name2dev(const char *name)
370 spin_lock(&obd_dev_lock);
371 for (i = 0; i < class_devno_max(); i++) {
372 struct obd_device *obd = class_num2obd(i);
/* NOTE(review): class_newdev() applies sizeof to obd_name, which suggests an
 * array member; if so the obd->obd_name test is always true - confirm. */
373 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374 /* Make sure we finished attaching before we give
375 out any references */
376 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377 if (obd->obd_attached) {
378 spin_unlock(&obd_dev_lock);
384 spin_unlock(&obd_dev_lock);
/* Resolve a device name to its obd_device by way of class_name2dev() and
 * class_num2obd().
 * NOTE(review): the statement executed for an out-of-range index is not
 * visible in this listing. */
389 struct obd_device *class_name2obd(const char *name)
391 int dev = class_name2dev(name);
/* NOTE(review): every other bound check in this file uses
 * index < class_devno_max(); the comparison below lets
 * dev == class_devno_max() through, so it looks like it should be >=.
 * class_num2obd() repeats the range check, which limits the impact. */
393 if (dev < 0 || dev > class_devno_max())
395 return class_num2obd(dev);
/* Scan obd_devs[] under obd_dev_lock for a device whose obd_uuid equals
 * uuid. The lock is released on both the match and the no-match path.
 * Unlike class_name2dev(), no obd_attached test is visible here.
 * NOTE(review): the return statements are not visible in this listing. */
398 int class_uuid2dev(struct obd_uuid *uuid)
402 spin_lock(&obd_dev_lock);
403 for (i = 0; i < class_devno_max(); i++) {
404 struct obd_device *obd = class_num2obd(i);
405 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407 spin_unlock(&obd_dev_lock);
411 spin_unlock(&obd_dev_lock);
/* Resolve a uuid to its obd_device by way of class_uuid2dev() and
 * class_num2obd().
 * NOTE(review): any handling of a failed lookup between the two calls is not
 * visible in this listing. */
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
418 int dev = class_uuid2dev(uuid);
421 return class_num2obd(dev);
425 * Get obd device from ::obd_devs[]
427 * \param num [in] array index
429 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
430 * otherwise return the obd device there.
/* Return the device stored at index num of obd_devs[], after asserting that
 * its magic value is intact and that its obd_minor matches num.
 * NOTE(review): the slot read and the empty-slot handling are not visible in
 * this listing. Only an upper bound on num is checked in the visible lines;
 * callers appear to be expected to pass a non-negative index - confirm. */
432 struct obd_device *class_num2obd(int num)
434 struct obd_device *obd = NULL;
436 if (num < class_devno_max()) {
441 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442 "%p obd_magic %08x != %08x\n",
443 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444 LASSERTF(obd->obd_minor == num,
445 "%p obd_minor %0d != %0d\n",
446 obd, obd->obd_minor, num);
/* Print one console line per device in obd_devs[]: index, status, type
 * name, device name, uuid and reference count. The table is walked under
 * obd_dev_lock.
 * The status is chosen by testing obd_stopping, then obd_set_up, then
 * obd_attached, in that order.
 * NOTE(review): the status strings and the empty-slot check are not visible
 * in this listing. */
452 void class_obd_list(void)
457 spin_lock(&obd_dev_lock);
458 for (i = 0; i < class_devno_max(); i++) {
459 struct obd_device *obd = class_num2obd(i);
462 if (obd->obd_stopping)
464 else if (obd->obd_set_up)
466 else if (obd->obd_attached)
470 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471 i, status, obd->obd_type->typ_name,
472 obd->obd_name, obd->obd_uuid.uuid,
473 atomic_read(&obd->obd_refcount));
475 spin_unlock(&obd_dev_lock);
479 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
480 specified, then only the client with that uuid is returned,
481 otherwise any client connected to the tgt is returned. */
/* Scan obd_devs[] under obd_dev_lock for a client device of type typ_name
 * whose cl_target_uuid equals tgt_uuid; when grp_uuid is non-NULL the device
 * uuid must equal it as well. The lock is released before leaving.
 * NOTE(review): the return statements and the empty-slot check are not
 * visible in this listing. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483 const char * typ_name,
484 struct obd_uuid *grp_uuid)
488 spin_lock(&obd_dev_lock);
489 for (i = 0; i < class_devno_max(); i++) {
490 struct obd_device *obd = class_num2obd(i);
/* only the first strlen(typ_name) bytes are compared, so any type whose
 * name starts with typ_name is accepted */
493 if ((strncmp(obd->obd_type->typ_name, typ_name,
494 strlen(typ_name)) == 0)) {
495 if (obd_uuid_equals(tgt_uuid,
496 &obd->u.cli.cl_target_uuid) &&
497 ((grp_uuid)? obd_uuid_equals(grp_uuid,
498 &obd->obd_uuid) : 1)) {
499 spin_unlock(&obd_dev_lock);
504 spin_unlock(&obd_dev_lock);
/* Look for a client of tgt_uuid without knowing its type: an MDC client is
 * searched first, then an OSC client.
 * NOTE(review): the condition between the two lookups, the last argument of
 * the second call and the return are not visible in this listing. */
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510 struct obd_uuid *grp_uuid)
512 struct obd_device *obd;
/* NOTE(review): NULL is passed instead of grp_uuid, so the MDC lookup
 * ignores the group filter - confirm this is intended. */
514 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
516 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
521 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
522 searching at *next, and if a device is found, the next index to look
523 at is saved in *next. If next is NULL, then the first matching device
524 will always be returned. */
/* Scan obd_devs[] under obd_dev_lock, starting at a caller supplied index,
 * for a device whose obd_uuid equals grp_uuid. A value of *next is accepted
 * as the starting index only when it lies in [0, class_devno_max()).
 * NOTE(review): the handling of a NULL or out-of-range next, the update of
 * *next and the return statements are not visible in this listing. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
531 else if (*next >= 0 && *next < class_devno_max())
536 spin_lock(&obd_dev_lock);
537 for (; i < class_devno_max(); i++) {
538 struct obd_device *obd = class_num2obd(i);
541 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
544 spin_unlock(&obd_dev_lock);
548 spin_unlock(&obd_dev_lock);
/* Destroy the slab caches used by this file (obd_device, obdo, import and
 * capa). Each destroy call is asserted to return 0.
 * NOTE(review): the NULL guards for three of the caches and some pointer
 * resets are not visible in this listing.
 * NOTE(review): the assertion messages contain typos (destropy, destory) and
 * only the first one prints rc; they are runtime strings and are left
 * untouched here. */
554 void obd_cleanup_caches(void)
559 if (obd_device_cachep) {
560 rc = cfs_mem_cache_destroy(obd_device_cachep);
561 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
/* reset so that obd_init_caches() can run again */
562 obd_device_cachep = NULL;
565 rc = cfs_mem_cache_destroy(obdo_cachep);
566 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
570 rc = cfs_mem_cache_destroy(import_cachep);
571 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572 import_cachep = NULL;
575 rc = cfs_mem_cache_destroy(capa_cachep);
576 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/* Create the slab caches used by this file. Each cache pointer is asserted
 * to be NULL before it is created, so the function must not be called twice
 * without obd_cleanup_caches() in between. obd_cleanup_caches() is called
 * near the end of the function, presumably on the failure path - the label
 * and the return statements are not visible in this listing. */
582 int obd_init_caches(void)
586 LASSERT(obd_device_cachep == NULL);
587 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588 sizeof(struct obd_device),
590 if (!obd_device_cachep)
593 LASSERT(obdo_cachep == NULL);
594 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
599 LASSERT(import_cachep == NULL);
600 import_cachep = cfs_mem_cache_create("ll_import_cache",
601 sizeof(struct obd_import),
606 LASSERT(capa_cachep == NULL);
607 capa_cachep = cfs_mem_cache_create("capa_cache",
608 sizeof(struct obd_capa), 0, 0);
614 obd_cleanup_caches();
619 /* map connection to client */
/* Translate a connection handle into the export registered under its
 * cookie, using class_handle2object().
 * Two special cases are logged before the lookup: a null handle and a
 * cookie of -1, which stands for a request for a new connection.
 * NOTE(review): the conditions and return statements around the two log
 * lines are not visible in this listing; whether the lookup takes a
 * reference on the export depends on class_handle2object() - confirm. */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
622 struct obd_export *export;
626 CDEBUG(D_CACHE, "looking for null handle\n");
630 if (conn->cookie == -1) { /* this means assign a new connection */
631 CDEBUG(D_CACHE, "want a new connection\n");
635 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636 export = class_handle2object(conn->cookie);
640 struct obd_device *class_exp2obd(struct obd_export *exp)
/* Translate a connection handle into the obd device of its export.
 * The export obtained from class_conn2export() is released again right after
 * exp_obd has been read; no reference on the device itself is taken in the
 * visible lines.
 * NOTE(review): the NULL check on export and the return statements are not
 * visible in this listing. */
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
649 struct obd_export *export;
650 export = class_conn2export(conn);
652 struct obd_device *obd = export->exp_obd;
653 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the device behind exp.
 * No reference on the import is taken in the visible lines.
 * NOTE(review): any NULL check on obd is not visible in this listing. */
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
661 struct obd_device *obd = exp->exp_obd;
664 return obd->u.cli.cl_import;
/* Return the client import (u.cli.cl_import) of the device reached through
 * the connection handle conn.
 * NOTE(review): any NULL check on obd is not visible in this listing. */
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
669 struct obd_device *obd = class_conn2obd(conn);
672 return obd->u.cli.cl_import;
675 /* Export management functions */
/* Handle-table callback for exports: takes one reference on the export. It
 * is registered through class_handle_hash() in class_new_export(). */
676 static void export_handle_addref(void *export)
678 class_export_get(export);
/* Drop one reference on exp. When the count reaches zero the export, which
 * must already be off its device list, is queued on obd_zombie_exports
 * under obd_zombie_impexp_lock and the zombie handler is notified. */
681 void __class_export_put(struct obd_export *exp)
683 if (atomic_dec_and_test(&exp->exp_refcount)) {
684 LASSERT (list_empty(&exp->exp_obd_chain));
686 CDEBUG(D_IOCTL, "final put %p/%s\n",
687 exp, exp->exp_client_uuid.uuid);
/* exp_obd_chain is reused as the link on the zombie list */
689 spin_lock(&obd_zombie_impexp_lock);
690 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
691 spin_unlock(&obd_zombie_impexp_lock);
/* NOTE(review): obd_zombie_impexp_notify is declared as a function near the
 * top of this file, not as a function pointer, so this comparison looks
 * always true - confirm. */
693 if (obd_zombie_impexp_notify != NULL)
694 obd_zombie_impexp_notify();
697 EXPORT_SYMBOL(__class_export_put);
/* Final teardown of an export whose reference count is zero: release its
 * connection (if any), call obd_destroy_export() and free the memory through
 * OBD_FREE_RCU.
 * NOTE(review): some lines of this function are not visible in this
 * listing. */
699 void class_export_destroy(struct obd_export *exp)
701 struct obd_device *obd = exp->exp_obd;
704 LASSERT (atomic_read(&exp->exp_refcount) == 0);
/* NOTE(review): obd->obd_name is dereferenced here, before the
 * LASSERT(obd != NULL) below; the assertion should come first to be of any
 * use. */
706 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
707 exp->exp_client_uuid.uuid, obd->obd_name);
709 LASSERT(obd != NULL);
711 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
712 if (exp->exp_connection)
713 ptlrpc_put_connection_superhack(exp->exp_connection);
/* nothing may still be queued on the export at this point */
715 LASSERT(list_empty(&exp->exp_outstanding_replies));
716 LASSERT(list_empty(&exp->exp_req_replay_queue));
717 obd_destroy_export(exp);
719 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
724 /* Creates a new export, adds it to the hash table, and returns a
725 * pointer to it. The refcount is 2: one for the hash reference, and
726 * one for the pointer returned by this function. */
/* Allocate and initialise an export of obd for the client cluuid, register
 * it in the handle table and link it on the device export lists.
 *
 * The reference count starts at 2. Unless cluuid is the uuid of the device
 * itself, the export is also inserted into obd_uuid_hash; a duplicate is
 * refused with ERR_PTR(-EALREADY). Allocation failure yields
 * ERR_PTR(-ENOMEM).
 *
 * NOTE(review): the declaration of rc, some conditions and the final return
 * are not visible in this listing. */
727 struct obd_export *class_new_export(struct obd_device *obd,
728 struct obd_uuid *cluuid)
730 struct obd_export *export;
733 OBD_ALLOC_PTR(export);
735 return ERR_PTR(-ENOMEM);
737 export->exp_conn_cnt = 0;
738 atomic_set(&export->exp_refcount, 2);
739 atomic_set(&export->exp_rpc_count, 0);
740 export->exp_obd = obd;
741 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
742 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
743 /* XXX this should be in LDLM init */
744 CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
745 spin_lock_init(&export->exp_ldlm_data.led_lock);
/* from here on the export can be found through its handle cookie */
747 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
748 class_handle_hash(&export->exp_handle, export_handle_addref);
749 export->exp_last_request_time = cfs_time_current_sec();
750 spin_lock_init(&export->exp_lock);
751 INIT_HLIST_NODE(&export->exp_uuid_hash);
752 INIT_HLIST_NODE(&export->exp_nid_hash);
754 export->exp_sp_peer = LUSTRE_SP_ANY;
755 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
756 export->exp_client_uuid = *cluuid;
757 obd_init_export(export);
/* hash insertion and list linkage happen under the per-device lock */
759 spin_lock(&obd->obd_dev_lock);
760 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
761 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
762 &export->exp_uuid_hash);
/* duplicate client: undo the handle registration and free the export; the
 * caller always sees -EALREADY, whatever rc was */
764 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
765 obd->obd_name, cluuid->uuid, rc);
766 spin_unlock(&obd->obd_dev_lock);
767 class_handle_unhash(&export->exp_handle);
768 OBD_FREE_PTR(export);
769 return ERR_PTR(-EALREADY);
773 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
775 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
776 list_add_tail(&export->exp_obd_chain_timed,
777 &export->exp_obd->obd_exports_timed);
778 export->exp_obd->obd_num_exports++;
779 spin_unlock(&obd->obd_dev_lock);
783 EXPORT_SYMBOL(class_new_export);
/* Undo the registrations made by class_new_export(): remove exp from the
 * handle table, from obd_uuid_hash (when hashed) and from both device export
 * lists, decrement obd_num_exports, then drop one reference. */
785 void class_unlink_export(struct obd_export *exp)
/* after this the export can no longer be looked up by cookie */
787 class_handle_unhash(&exp->exp_handle);
789 spin_lock(&exp->exp_obd->obd_dev_lock);
790 /* delete an uuid-export hashitem from hashtables */
791 if (!hlist_unhashed(&exp->exp_uuid_hash))
792 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
793 &exp->exp_client_uuid,
794 &exp->exp_uuid_hash);
/* the _init variants leave the links empty, which __class_export_put()
 * asserts on for exp_obd_chain */
796 list_del_init(&exp->exp_obd_chain);
797 list_del_init(&exp->exp_obd_chain_timed);
798 exp->exp_obd->obd_num_exports--;
799 spin_unlock(&exp->exp_obd->obd_dev_lock);
801 class_export_put(exp);
803 EXPORT_SYMBOL(class_unlink_export);
805 /* Import management functions */
/* Handle-table callback for imports: takes one reference on the import. It
 * is registered through class_handle_hash() in class_new_import(). */
806 static void import_handle_addref(void *import)
808 class_import_get(import);
/* Take one reference on import and log the new count.
 * The two assertions reject a negative count and a count of 0x5a5a5a or
 * more; the latter presumably catches poisoned (freed) memory - confirm the
 * poison pattern.
 * NOTE(review): the return statement is not visible in this listing. */
811 struct obd_import *class_import_get(struct obd_import *import)
813 LASSERT(atomic_read(&import->imp_refcount) >= 0);
814 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
815 atomic_inc(&import->imp_refcount);
816 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
817 atomic_read(&import->imp_refcount));
820 EXPORT_SYMBOL(class_import_get);
/* Drop one reference on import. When the count reaches zero the import is
 * queued on obd_zombie_imports under obd_zombie_impexp_lock and the zombie
 * handler is notified. */
822 void class_import_put(struct obd_import *import)
826 LASSERT(atomic_read(&import->imp_refcount) > 0);
827 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
/* an import that is already queued as a zombie must not be put again */
828 LASSERT(list_empty(&import->imp_zombie_chain));
/* the value logged is read before the decrement, so it is only a hint when
 * other threads change the count concurrently */
830 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
831 atomic_read(&import->imp_refcount) - 1);
833 if (atomic_dec_and_test(&import->imp_refcount)) {
835 CDEBUG(D_INFO, "final put import %p\n", import);
837 spin_lock(&obd_zombie_impexp_lock);
838 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
839 spin_unlock(&obd_zombie_impexp_lock);
/* NOTE(review): obd_zombie_impexp_notify is declared as a function near the
 * top of this file, so this comparison looks always true - confirm. */
841 if (obd_zombie_impexp_notify != NULL)
842 obd_zombie_impexp_notify();
847 EXPORT_SYMBOL(class_import_put);
/* Final teardown of an import whose reference count is zero: release its
 * current connection and every entry of imp_conn_list, drop the device
 * reference taken in class_new_import() and free the memory through
 * OBD_FREE_RCU. */
849 void class_import_destroy(struct obd_import *import)
853 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
854 import->imp_obd->obd_name);
856 LASSERT(atomic_read(&import->imp_refcount) == 0);
858 ptlrpc_put_connection_superhack(import->imp_connection);
/* pop entries from the head until the list is empty */
860 while (!list_empty(&import->imp_conn_list)) {
861 struct obd_import_conn *imp_conn;
863 imp_conn = list_entry(import->imp_conn_list.next,
864 struct obd_import_conn, oic_item);
865 list_del(&imp_conn->oic_item);
866 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
867 OBD_FREE(imp_conn, sizeof(*imp_conn));
/* the security context must have been released by now */
870 LASSERT(import->imp_sec == NULL);
871 class_decref(import->imp_obd);
872 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
/* Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the at_init() call in the loop is not
 * visible in this listing. */
876 static void init_imp_at(struct imp_at *at) {
878 at_init(&at->iat_net_latency, 0, 0);
879 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
880 /* max service estimates are tracked on the server side, so
881 don't use the AT history here, just use the last reported
882 val. (But keep hist for proc histogram, worst_ever) */
883 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Allocate and initialise an import for obd and register it in the handle
 * table.
 * The import starts in state LUSTRE_IMP_NEW with a reference count of 2 and
 * holds a reference on obd (class_incref), which class_import_destroy()
 * drops again.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing. */
888 struct obd_import *class_new_import(struct obd_device *obd)
890 struct obd_import *imp;
892 OBD_ALLOC(imp, sizeof(*imp));
896 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
897 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
898 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
899 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
900 spin_lock_init(&imp->imp_lock);
901 imp->imp_last_success_conn = 0;
902 imp->imp_state = LUSTRE_IMP_NEW;
903 imp->imp_obd = class_incref(obd);
/* initial count of 1: imp_sec_mutex is used as a mutex */
904 sema_init(&imp->imp_sec_mutex, 1);
905 cfs_waitq_init(&imp->imp_recovery_waitq);
907 atomic_set(&imp->imp_refcount, 2);
908 atomic_set(&imp->imp_inflight, 0);
909 atomic_set(&imp->imp_replay_inflight, 0);
910 atomic_set(&imp->imp_inval_count, 0);
911 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
/* from here on the import can be found through its handle cookie */
912 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
913 class_handle_hash(&imp->imp_handle, import_handle_addref);
914 init_imp_at(&imp->imp_at);
916 /* the default magic is V2, will be used in connect RPC, and
917 * then adjusted according to the flags in request/reply. */
918 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
922 EXPORT_SYMBOL(class_new_import);
/* Begin the teardown of an import: remove it from the handle table, bump
 * imp_generation under imp_lock and drop one reference. The memory itself is
 * released later, once the last reference is gone (see class_import_put()
 * and class_import_destroy()). */
924 void class_destroy_import(struct obd_import *import)
926 LASSERT(import != NULL);
/* LP_POISON presumably marks a pointer taken from freed memory - confirm */
927 LASSERT(import != LP_POISON);
929 class_handle_unhash(&import->imp_handle);
931 spin_lock(&import->imp_lock);
932 import->imp_generation++;
933 spin_unlock(&import->imp_lock);
934 class_import_put(import);
936 EXPORT_SYMBOL(class_destroy_import);
938 /* A connection defines an export context in which preallocation can
939 be managed. This releases the export pointer reference, and returns
940 the export handle, so the export refcount is 1 when this function
/* Create an export of obd for the client cluuid and hand its cookie back in
 * conn. The pointer reference returned by class_new_export() is dropped
 * again before returning, so the caller owns only the handle.
 * An error from class_new_export() is passed on as a negative errno.
 * NOTE(review): the IS_ERR test and the final return are not visible in this
 * listing. */
942 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
943 struct obd_uuid *cluuid)
945 struct obd_export *export;
946 LASSERT(conn != NULL);
947 LASSERT(obd != NULL);
948 LASSERT(cluuid != NULL);
951 export = class_new_export(obd, cluuid);
953 RETURN(PTR_ERR(export));
955 conn->cookie = export->exp_handle.h_cookie;
956 class_export_put(export);
958 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
959 cluuid->uuid, conn->cookie);
962 EXPORT_SYMBOL(class_connect);
964 /* if export is involved in recovery then clean up related things */
/* Take exp out of the recovery accounting of its device.
 * Under obd_processing_task_lock (bottom halves disabled), and only while
 * the device is recovering and the export is marked exp_in_recovery, the
 * connected and recoverable client counters are decremented and the pending
 * request-replay and lock-replay markers of the export are cleared together
 * with their per-device counters. The export flags are changed under
 * exp_lock. */
965 void class_export_recovery_cleanup(struct obd_export *exp)
967 struct obd_device *obd = exp->exp_obd;
969 spin_lock_bh(&obd->obd_processing_task_lock);
970 if (obd->obd_recovering && exp->exp_in_recovery) {
971 spin_lock(&exp->exp_lock);
972 exp->exp_in_recovery = 0;
973 spin_unlock(&exp->exp_lock);
974 obd->obd_connected_clients--;
975 /* each connected client is counted as recoverable */
976 obd->obd_recoverable_clients--;
977 if (exp->exp_req_replay_needed) {
978 spin_lock(&exp->exp_lock);
979 exp->exp_req_replay_needed = 0;
980 spin_unlock(&exp->exp_lock);
/* the counter must be non-zero before it is decremented */
981 LASSERT(atomic_read(&obd->obd_req_replay_clients));
982 atomic_dec(&obd->obd_req_replay_clients);
984 if (exp->exp_lock_replay_needed) {
985 spin_lock(&exp->exp_lock);
986 exp->exp_lock_replay_needed = 0;
987 spin_unlock(&exp->exp_lock);
988 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
989 atomic_dec(&obd->obd_lock_replay_clients);
992 spin_unlock_bh(&obd->obd_processing_task_lock);
995 /* This function removes two references from the export: one for the
996 * hash entry and one for the export pointer passed in. The export
997 * pointer passed to this function is destroyed and should not be used
/* Disconnect an export: mark it exp_disconnected, remove it from
 * obd_nid_hash when hashed, clean up its recovery state, unlink it from its
 * device and drop the reference of the caller.
 * The exp_disconnected flag is tested and set under exp_lock so that only
 * the first of several racing callers performs the teardown.
 * NOTE(review): the return statements are not visible in this listing. */
999 int class_disconnect(struct obd_export *export)
1001 int already_disconnected;
1004 if (export == NULL) {
1006 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1010 spin_lock(&export->exp_lock);
1011 already_disconnected = export->exp_disconnected;
1012 export->exp_disconnected = 1;
/* a hashed export is assumed to have a connection: exp_connection is
 * dereferenced without a NULL check */
1014 if (!hlist_unhashed(&export->exp_nid_hash))
1015 lustre_hash_del(export->exp_obd->obd_nid_hash,
1016 &export->exp_connection->c_peer.nid,
1017 &export->exp_nid_hash);
1019 spin_unlock(&export->exp_lock);
1021 /* class_cleanup(), abort_recovery(), and class_fail_export()
1022 * all end up in here, and if any of them race we shouldn't
1023 * call extra class_export_puts(). */
1024 if (already_disconnected)
1027 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1028 export->exp_handle.h_cookie);
1030 class_export_recovery_cleanup(export);
/* class_unlink_export() drops one reference, the put below drops the one
 * held by the caller */
1031 class_unlink_export(export);
1032 class_export_put(export);
/* Disconnect every export on list, storing flags in exp_flags of each one.
 * The head of the list is processed repeatedly until the list is empty. An
 * export whose client uuid equals the uuid of its own device is only taken
 * off the list, not disconnected. Every other export is looked up again
 * through its handle cookie and passed to obd_disconnect().
 * NOTE(review): the declaration of rc, some conditions, the continue
 * statements and the handling of the obd_disconnect() result are not
 * visible in this listing. */
1036 static void class_disconnect_export_list(struct list_head *list, int flags)
1039 struct lustre_handle fake_conn;
1040 struct obd_export *fake_exp, *exp;
1043 /* It's possible that an export may disconnect itself, but
1044 * nothing else will be added to this list. */
1045 while (!list_empty(list)) {
1046 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
/* keep the export alive while it is being worked on */
1047 class_export_get(exp);
1049 spin_lock(&exp->exp_lock);
1050 exp->exp_flags = flags;
1051 spin_unlock(&exp->exp_lock);
1053 if (obd_uuid_equals(&exp->exp_client_uuid,
1054 &exp->exp_obd->obd_uuid)) {
1056 "exp %p export uuid == obd uuid, don't discon\n",
1058 /* Need to delete this now so we don't end up pointing
1059 * to work_list later when this export is cleaned up. */
1060 list_del_init(&exp->exp_obd_chain);
1061 class_export_put(exp);
/* second lookup by cookie; the name fake_exp comes from the fake_conn
 * handle built here */
1065 fake_conn.cookie = exp->exp_handle.h_cookie;
1066 fake_exp = class_conn2export(&fake_conn);
1068 class_export_put(exp);
1072 spin_lock(&fake_exp->exp_lock);
1073 fake_exp->exp_flags = flags;
1074 spin_unlock(&fake_exp->exp_lock);
1076 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1077 "last request at "CFS_TIME_T"\n",
1078 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1079 exp, exp->exp_last_request_time);
1080 rc = obd_disconnect(fake_exp);
/* releases the reference taken at the top of the loop */
1081 class_export_put(exp);
/* Build the export flag word for a device: OBD_OPT_FAILOVER when obd_fail is
 * set, OBD_OPT_FORCE when obd_force is set, 0 when neither is. */
1086 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1088 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1089 (obd->obd_force ? OBD_OPT_FORCE : 0));
/* Disconnect every export of obd.
 * All exports are first moved to a local list under obd_dev_lock and are
 * then disconnected without the lock held, using flags derived from
 * obd_fail and obd_force. */
1092 void class_disconnect_exports(struct obd_device *obd)
1094 struct list_head work_list;
1097 /* Move all of the exports from obd_exports to a work list, en masse. */
1098 spin_lock(&obd->obd_dev_lock);
/* splice trick: link work_list into the ring next to the old head, then cut
 * the old head out, which leaves work_list as the head of all entries and
 * obd_exports empty */
1099 list_add(&work_list, &obd->obd_exports);
1100 list_del_init(&obd->obd_exports);
1101 spin_unlock(&obd->obd_dev_lock);
1103 if (!list_empty(&work_list)) {
1104 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1105 "disconnecting them\n", obd->obd_minor, obd);
1106 class_disconnect_export_list(&work_list,
1107 get_exp_flags_from_obd(obd));
1109 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1110 obd->obd_minor, obd);
1113 EXPORT_SYMBOL(class_disconnect_exports);
1115 /* Remove exports that have not completed recovery.
/* Disconnect the exports of obd selected with the help of the test_export
 * callback.
 * Exports are moved from obd_exports to a local list under obd_dev_lock and
 * disconnected afterwards without the lock. The self export (client uuid
 * equal to the device uuid) is moved as well but is not counted as a client.
 * NOTE(review): the statement following the test_export() check, the
 * declaration and update of cnt and the return are not visible in this
 * listing, so which callback result keeps an export cannot be confirmed from
 * here. */
1117 int class_disconnect_stale_exports(struct obd_device *obd,
1118 int (*test_export)(struct obd_export *))
1120 struct list_head work_list;
1121 struct list_head *pos, *n;
1122 struct obd_export *exp;
1126 CFS_INIT_LIST_HEAD(&work_list);
1127 spin_lock(&obd->obd_dev_lock);
/* the _safe iterator is needed because entries are unlinked during the
 * walk */
1128 list_for_each_safe(pos, n, &obd->obd_exports) {
1129 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1130 if (test_export(exp))
1133 list_del(&exp->exp_obd_chain);
1134 list_add(&exp->exp_obd_chain, &work_list);
1135 /* don't count self-export as client */
1136 if (obd_uuid_equals(&exp->exp_client_uuid,
1137 &exp->exp_obd->obd_uuid))
1141 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1142 obd->obd_name, exp->exp_client_uuid.uuid,
1143 exp->exp_connection == NULL ? "<unknown>" :
1144 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1146 spin_unlock(&obd->obd_dev_lock);
1148 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1149 obd->obd_name, cnt);
1150 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1153 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Allocate and initialise an obd_io_group: nothing pending, reference count
 * of 1, empty callback list.
 * NOTE(review): the allocation-failure check, the store through oig_out and
 * the return value are not visible in this listing. */
1155 int oig_init(struct obd_io_group **oig_out)
1157 struct obd_io_group *oig;
1160 OBD_ALLOC(oig, sizeof(*oig));
1164 spin_lock_init(&oig->oig_lock);
1166 oig->oig_pending = 0;
1167 atomic_set(&oig->oig_refcount, 1);
1168 cfs_waitq_init(&oig->oig_waitq);
1169 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1174 EXPORT_SYMBOL(oig_init);
/* Take one reference on an io group; oig_release() is the counterpart. */
1176 static inline void oig_grab(struct obd_io_group *oig)
1178 atomic_inc(&oig->oig_refcount);
/* Drop one reference on an io group and free it when the count reaches
 * zero. */
1181 void oig_release(struct obd_io_group *oig)
1183 if (atomic_dec_and_test(&oig->oig_refcount))
1184 OBD_FREE(oig, sizeof(*oig));
1186 EXPORT_SYMBOL(oig_release);
/* Append the callback context occ to the list of the io group, under
 * oig_lock.
 * NOTE(review): the lines between taking the lock and the list insertion
 * (and the return value) are not visible in this listing; presumably they
 * update oig_pending and the group reference - confirm. */
1188 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1191 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1192 spin_lock(&oig->oig_lock);
1198 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1200 spin_unlock(&oig->oig_lock);
1205 EXPORT_SYMBOL(oig_add_one);
/* Record the completion of one member of an io group with result rc.
 * Under oig_lock the context is taken off the group list and the pending
 * count is decremented; when it drops to zero or below, the wait queue is
 * selected for wake-up, and the signal is sent after the lock has been
 * released. The test on oig_rc shows that only a first non-zero rc is
 * meant to be kept.
 * NOTE(review): the assignment to oig_rc, the guard around the signal and
 * some other lines are not visible in this listing. */
1207 void oig_complete_one(struct obd_io_group *oig,
1208 struct oig_callback_context *occ, int rc)
1210 cfs_waitq_t *wake = NULL;
1213 spin_lock(&oig->oig_lock);
1216 list_del_init(&occ->occ_oig_item);
1218 old_rc = oig->oig_rc;
1219 if (oig->oig_rc == 0 && rc != 0)
1222 if (--oig->oig_pending <= 0)
1223 wake = &oig->oig_waitq;
1225 spin_unlock(&oig->oig_lock);
/* oig fields are read without the lock here, hence the racey remark in the
 * message itself */
1227 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1228 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1231 cfs_waitq_signal(wake);
1234 EXPORT_SYMBOL(oig_complete_one);
/* Wait condition for oig_wait(): tests under oig_lock whether oig_pending
 * has dropped to zero or below.
 * NOTE(review): the result variable and the return are not visible in this
 * listing. */
1236 static int oig_done(struct obd_io_group *oig)
1239 spin_lock(&oig->oig_lock);
1240 if (oig->oig_pending <= 0)
1242 spin_unlock(&oig->oig_lock);
/* Interrupt handler installed by oig_wait(): calls occ_interrupted() on the
 * members of the group that have not been interrupted yet, marking each one
 * first. The callback runs with oig_lock released.
 * NOTE(review): the restart logic described in the comment below (label,
 * goto or continue) is not visible in this listing. */
1246 static void interrupted_oig(void *data)
1248 struct obd_io_group *oig = data;
1249 struct oig_callback_context *occ;
1251 spin_lock(&oig->oig_lock);
1252 /* We need to restart the processing each time we drop the lock, as
1253 * it is possible other threads called oig_complete_one() to remove
1254 * an entry elsewhere in the list while we dropped lock. We need to
1255 * drop the lock because osc_ap_completion() calls oig_complete_one()
1256 * which re-gets this lock ;-) as well as a lock ordering issue. */
1258 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1259 if (occ->interrupted)
/* marked before the lock is dropped so that a rescan skips this entry */
1261 occ->interrupted = 1;
1262 spin_unlock(&oig->oig_lock);
1263 occ->occ_interrupted(occ);
1264 spin_lock(&oig->oig_lock);
1267 spin_unlock(&oig->oig_lock);
/* Block until every member of the io group has completed.
 * The first wait is interruptible and runs interrupted_oig() on a signal;
 * after an interruption the wait info is reset to an all-zero value and the
 * wait is repeated, so the function does not leave while work is pending.
 * NOTE(review): the declaration of rc, the opening of the loop and the
 * return are not visible in this listing. */
1270 int oig_wait(struct obd_io_group *oig)
1272 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1275 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1278 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1279 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1280 /* we can't continue until the oig has emptied and stopped
1281 * referencing state that the caller will free upon return */
1283 lwi = (struct l_wait_info){ 0, };
1284 } while (rc == -EINTR);
1286 LASSERTF(oig->oig_pending == 0,
1287 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1290 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1293 EXPORT_SYMBOL(oig_wait);
/* Forcibly disconnect an export, for example on eviction.
 * The exp_failed flag is tested and set under exp_lock so that an export is
 * failed only once; later callers just log and leave. The debug log is
 * dumped first when obd_dump_on_timeout is set.
 * NOTE(review): the return in the already-failed branch and the condition
 * around the final messages are not visible in this listing. */
1295 void class_fail_export(struct obd_export *exp)
1297 int rc, already_failed;
1299 spin_lock(&exp->exp_lock);
1300 already_failed = exp->exp_failed;
1301 exp->exp_failed = 1;
1302 spin_unlock(&exp->exp_lock);
1304 if (already_failed) {
1305 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1306 exp, exp->exp_client_uuid.uuid);
1310 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1311 exp, exp->exp_client_uuid.uuid);
1313 if (obd_dump_on_timeout)
1314 libcfs_debug_dumplog();
1316 /* Most callers into obd_disconnect are removing their own reference
1317 * (request, for example) in addition to the one from the hash table.
1318 * We don't have such a reference here, so make one. */
1319 class_export_get(exp);
1320 rc = obd_disconnect(exp);
1322 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1324 CDEBUG(D_HA, "disconnected export %p/%s\n",
1325 exp, exp->exp_client_uuid.uuid);
1327 EXPORT_SYMBOL(class_fail_export);
/**
 * Return a printable NID for the export exp.
 *
 * When exp->exp_connection is set, the result is libcfs_nid2str() applied
 * to the connection's c_peer.nid.
 *
 * NOTE(review): the value returned when exp_connection is NULL is not
 * visible in this listing.
 */
1329 char *obd_export_nid2str(struct obd_export *exp)
1331 if (exp->exp_connection != NULL)
1332 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1336 EXPORT_SYMBOL(obd_export_nid2str);
/**
 * Evict exports of obd that are hashed under the NID given as the string
 * nid.
 *
 * nid is converted with libcfs_str2nid() and the key is looked up in
 * obd->obd_nid_hash.  A match is sanity-checked (its connection NID must
 * equal the key and it must not be obd->obd_self_export), then passed to
 * class_fail_export() followed by class_export_put().
 *
 * Returns exports_evicted; a debug message is logged when that is zero.
 *
 * NOTE(review): the loop construct, its exit on a failed lookup and the
 * increment of exports_evicted are not visible in this listing.  The "#%d"
 * in the warning and the final test suggest the lookup is repeated until
 * the hash holds no more entries for the NID -- confirm.
 * NOTE(review): "adminstrative" in the warning text is misspelled.
 */
1338 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1340 struct obd_export *doomed_exp = NULL;
1341 int exports_evicted = 0;
/* the cast drops the const qualifier of nid for libcfs_str2nid() */
1343 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1346 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1347 if (doomed_exp == NULL)
/* the hash must hand back an export for exactly this NID ... */
1350 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1351 "nid %s found, wanted nid %s, requested nid %s\n",
1352 obd_export_nid2str(doomed_exp),
1353 libcfs_nid2str(nid_key), nid);
/* ... and never the device's own self-export */
1354 LASSERTF(doomed_exp != obd->obd_self_export,
1355 "self-export is hashed by NID?\n");
1357 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1358 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1360 class_fail_export(doomed_exp);
/* NOTE(review): presumably drops the reference obtained by
 * lustre_hash_lookup() -- confirm */
1361 class_export_put(doomed_exp);
1364 if (!exports_evicted)
1365 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1366 obd->obd_name, nid);
1367 return exports_evicted;
1369 EXPORT_SYMBOL(obd_export_evict_by_nid);
/**
 * Evict the export of obd whose client UUID equals the string uuid.
 *
 * uuid is converted with obd_str2uuid().  If it equals obd->obd_uuid, an
 * error is logged and exports_evicted (still 0) is returned.  Otherwise
 * the UUID is looked up in obd->obd_uuid_hash: a failed lookup is logged
 * as an error, a match is passed to class_fail_export() followed by
 * class_export_put().
 *
 * Returns exports_evicted.
 *
 * NOTE(review): short lines (braces, the else branch marker and the
 * increment of exports_evicted) are not visible in this listing.
 * NOTE(review): "adminstrative" in the warning text is misspelled.
 */
1371 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1373 struct obd_export *doomed_exp = NULL;
1374 struct obd_uuid doomed_uuid;
1375 int exports_evicted = 0;
1377 obd_str2uuid(&doomed_uuid, uuid);
/* refuse to evict the device's own UUID */
1378 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1379 CERROR("%s: can't evict myself\n", obd->obd_name);
1380 return exports_evicted;
1383 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1385 if (doomed_exp == NULL) {
1386 CERROR("%s: can't disconnect %s: no exports found\n",
1387 obd->obd_name, uuid);
1389 CWARN("%s: evicting %s at adminstrative request\n",
1390 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1391 class_fail_export(doomed_exp);
/* NOTE(review): presumably drops the reference obtained by
 * lustre_hash_lookup() -- confirm */
1392 class_export_put(doomed_exp);
1396 return exports_evicted;
1398 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1401 * Destroy the queued zombie imports and exports.
/*
 * Each pass takes obd_zombie_impexp_lock and unlinks the first entry of
 * obd_zombie_imports and of obd_zombie_exports, if those lists are not
 * empty.  The lock is released before class_import_destroy() and
 * class_export_destroy() are called.  Passes repeat while either an
 * import or an export was obtained.
 *
 * NOTE(review): short lines (braces, the loop head, the NULL resets and
 * NULL tests of import/export, and the tail of both list_entry() calls)
 * are not visible in this listing.
 */
1403 void obd_zombie_impexp_cull(void)
1405 struct obd_import *import;
1406 struct obd_export *export;
1410 spin_lock (&obd_zombie_impexp_lock);
/* detach at most one import per pass */
1413 if (!list_empty(&obd_zombie_imports)) {
1414 import = list_entry(obd_zombie_imports.next,
1417 list_del(&import->imp_zombie_chain);
/* detach at most one export per pass; its link is re-initialised */
1421 if (!list_empty(&obd_zombie_exports)) {
1422 export = list_entry(obd_zombie_exports.next,
1425 list_del_init(&export->exp_obd_chain);
1428 spin_unlock(&obd_zombie_impexp_lock);
/* destruction happens after the list lock has been released */
1431 class_import_destroy(import);
1434 class_export_destroy(export);
1436 } while (import != NULL || export != NULL);
/* completed by obd_zombie_impexp_thread() at start-up; awaited by
 * obd_zombie_impexp_init() */
1440 static struct completion obd_zombie_start;
/* completed by obd_zombie_impexp_thread() after its loop ends; awaited by
 * obd_zombie_impexp_stop() */
1441 static struct completion obd_zombie_stop;
/* bit flags; the OBD_ZOMBIE_STOP bit is set by obd_zombie_impexp_stop() */
1442 static unsigned long obd_zombie_flags;
/* signalled by obd_zombie_impexp_notify(); obd_zombie_impexp_thread()
 * waits on it */
1443 static cfs_waitq_t obd_zombie_waitq;
1450 * Check whether the zombie import/export kill thread has work to do.
/*
 * Under obd_zombie_impexp_lock, rc is set to true only when both zombie
 * lists are empty and OBD_ZOMBIE_STOP is not set in obd_zombie_flags.
 * A non-zero rc therefore means "nothing to do": the zombie thread waits
 * for !obd_zombie_impexp_check(NULL).
 *
 * arg is not referenced in the visible lines.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; rc is presumably the return value -- confirm.
 */
1452 int obd_zombie_impexp_check(void *arg)
1456 spin_lock(&obd_zombie_impexp_lock);
1457 rc = list_empty(&obd_zombie_imports) &&
1458 list_empty(&obd_zombie_exports) &&
1459 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1461 spin_unlock(&obd_zombie_impexp_lock);
1467 * Notify the import/export destroy thread about a new zombie.
/* Signals obd_zombie_waitq, the wait queue obd_zombie_impexp_thread()
 * waits on. */
1469 static void obd_zombie_impexp_notify(void)
1471 cfs_waitq_signal(&obd_zombie_waitq);
1477 * Thread that destroys zombie exports/imports.
/*
 * Thread function handed to cfs_kernel_thread() by
 * obd_zombie_impexp_init().
 *
 * obd_zombie_start is completed whether or not cfs_daemonize_ctxt()
 * succeeds, so the waiter in obd_zombie_impexp_init() is released in both
 * cases.  Until OBD_ZOMBIE_STOP is set, the thread waits on
 * obd_zombie_waitq for obd_zombie_impexp_check() to return zero and then
 * calls obd_zombie_impexp_cull().  obd_zombie_stop is completed after the
 * loop.
 *
 * NOTE(review): the declaration of rc, the braces and the return
 * statements (including the early exit after a failed
 * cfs_daemonize_ctxt()) are not visible in this listing.
 */
1479 static int obd_zombie_impexp_thread(void *unused)
1483 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1484 complete(&obd_zombie_start);
1488 complete(&obd_zombie_start);
1490 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1491 struct l_wait_info lwi = { 0 };
/* block until the check reports zero (pending zombies or stop requested) */
1493 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1495 obd_zombie_impexp_cull();
1498 complete(&obd_zombie_stop);
1503 #else /* ! KERNEL */
/* counter incremented on entry to and decremented on exit from
 * obd_zombie_impexp_kill() */
1505 static atomic_t zombie_recur = ATOMIC_INIT(0);
/* value returned by liblustre_register_wait_callback(); later passed to
 * liblustre_deregister_wait_callback() */
1506 static void *obd_zombie_impexp_work_cb;
/* value returned by liblustre_register_idle_callback(); later passed to
 * liblustre_deregister_idle_callback() */
1507 static void *obd_zombie_impexp_idle_cb;
/*
 * Callback registered with liblustre_register_wait_callback() by
 * obd_zombie_impexp_init().
 *
 * zombie_recur is incremented on entry and decremented on exit;
 * obd_zombie_impexp_cull() is called only when the incremented value is
 * 1, i.e. when no other invocation is in progress.
 *
 * arg is not referenced in the visible lines.
 *
 * NOTE(review): the return value and the braces are not visible in this
 * listing.
 */
1509 int obd_zombie_impexp_kill(void *arg)
1513 if (atomic_inc_return(&zombie_recur) == 1) {
1514 obd_zombie_impexp_cull();
1517 atomic_dec(&zombie_recur);
1524 * Start the thread that destroys zombie imports/exports.
/*
 * Initialises the zombie lists, their lock, both completions and the wait
 * queue.  The visible lines then start obd_zombie_impexp_thread() with
 * cfs_kernel_thread() and wait for obd_zombie_start, and also register
 * obd_zombie_impexp_kill() / obd_zombie_impexp_check() as liblustre wait
 * and idle callbacks.
 *
 * NOTE(review): the thread start and the callback registration are
 * presumably alternative build variants (the file has a
 * "#else / * ! KERNEL * /" section); the preprocessor lines, the
 * declaration and error check of rc, and the return statements are not
 * visible in this listing -- confirm.
 */
1526 int obd_zombie_impexp_init(void)
1530 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1531 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1532 spin_lock_init(&obd_zombie_impexp_lock);
1533 init_completion(&obd_zombie_start);
1534 init_completion(&obd_zombie_stop);
1535 cfs_waitq_init(&obd_zombie_waitq);
1538 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
/* obd_zombie_impexp_thread() completes obd_zombie_start once it is running */
1542 wait_for_completion(&obd_zombie_start);
1545 obd_zombie_impexp_work_cb =
1546 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1547 &obd_zombie_impexp_kill, NULL);
1549 obd_zombie_impexp_idle_cb =
1550 liblustre_register_idle_callback("obd_zombi_impexp_check",
1551 &obd_zombie_impexp_check, NULL);
1558 * Stop the thread that destroys zombie imports/exports.
/*
 * Sets OBD_ZOMBIE_STOP in obd_zombie_flags and signals the wait queue via
 * obd_zombie_impexp_notify().  The visible lines then wait for
 * obd_zombie_stop and deregister the liblustre wait and idle callbacks.
 *
 * NOTE(review): waiting for the thread and deregistering the callbacks
 * are presumably alternative build variants; the preprocessor lines and
 * the end of the function are not visible in this listing -- confirm.
 */
1560 void obd_zombie_impexp_stop(void)
1562 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
/* wake the zombie thread so that it can observe the stop bit */
1563 obd_zombie_impexp_notify();
1565 wait_for_completion(&obd_zombie_stop);
1567 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1568 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);