Whamcloud - gitweb
LU-2613 recovery: free open/close request promptly
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 struct kmem_cache *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151 EXPORT_SYMBOL(class_get_type);
152
153 void class_put_type(struct obd_type *type)
154 {
155         LASSERT(type);
156         spin_lock(&type->obd_type_lock);
157         type->typ_refcnt--;
158         module_put(type->typ_dt_ops->o_owner);
159         spin_unlock(&type->obd_type_lock);
160 }
161 EXPORT_SYMBOL(class_put_type);
162
163 #define CLASS_MAX_NAME 1024
164
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166                         struct lprocfs_vars *vars, const char *name,
167                         struct lu_device_type *ldt)
168 {
169         struct obd_type *type;
170         int rc = 0;
171         ENTRY;
172
173         /* sanity check */
174         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
175
176         if (class_search_type(name)) {
177                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
178                 RETURN(-EEXIST);
179         }
180
181         rc = -ENOMEM;
182         OBD_ALLOC(type, sizeof(*type));
183         if (type == NULL)
184                 RETURN(rc);
185
186         OBD_ALLOC_PTR(type->typ_dt_ops);
187         OBD_ALLOC_PTR(type->typ_md_ops);
188         OBD_ALLOC(type->typ_name, strlen(name) + 1);
189
190         if (type->typ_dt_ops == NULL ||
191             type->typ_md_ops == NULL ||
192             type->typ_name == NULL)
193                 GOTO (failed, rc);
194
195         *(type->typ_dt_ops) = *dt_ops;
196         /* md_ops is optional */
197         if (md_ops)
198                 *(type->typ_md_ops) = *md_ops;
199         strcpy(type->typ_name, name);
200         spin_lock_init(&type->obd_type_lock);
201
202 #ifdef LPROCFS
203         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
204                                               vars, type);
205         if (IS_ERR(type->typ_procroot)) {
206                 rc = PTR_ERR(type->typ_procroot);
207                 type->typ_procroot = NULL;
208                 GOTO (failed, rc);
209         }
210 #endif
211         if (ldt != NULL) {
212                 type->typ_lu = ldt;
213                 rc = lu_device_type_init(ldt);
214                 if (rc != 0)
215                         GOTO (failed, rc);
216         }
217
218         spin_lock(&obd_types_lock);
219         cfs_list_add(&type->typ_chain, &obd_types);
220         spin_unlock(&obd_types_lock);
221
222         RETURN (0);
223
224  failed:
225         if (type->typ_name != NULL)
226                 OBD_FREE(type->typ_name, strlen(name) + 1);
227         if (type->typ_md_ops != NULL)
228                 OBD_FREE_PTR(type->typ_md_ops);
229         if (type->typ_dt_ops != NULL)
230                 OBD_FREE_PTR(type->typ_dt_ops);
231         OBD_FREE(type, sizeof(*type));
232         RETURN(rc);
233 }
234 EXPORT_SYMBOL(class_register_type);
235
236 int class_unregister_type(const char *name)
237 {
238         struct obd_type *type = class_search_type(name);
239         ENTRY;
240
241         if (!type) {
242                 CERROR("unknown obd type\n");
243                 RETURN(-EINVAL);
244         }
245
246         if (type->typ_refcnt) {
247                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248                 /* This is a bad situation, let's make the best of it */
249                 /* Remove ops, but leave the name for debugging */
250                 OBD_FREE_PTR(type->typ_dt_ops);
251                 OBD_FREE_PTR(type->typ_md_ops);
252                 RETURN(-EBUSY);
253         }
254
255         /* we do not use type->typ_procroot as for compatibility purposes
256          * other modules can share names (i.e. lod can use lov entry). so
257          * we can't reference pointer as it can get invalided when another
258          * module removes the entry */
259         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
260
261         if (type->typ_lu)
262                 lu_device_type_fini(type->typ_lu);
263
264         spin_lock(&obd_types_lock);
265         cfs_list_del(&type->typ_chain);
266         spin_unlock(&obd_types_lock);
267         OBD_FREE(type->typ_name, strlen(name) + 1);
268         if (type->typ_dt_ops != NULL)
269                 OBD_FREE_PTR(type->typ_dt_ops);
270         if (type->typ_md_ops != NULL)
271                 OBD_FREE_PTR(type->typ_md_ops);
272         OBD_FREE(type, sizeof(*type));
273         RETURN(0);
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
276
277 /**
278  * Create a new obd device.
279  *
280  * Find an empty slot in ::obd_devs[], create a new obd device in it.
281  *
282  * \param[in] type_name obd device type string.
283  * \param[in] name      obd device name.
284  *
285  * \retval NULL if create fails, otherwise return the obd device
286  *         pointer created.
287  */
288 struct obd_device *class_newdev(const char *type_name, const char *name)
289 {
290         struct obd_device *result = NULL;
291         struct obd_device *newdev;
292         struct obd_type *type = NULL;
293         int i;
294         int new_obd_minor = 0;
295         ENTRY;
296
297         if (strlen(name) >= MAX_OBD_NAME) {
298                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299                 RETURN(ERR_PTR(-EINVAL));
300         }
301
302         type = class_get_type(type_name);
303         if (type == NULL){
304                 CERROR("OBD: unknown type: %s\n", type_name);
305                 RETURN(ERR_PTR(-ENODEV));
306         }
307
308         newdev = obd_device_alloc();
309         if (newdev == NULL)
310                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
311
312         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
313
314         write_lock(&obd_dev_lock);
315         for (i = 0; i < class_devno_max(); i++) {
316                 struct obd_device *obd = class_num2obd(i);
317
318                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
319                         CERROR("Device %s already exists at %d, won't add\n",
320                                name, i);
321                         if (result) {
322                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
323                                          "%p obd_magic %08x != %08x\n", result,
324                                          result->obd_magic, OBD_DEVICE_MAGIC);
325                                 LASSERTF(result->obd_minor == new_obd_minor,
326                                          "%p obd_minor %d != %d\n", result,
327                                          result->obd_minor, new_obd_minor);
328
329                                 obd_devs[result->obd_minor] = NULL;
330                                 result->obd_name[0]='\0';
331                          }
332                         result = ERR_PTR(-EEXIST);
333                         break;
334                 }
335                 if (!result && !obd) {
336                         result = newdev;
337                         result->obd_minor = i;
338                         new_obd_minor = i;
339                         result->obd_type = type;
340                         strncpy(result->obd_name, name,
341                                 sizeof(result->obd_name) - 1);
342                         obd_devs[i] = result;
343                 }
344         }
345         write_unlock(&obd_dev_lock);
346
347         if (result == NULL && i >= class_devno_max()) {
348                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
349                        class_devno_max());
350                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
351         }
352
353         if (IS_ERR(result))
354                 GOTO(out, result);
355
356         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
357                result->obd_name, result);
358
359         RETURN(result);
360 out:
361         obd_device_free(newdev);
362 out_type:
363         class_put_type(type);
364         return result;
365 }
366
367 void class_release_dev(struct obd_device *obd)
368 {
369         struct obd_type *obd_type = obd->obd_type;
370
371         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
372                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
373         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
374                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
375         LASSERT(obd_type != NULL);
376
377         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
378                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
379
380         write_lock(&obd_dev_lock);
381         obd_devs[obd->obd_minor] = NULL;
382         write_unlock(&obd_dev_lock);
383         obd_device_free(obd);
384
385         class_put_type(obd_type);
386 }
387
388 int class_name2dev(const char *name)
389 {
390         int i;
391
392         if (!name)
393                 return -1;
394
395         read_lock(&obd_dev_lock);
396         for (i = 0; i < class_devno_max(); i++) {
397                 struct obd_device *obd = class_num2obd(i);
398
399                 if (obd && strcmp(name, obd->obd_name) == 0) {
400                         /* Make sure we finished attaching before we give
401                            out any references */
402                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
403                         if (obd->obd_attached) {
404                                 read_unlock(&obd_dev_lock);
405                                 return i;
406                         }
407                         break;
408                 }
409         }
410         read_unlock(&obd_dev_lock);
411
412         return -1;
413 }
414 EXPORT_SYMBOL(class_name2dev);
415
416 struct obd_device *class_name2obd(const char *name)
417 {
418         int dev = class_name2dev(name);
419
420         if (dev < 0 || dev > class_devno_max())
421                 return NULL;
422         return class_num2obd(dev);
423 }
424 EXPORT_SYMBOL(class_name2obd);
425
426 int class_uuid2dev(struct obd_uuid *uuid)
427 {
428         int i;
429
430         read_lock(&obd_dev_lock);
431         for (i = 0; i < class_devno_max(); i++) {
432                 struct obd_device *obd = class_num2obd(i);
433
434                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
435                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
436                         read_unlock(&obd_dev_lock);
437                         return i;
438                 }
439         }
440         read_unlock(&obd_dev_lock);
441
442         return -1;
443 }
444 EXPORT_SYMBOL(class_uuid2dev);
445
446 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
447 {
448         int dev = class_uuid2dev(uuid);
449         if (dev < 0)
450                 return NULL;
451         return class_num2obd(dev);
452 }
453 EXPORT_SYMBOL(class_uuid2obd);
454
455 /**
456  * Get obd device from ::obd_devs[]
457  *
458  * \param num [in] array index
459  *
460  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
461  *         otherwise return the obd device there.
462  */
463 struct obd_device *class_num2obd(int num)
464 {
465         struct obd_device *obd = NULL;
466
467         if (num < class_devno_max()) {
468                 obd = obd_devs[num];
469                 if (obd == NULL)
470                         return NULL;
471
472                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
473                          "%p obd_magic %08x != %08x\n",
474                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
475                 LASSERTF(obd->obd_minor == num,
476                          "%p obd_minor %0d != %0d\n",
477                          obd, obd->obd_minor, num);
478         }
479
480         return obd;
481 }
482 EXPORT_SYMBOL(class_num2obd);
483
484 /**
485  * Get obd devices count. Device in any
486  *    state are counted
487  * \retval obd device count
488  */
489 int get_devices_count(void)
490 {
491         int index, max_index = class_devno_max(), dev_count = 0;
492
493         read_lock(&obd_dev_lock);
494         for (index = 0; index <= max_index; index++) {
495                 struct obd_device *obd = class_num2obd(index);
496                 if (obd != NULL)
497                         dev_count++;
498         }
499         read_unlock(&obd_dev_lock);
500
501         return dev_count;
502 }
503 EXPORT_SYMBOL(get_devices_count);
504
505 void class_obd_list(void)
506 {
507         char *status;
508         int i;
509
510         read_lock(&obd_dev_lock);
511         for (i = 0; i < class_devno_max(); i++) {
512                 struct obd_device *obd = class_num2obd(i);
513
514                 if (obd == NULL)
515                         continue;
516                 if (obd->obd_stopping)
517                         status = "ST";
518                 else if (obd->obd_set_up)
519                         status = "UP";
520                 else if (obd->obd_attached)
521                         status = "AT";
522                 else
523                         status = "--";
524                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
525                          i, status, obd->obd_type->typ_name,
526                          obd->obd_name, obd->obd_uuid.uuid,
527                          cfs_atomic_read(&obd->obd_refcount));
528         }
529         read_unlock(&obd_dev_lock);
530         return;
531 }
532
533 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
534    specified, then only the client with that uuid is returned,
535    otherwise any client connected to the tgt is returned. */
536 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
537                                           const char * typ_name,
538                                           struct obd_uuid *grp_uuid)
539 {
540         int i;
541
542         read_lock(&obd_dev_lock);
543         for (i = 0; i < class_devno_max(); i++) {
544                 struct obd_device *obd = class_num2obd(i);
545
546                 if (obd == NULL)
547                         continue;
548                 if ((strncmp(obd->obd_type->typ_name, typ_name,
549                              strlen(typ_name)) == 0)) {
550                         if (obd_uuid_equals(tgt_uuid,
551                                             &obd->u.cli.cl_target_uuid) &&
552                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
553                                                          &obd->obd_uuid) : 1)) {
554                                 read_unlock(&obd_dev_lock);
555                                 return obd;
556                         }
557                 }
558         }
559         read_unlock(&obd_dev_lock);
560
561         return NULL;
562 }
563 EXPORT_SYMBOL(class_find_client_obd);
564
565 /* Iterate the obd_device list looking devices have grp_uuid. Start
566    searching at *next, and if a device is found, the next index to look
567    at is saved in *next. If next is NULL, then the first matching device
568    will always be returned. */
569 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
570 {
571         int i;
572
573         if (next == NULL)
574                 i = 0;
575         else if (*next >= 0 && *next < class_devno_max())
576                 i = *next;
577         else
578                 return NULL;
579
580         read_lock(&obd_dev_lock);
581         for (; i < class_devno_max(); i++) {
582                 struct obd_device *obd = class_num2obd(i);
583
584                 if (obd == NULL)
585                         continue;
586                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
587                         if (next != NULL)
588                                 *next = i+1;
589                         read_unlock(&obd_dev_lock);
590                         return obd;
591                 }
592         }
593         read_unlock(&obd_dev_lock);
594
595         return NULL;
596 }
597 EXPORT_SYMBOL(class_devices_in_group);
598
599 /**
600  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
601  * adjust sptlrpc settings accordingly.
602  */
603 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
604 {
605         struct obd_device  *obd;
606         const char         *type;
607         int                 i, rc = 0, rc2;
608
609         LASSERT(namelen > 0);
610
611         read_lock(&obd_dev_lock);
612         for (i = 0; i < class_devno_max(); i++) {
613                 obd = class_num2obd(i);
614
615                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
616                         continue;
617
618                 /* only notify mdc, osc, mdt, ost */
619                 type = obd->obd_type->typ_name;
620                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
621                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
622                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
623                     strcmp(type, LUSTRE_OST_NAME) != 0)
624                         continue;
625
626                 if (strncmp(obd->obd_name, fsname, namelen))
627                         continue;
628
629                 class_incref(obd, __FUNCTION__, obd);
630                 read_unlock(&obd_dev_lock);
631                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
632                                          sizeof(KEY_SPTLRPC_CONF),
633                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
634                 rc = rc ? rc : rc2;
635                 class_decref(obd, __FUNCTION__, obd);
636                 read_lock(&obd_dev_lock);
637         }
638         read_unlock(&obd_dev_lock);
639         return rc;
640 }
641 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
642
643 void obd_cleanup_caches(void)
644 {
645         ENTRY;
646         if (obd_device_cachep) {
647                 kmem_cache_destroy(obd_device_cachep);
648                 obd_device_cachep = NULL;
649         }
650         if (obdo_cachep) {
651                 kmem_cache_destroy(obdo_cachep);
652                 obdo_cachep = NULL;
653         }
654         if (import_cachep) {
655                 kmem_cache_destroy(import_cachep);
656                 import_cachep = NULL;
657         }
658         if (capa_cachep) {
659                 kmem_cache_destroy(capa_cachep);
660                 capa_cachep = NULL;
661         }
662         EXIT;
663 }
664
665 int obd_init_caches(void)
666 {
667         ENTRY;
668
669         LASSERT(obd_device_cachep == NULL);
670         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
671                                               sizeof(struct obd_device),
672                                               0, 0, NULL);
673         if (!obd_device_cachep)
674                 GOTO(out, -ENOMEM);
675
676         LASSERT(obdo_cachep == NULL);
677         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
678                                         0, 0, NULL);
679         if (!obdo_cachep)
680                 GOTO(out, -ENOMEM);
681
682         LASSERT(import_cachep == NULL);
683         import_cachep = kmem_cache_create("ll_import_cache",
684                                           sizeof(struct obd_import),
685                                           0, 0, NULL);
686         if (!import_cachep)
687                 GOTO(out, -ENOMEM);
688
689         LASSERT(capa_cachep == NULL);
690         capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
691                                         0, 0, NULL);
692         if (!capa_cachep)
693                 GOTO(out, -ENOMEM);
694
695         RETURN(0);
696 out:
697         obd_cleanup_caches();
698         RETURN(-ENOMEM);
699
700 }
701
702 /* map connection to client */
703 struct obd_export *class_conn2export(struct lustre_handle *conn)
704 {
705         struct obd_export *export;
706         ENTRY;
707
708         if (!conn) {
709                 CDEBUG(D_CACHE, "looking for null handle\n");
710                 RETURN(NULL);
711         }
712
713         if (conn->cookie == -1) {  /* this means assign a new connection */
714                 CDEBUG(D_CACHE, "want a new connection\n");
715                 RETURN(NULL);
716         }
717
718         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
719         export = class_handle2object(conn->cookie, NULL);
720         RETURN(export);
721 }
722 EXPORT_SYMBOL(class_conn2export);
723
724 struct obd_device *class_exp2obd(struct obd_export *exp)
725 {
726         if (exp)
727                 return exp->exp_obd;
728         return NULL;
729 }
730 EXPORT_SYMBOL(class_exp2obd);
731
732 struct obd_device *class_conn2obd(struct lustre_handle *conn)
733 {
734         struct obd_export *export;
735         export = class_conn2export(conn);
736         if (export) {
737                 struct obd_device *obd = export->exp_obd;
738                 class_export_put(export);
739                 return obd;
740         }
741         return NULL;
742 }
743 EXPORT_SYMBOL(class_conn2obd);
744
745 struct obd_import *class_exp2cliimp(struct obd_export *exp)
746 {
747         struct obd_device *obd = exp->exp_obd;
748         if (obd == NULL)
749                 return NULL;
750         return obd->u.cli.cl_import;
751 }
752 EXPORT_SYMBOL(class_exp2cliimp);
753
754 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
755 {
756         struct obd_device *obd = class_conn2obd(conn);
757         if (obd == NULL)
758                 return NULL;
759         return obd->u.cli.cl_import;
760 }
761 EXPORT_SYMBOL(class_conn2cliimp);
762
763 /* Export management functions */
764 static void class_export_destroy(struct obd_export *exp)
765 {
766         struct obd_device *obd = exp->exp_obd;
767         ENTRY;
768
769         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770         LASSERT(obd != NULL);
771
772         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773                exp->exp_client_uuid.uuid, obd->obd_name);
774
775         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776         if (exp->exp_connection)
777                 ptlrpc_put_connection_superhack(exp->exp_connection);
778
779         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
780         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
781         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
782         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
783         obd_destroy_export(exp);
784         class_decref(obd, "export", exp);
785
786         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
787         EXIT;
788 }
789
790 static void export_handle_addref(void *export)
791 {
792         class_export_get(export);
793 }
794
795 static struct portals_handle_ops export_handle_ops = {
796         .hop_addref = export_handle_addref,
797         .hop_free   = NULL,
798 };
799
800 struct obd_export *class_export_get(struct obd_export *exp)
801 {
802         cfs_atomic_inc(&exp->exp_refcount);
803         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804                cfs_atomic_read(&exp->exp_refcount));
805         return exp;
806 }
807 EXPORT_SYMBOL(class_export_get);
808
809 void class_export_put(struct obd_export *exp)
810 {
811         LASSERT(exp != NULL);
812         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
813         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814                cfs_atomic_read(&exp->exp_refcount) - 1);
815
816         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
817                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
818                 CDEBUG(D_IOCTL, "final put %p/%s\n",
819                        exp, exp->exp_client_uuid.uuid);
820
821                 /* release nid stat refererence */
822                 lprocfs_exp_cleanup(exp);
823
824                 obd_zombie_export_add(exp);
825         }
826 }
827 EXPORT_SYMBOL(class_export_put);
828
829 /* Creates a new export, adds it to the hash table, and returns a
830  * pointer to it. The refcount is 2: one for the hash reference, and
831  * one for the pointer returned by this function. */
832 struct obd_export *class_new_export(struct obd_device *obd,
833                                     struct obd_uuid *cluuid)
834 {
835         struct obd_export *export;
836         cfs_hash_t *hash = NULL;
837         int rc = 0;
838         ENTRY;
839
840         OBD_ALLOC_PTR(export);
841         if (!export)
842                 return ERR_PTR(-ENOMEM);
843
844         export->exp_conn_cnt = 0;
845         export->exp_lock_hash = NULL;
846         export->exp_flock_hash = NULL;
847         cfs_atomic_set(&export->exp_refcount, 2);
848         cfs_atomic_set(&export->exp_rpc_count, 0);
849         cfs_atomic_set(&export->exp_cb_count, 0);
850         cfs_atomic_set(&export->exp_locks_count, 0);
851 #if LUSTRE_TRACKS_LOCK_EXP_REFS
852         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
853         spin_lock_init(&export->exp_locks_list_guard);
854 #endif
855         cfs_atomic_set(&export->exp_replay_count, 0);
856         export->exp_obd = obd;
857         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
858         spin_lock_init(&export->exp_uncommitted_replies_lock);
859         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
860         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
861         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
862         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
863         class_handle_hash(&export->exp_handle, &export_handle_ops);
864         export->exp_last_request_time = cfs_time_current_sec();
865         spin_lock_init(&export->exp_lock);
866         spin_lock_init(&export->exp_rpc_lock);
867         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
868         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
869         spin_lock_init(&export->exp_bl_list_lock);
870         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
871
872         export->exp_sp_peer = LUSTRE_SP_ANY;
873         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
874         export->exp_client_uuid = *cluuid;
875         obd_init_export(export);
876
877         spin_lock(&obd->obd_dev_lock);
878         /* shouldn't happen, but might race */
879         if (obd->obd_stopping)
880                 GOTO(exit_unlock, rc = -ENODEV);
881
882         hash = cfs_hash_getref(obd->obd_uuid_hash);
883         if (hash == NULL)
884                 GOTO(exit_unlock, rc = -ENODEV);
885         spin_unlock(&obd->obd_dev_lock);
886
887         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
888                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
889                 if (rc != 0) {
890                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
891                                       obd->obd_name, cluuid->uuid, rc);
892                         GOTO(exit_err, rc = -EALREADY);
893                 }
894         }
895
896         spin_lock(&obd->obd_dev_lock);
897         if (obd->obd_stopping) {
898                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
899                 GOTO(exit_unlock, rc = -ENODEV);
900         }
901
902         class_incref(obd, "export", export);
903         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
904         cfs_list_add_tail(&export->exp_obd_chain_timed,
905                           &export->exp_obd->obd_exports_timed);
906         export->exp_obd->obd_num_exports++;
907         spin_unlock(&obd->obd_dev_lock);
908         cfs_hash_putref(hash);
909         RETURN(export);
910
911 exit_unlock:
912         spin_unlock(&obd->obd_dev_lock);
913 exit_err:
914         if (hash)
915                 cfs_hash_putref(hash);
916         class_handle_unhash(&export->exp_handle);
917         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
918         obd_destroy_export(export);
919         OBD_FREE_PTR(export);
920         return ERR_PTR(rc);
921 }
922 EXPORT_SYMBOL(class_new_export);
923
924 void class_unlink_export(struct obd_export *exp)
925 {
926         class_handle_unhash(&exp->exp_handle);
927
928         spin_lock(&exp->exp_obd->obd_dev_lock);
929         /* delete an uuid-export hashitem from hashtables */
930         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
931                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
932                              &exp->exp_client_uuid,
933                              &exp->exp_uuid_hash);
934
935         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
936         cfs_list_del_init(&exp->exp_obd_chain_timed);
937         exp->exp_obd->obd_num_exports--;
938         spin_unlock(&exp->exp_obd->obd_dev_lock);
939         class_export_put(exp);
940 }
941 EXPORT_SYMBOL(class_unlink_export);
942
943 /* Import management functions */
944 void class_import_destroy(struct obd_import *imp)
945 {
946         ENTRY;
947
948         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
949                 imp->imp_obd->obd_name);
950
951         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
952
953         ptlrpc_put_connection_superhack(imp->imp_connection);
954
955         while (!cfs_list_empty(&imp->imp_conn_list)) {
956                 struct obd_import_conn *imp_conn;
957
958                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
959                                           struct obd_import_conn, oic_item);
960                 cfs_list_del_init(&imp_conn->oic_item);
961                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
962                 OBD_FREE(imp_conn, sizeof(*imp_conn));
963         }
964
965         LASSERT(imp->imp_sec == NULL);
966         class_decref(imp->imp_obd, "import", imp);
967         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
968         EXIT;
969 }
970
/* Handle-ops addref callback: take a reference on the import behind a handle. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
975
976 static struct portals_handle_ops import_handle_ops = {
977         .hop_addref = import_handle_addref,
978         .hop_free   = NULL,
979 };
980
981 struct obd_import *class_import_get(struct obd_import *import)
982 {
983         cfs_atomic_inc(&import->imp_refcount);
984         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
985                cfs_atomic_read(&import->imp_refcount),
986                import->imp_obd->obd_name);
987         return import;
988 }
989 EXPORT_SYMBOL(class_import_get);
990
991 void class_import_put(struct obd_import *imp)
992 {
993         ENTRY;
994
995         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
996         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
997
998         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
999                cfs_atomic_read(&imp->imp_refcount) - 1,
1000                imp->imp_obd->obd_name);
1001
1002         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1003                 CDEBUG(D_INFO, "final put import %p\n", imp);
1004                 obd_zombie_import_add(imp);
1005         }
1006
1007         /* catch possible import put race */
1008         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1009         EXIT;
1010 }
1011 EXPORT_SYMBOL(class_import_put);
1012
1013 static void init_imp_at(struct imp_at *at) {
1014         int i;
1015         at_init(&at->iat_net_latency, 0, 0);
1016         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1017                 /* max service estimates are tracked on the server side, so
1018                    don't use the AT history here, just use the last reported
1019                    val. (But keep hist for proc histogram, worst_ever) */
1020                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1021                         AT_FLG_NOHIST);
1022         }
1023 }
1024
1025 struct obd_import *class_new_import(struct obd_device *obd)
1026 {
1027         struct obd_import *imp;
1028
1029         OBD_ALLOC(imp, sizeof(*imp));
1030         if (imp == NULL)
1031                 return NULL;
1032
1033         CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1034         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1035         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1036         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1037         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1038         CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
1039         imp->imp_replay_cursor = &imp->imp_committed_list;
1040         spin_lock_init(&imp->imp_lock);
1041         imp->imp_last_success_conn = 0;
1042         imp->imp_state = LUSTRE_IMP_NEW;
1043         imp->imp_obd = class_incref(obd, "import", imp);
1044         mutex_init(&imp->imp_sec_mutex);
1045         init_waitqueue_head(&imp->imp_recovery_waitq);
1046
1047         cfs_atomic_set(&imp->imp_refcount, 2);
1048         cfs_atomic_set(&imp->imp_unregistering, 0);
1049         cfs_atomic_set(&imp->imp_inflight, 0);
1050         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1051         cfs_atomic_set(&imp->imp_inval_count, 0);
1052         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1053         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1054         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1055         init_imp_at(&imp->imp_at);
1056
1057         /* the default magic is V2, will be used in connect RPC, and
1058          * then adjusted according to the flags in request/reply. */
1059         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1060
1061         return imp;
1062 }
1063 EXPORT_SYMBOL(class_new_import);
1064
1065 void class_destroy_import(struct obd_import *import)
1066 {
1067         LASSERT(import != NULL);
1068         LASSERT(import != LP_POISON);
1069
1070         class_handle_unhash(&import->imp_handle);
1071
1072         spin_lock(&import->imp_lock);
1073         import->imp_generation++;
1074         spin_unlock(&import->imp_lock);
1075         class_import_put(import);
1076 }
1077 EXPORT_SYMBOL(class_destroy_import);
1078
1079 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1080
1081 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1082 {
1083         spin_lock(&exp->exp_locks_list_guard);
1084
1085         LASSERT(lock->l_exp_refs_nr >= 0);
1086
1087         if (lock->l_exp_refs_target != NULL &&
1088             lock->l_exp_refs_target != exp) {
1089                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1090                               exp, lock, lock->l_exp_refs_target);
1091         }
1092         if ((lock->l_exp_refs_nr ++) == 0) {
1093                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1094                 lock->l_exp_refs_target = exp;
1095         }
1096         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1097                lock, exp, lock->l_exp_refs_nr);
1098         spin_unlock(&exp->exp_locks_list_guard);
1099 }
1100 EXPORT_SYMBOL(__class_export_add_lock_ref);
1101
1102 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1103 {
1104         spin_lock(&exp->exp_locks_list_guard);
1105         LASSERT(lock->l_exp_refs_nr > 0);
1106         if (lock->l_exp_refs_target != exp) {
1107                 LCONSOLE_WARN("lock %p, "
1108                               "mismatching export pointers: %p, %p\n",
1109                               lock, lock->l_exp_refs_target, exp);
1110         }
1111         if (-- lock->l_exp_refs_nr == 0) {
1112                 cfs_list_del_init(&lock->l_exp_refs_link);
1113                 lock->l_exp_refs_target = NULL;
1114         }
1115         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1116                lock, exp, lock->l_exp_refs_nr);
1117         spin_unlock(&exp->exp_locks_list_guard);
1118 }
1119 EXPORT_SYMBOL(__class_export_del_lock_ref);
1120 #endif
1121
1122 /* A connection defines an export context in which preallocation can
1123    be managed. This releases the export pointer reference, and returns
1124    the export handle, so the export refcount is 1 when this function
1125    returns. */
1126 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1127                   struct obd_uuid *cluuid)
1128 {
1129         struct obd_export *export;
1130         LASSERT(conn != NULL);
1131         LASSERT(obd != NULL);
1132         LASSERT(cluuid != NULL);
1133         ENTRY;
1134
1135         export = class_new_export(obd, cluuid);
1136         if (IS_ERR(export))
1137                 RETURN(PTR_ERR(export));
1138
1139         conn->cookie = export->exp_handle.h_cookie;
1140         class_export_put(export);
1141
1142         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1143                cluuid->uuid, conn->cookie);
1144         RETURN(0);
1145 }
1146 EXPORT_SYMBOL(class_connect);
1147
1148 /* if export is involved in recovery then clean up related things */
1149 void class_export_recovery_cleanup(struct obd_export *exp)
1150 {
1151         struct obd_device *obd = exp->exp_obd;
1152
1153         spin_lock(&obd->obd_recovery_task_lock);
1154         if (exp->exp_delayed)
1155                 obd->obd_delayed_clients--;
1156         if (obd->obd_recovering) {
1157                 if (exp->exp_in_recovery) {
1158                         spin_lock(&exp->exp_lock);
1159                         exp->exp_in_recovery = 0;
1160                         spin_unlock(&exp->exp_lock);
1161                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1162                         cfs_atomic_dec(&obd->obd_connected_clients);
1163                 }
1164
1165                 /* if called during recovery then should update
1166                  * obd_stale_clients counter,
1167                  * lightweight exports are not counted */
1168                 if (exp->exp_failed &&
1169                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1170                         exp->exp_obd->obd_stale_clients++;
1171         }
1172         spin_unlock(&obd->obd_recovery_task_lock);
1173         /** Cleanup req replay fields */
1174         if (exp->exp_req_replay_needed) {
1175                 spin_lock(&exp->exp_lock);
1176                 exp->exp_req_replay_needed = 0;
1177                 spin_unlock(&exp->exp_lock);
1178                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1179                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1180         }
1181         /** Cleanup lock replay data */
1182         if (exp->exp_lock_replay_needed) {
1183                 spin_lock(&exp->exp_lock);
1184                 exp->exp_lock_replay_needed = 0;
1185                 spin_unlock(&exp->exp_lock);
1186                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1187                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1188         }
1189 }
1190
1191 /* This function removes 1-3 references from the export:
1192  * 1 - for export pointer passed
1193  * and if disconnect really need
1194  * 2 - removing from hash
1195  * 3 - in client_unlink_export
1196  * The export pointer passed to this function can destroyed */
1197 int class_disconnect(struct obd_export *export)
1198 {
1199         int already_disconnected;
1200         ENTRY;
1201
1202         if (export == NULL) {
1203                 CWARN("attempting to free NULL export %p\n", export);
1204                 RETURN(-EINVAL);
1205         }
1206
1207         spin_lock(&export->exp_lock);
1208         already_disconnected = export->exp_disconnected;
1209         export->exp_disconnected = 1;
1210         spin_unlock(&export->exp_lock);
1211
1212         /* class_cleanup(), abort_recovery(), and class_fail_export()
1213          * all end up in here, and if any of them race we shouldn't
1214          * call extra class_export_puts(). */
1215         if (already_disconnected) {
1216                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1217                 GOTO(no_disconn, already_disconnected);
1218         }
1219
1220         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1221                export->exp_handle.h_cookie);
1222
1223         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1224                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1225                              &export->exp_connection->c_peer.nid,
1226                              &export->exp_nid_hash);
1227
1228         class_export_recovery_cleanup(export);
1229         class_unlink_export(export);
1230 no_disconn:
1231         class_export_put(export);
1232         RETURN(0);
1233 }
1234 EXPORT_SYMBOL(class_disconnect);
1235
1236 /* Return non-zero for a fully connected export */
1237 int class_connected_export(struct obd_export *exp)
1238 {
1239         if (exp) {
1240                 int connected;
1241                 spin_lock(&exp->exp_lock);
1242                 connected = (exp->exp_conn_cnt > 0);
1243                 spin_unlock(&exp->exp_lock);
1244                 return connected;
1245         }
1246         return 0;
1247 }
1248 EXPORT_SYMBOL(class_connected_export);
1249
1250 static void class_disconnect_export_list(cfs_list_t *list,
1251                                          enum obd_option flags)
1252 {
1253         int rc;
1254         struct obd_export *exp;
1255         ENTRY;
1256
1257         /* It's possible that an export may disconnect itself, but
1258          * nothing else will be added to this list. */
1259         while (!cfs_list_empty(list)) {
1260                 exp = cfs_list_entry(list->next, struct obd_export,
1261                                      exp_obd_chain);
1262                 /* need for safe call CDEBUG after obd_disconnect */
1263                 class_export_get(exp);
1264
1265                 spin_lock(&exp->exp_lock);
1266                 exp->exp_flags = flags;
1267                 spin_unlock(&exp->exp_lock);
1268
1269                 if (obd_uuid_equals(&exp->exp_client_uuid,
1270                                     &exp->exp_obd->obd_uuid)) {
1271                         CDEBUG(D_HA,
1272                                "exp %p export uuid == obd uuid, don't discon\n",
1273                                exp);
1274                         /* Need to delete this now so we don't end up pointing
1275                          * to work_list later when this export is cleaned up. */
1276                         cfs_list_del_init(&exp->exp_obd_chain);
1277                         class_export_put(exp);
1278                         continue;
1279                 }
1280
1281                 class_export_get(exp);
1282                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1283                        "last request at "CFS_TIME_T"\n",
1284                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1285                        exp, exp->exp_last_request_time);
1286                 /* release one export reference anyway */
1287                 rc = obd_disconnect(exp);
1288
1289                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1290                        obd_export_nid2str(exp), exp, rc);
1291                 class_export_put(exp);
1292         }
1293         EXIT;
1294 }
1295
1296 void class_disconnect_exports(struct obd_device *obd)
1297 {
1298         cfs_list_t work_list;
1299         ENTRY;
1300
1301         /* Move all of the exports from obd_exports to a work list, en masse. */
1302         CFS_INIT_LIST_HEAD(&work_list);
1303         spin_lock(&obd->obd_dev_lock);
1304         cfs_list_splice_init(&obd->obd_exports, &work_list);
1305         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1306         spin_unlock(&obd->obd_dev_lock);
1307
1308         if (!cfs_list_empty(&work_list)) {
1309                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1310                        "disconnecting them\n", obd->obd_minor, obd);
1311                 class_disconnect_export_list(&work_list,
1312                                              exp_flags_from_obd(obd));
1313         } else
1314                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1315                        obd->obd_minor, obd);
1316         EXIT;
1317 }
1318 EXPORT_SYMBOL(class_disconnect_exports);
1319
1320 /* Remove exports that have not completed recovery.
1321  */
1322 void class_disconnect_stale_exports(struct obd_device *obd,
1323                                     int (*test_export)(struct obd_export *))
1324 {
1325         cfs_list_t work_list;
1326         struct obd_export *exp, *n;
1327         int evicted = 0;
1328         ENTRY;
1329
1330         CFS_INIT_LIST_HEAD(&work_list);
1331         spin_lock(&obd->obd_dev_lock);
1332         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1333                                      exp_obd_chain) {
1334                 /* don't count self-export as client */
1335                 if (obd_uuid_equals(&exp->exp_client_uuid,
1336                                     &exp->exp_obd->obd_uuid))
1337                         continue;
1338
1339                 /* don't evict clients which have no slot in last_rcvd
1340                  * (e.g. lightweight connection) */
1341                 if (exp->exp_target_data.ted_lr_idx == -1)
1342                         continue;
1343
1344                 spin_lock(&exp->exp_lock);
1345                 if (exp->exp_failed || test_export(exp)) {
1346                         spin_unlock(&exp->exp_lock);
1347                         continue;
1348                 }
1349                 exp->exp_failed = 1;
1350                 spin_unlock(&exp->exp_lock);
1351
1352                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1353                 evicted++;
1354                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1355                        obd->obd_name, exp->exp_client_uuid.uuid,
1356                        exp->exp_connection == NULL ? "<unknown>" :
1357                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1358                 print_export_data(exp, "EVICTING", 0);
1359         }
1360         spin_unlock(&obd->obd_dev_lock);
1361
1362         if (evicted)
1363                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1364                               obd->obd_name, evicted);
1365
1366         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1367                                                  OBD_OPT_ABORT_RECOV);
1368         EXIT;
1369 }
1370 EXPORT_SYMBOL(class_disconnect_stale_exports);
1371
1372 void class_fail_export(struct obd_export *exp)
1373 {
1374         int rc, already_failed;
1375
1376         spin_lock(&exp->exp_lock);
1377         already_failed = exp->exp_failed;
1378         exp->exp_failed = 1;
1379         spin_unlock(&exp->exp_lock);
1380
1381         if (already_failed) {
1382                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1383                        exp, exp->exp_client_uuid.uuid);
1384                 return;
1385         }
1386
1387         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1388                exp, exp->exp_client_uuid.uuid);
1389
1390         if (obd_dump_on_timeout)
1391                 libcfs_debug_dumplog();
1392
1393         /* need for safe call CDEBUG after obd_disconnect */
1394         class_export_get(exp);
1395
1396         /* Most callers into obd_disconnect are removing their own reference
1397          * (request, for example) in addition to the one from the hash table.
1398          * We don't have such a reference here, so make one. */
1399         class_export_get(exp);
1400         rc = obd_disconnect(exp);
1401         if (rc)
1402                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1403         else
1404                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1405                        exp, exp->exp_client_uuid.uuid);
1406         class_export_put(exp);
1407 }
1408 EXPORT_SYMBOL(class_fail_export);
1409
1410 char *obd_export_nid2str(struct obd_export *exp)
1411 {
1412         if (exp->exp_connection != NULL)
1413                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1414
1415         return "(no nid)";
1416 }
1417 EXPORT_SYMBOL(obd_export_nid2str);
1418
1419 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1420 {
1421         cfs_hash_t *nid_hash;
1422         struct obd_export *doomed_exp = NULL;
1423         int exports_evicted = 0;
1424
1425         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1426
1427         spin_lock(&obd->obd_dev_lock);
1428         /* umount has run already, so evict thread should leave
1429          * its task to umount thread now */
1430         if (obd->obd_stopping) {
1431                 spin_unlock(&obd->obd_dev_lock);
1432                 return exports_evicted;
1433         }
1434         nid_hash = obd->obd_nid_hash;
1435         cfs_hash_getref(nid_hash);
1436         spin_unlock(&obd->obd_dev_lock);
1437
1438         do {
1439                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1440                 if (doomed_exp == NULL)
1441                         break;
1442
1443                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1444                          "nid %s found, wanted nid %s, requested nid %s\n",
1445                          obd_export_nid2str(doomed_exp),
1446                          libcfs_nid2str(nid_key), nid);
1447                 LASSERTF(doomed_exp != obd->obd_self_export,
1448                          "self-export is hashed by NID?\n");
1449                 exports_evicted++;
1450                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1451                               "request\n", obd->obd_name,
1452                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1453                               obd_export_nid2str(doomed_exp));
1454                 class_fail_export(doomed_exp);
1455                 class_export_put(doomed_exp);
1456         } while (1);
1457
1458         cfs_hash_putref(nid_hash);
1459
1460         if (!exports_evicted)
1461                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1462                        obd->obd_name, nid);
1463         return exports_evicted;
1464 }
1465 EXPORT_SYMBOL(obd_export_evict_by_nid);
1466
1467 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1468 {
1469         cfs_hash_t *uuid_hash;
1470         struct obd_export *doomed_exp = NULL;
1471         struct obd_uuid doomed_uuid;
1472         int exports_evicted = 0;
1473
1474         spin_lock(&obd->obd_dev_lock);
1475         if (obd->obd_stopping) {
1476                 spin_unlock(&obd->obd_dev_lock);
1477                 return exports_evicted;
1478         }
1479         uuid_hash = obd->obd_uuid_hash;
1480         cfs_hash_getref(uuid_hash);
1481         spin_unlock(&obd->obd_dev_lock);
1482
1483         obd_str2uuid(&doomed_uuid, uuid);
1484         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1485                 CERROR("%s: can't evict myself\n", obd->obd_name);
1486                 cfs_hash_putref(uuid_hash);
1487                 return exports_evicted;
1488         }
1489
1490         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1491
1492         if (doomed_exp == NULL) {
1493                 CERROR("%s: can't disconnect %s: no exports found\n",
1494                        obd->obd_name, uuid);
1495         } else {
1496                 CWARN("%s: evicting %s at adminstrative request\n",
1497                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1498                 class_fail_export(doomed_exp);
1499                 class_export_put(doomed_exp);
1500                 exports_evicted++;
1501         }
1502         cfs_hash_putref(uuid_hash);
1503
1504         return exports_evicted;
1505 }
1506 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1507
1508 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1509 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1510 EXPORT_SYMBOL(class_export_dump_hook);
1511 #endif
1512
1513 static void print_export_data(struct obd_export *exp, const char *status,
1514                               int locks)
1515 {
1516         struct ptlrpc_reply_state *rs;
1517         struct ptlrpc_reply_state *first_reply = NULL;
1518         int nreplies = 0;
1519
1520         spin_lock(&exp->exp_lock);
1521         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1522                                 rs_exp_list) {
1523                 if (nreplies == 0)
1524                         first_reply = rs;
1525                 nreplies++;
1526         }
1527         spin_unlock(&exp->exp_lock);
1528
1529         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1530                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1531                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1532                cfs_atomic_read(&exp->exp_rpc_count),
1533                cfs_atomic_read(&exp->exp_cb_count),
1534                cfs_atomic_read(&exp->exp_locks_count),
1535                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1536                nreplies, first_reply, nreplies > 3 ? "..." : "",
1537                exp->exp_last_committed);
1538 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1539         if (locks && class_export_dump_hook != NULL)
1540                 class_export_dump_hook(exp);
1541 #endif
1542 }
1543
1544 void dump_exports(struct obd_device *obd, int locks)
1545 {
1546         struct obd_export *exp;
1547
1548         spin_lock(&obd->obd_dev_lock);
1549         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1550                 print_export_data(exp, "ACTIVE", locks);
1551         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1552                 print_export_data(exp, "UNLINKED", locks);
1553         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1554                 print_export_data(exp, "DELAYED", locks);
1555         spin_unlock(&obd->obd_dev_lock);
1556         spin_lock(&obd_zombie_impexp_lock);
1557         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1558                 print_export_data(exp, "ZOMBIE", locks);
1559         spin_unlock(&obd_zombie_impexp_lock);
1560 }
1561 EXPORT_SYMBOL(dump_exports);
1562
1563 void obd_exports_barrier(struct obd_device *obd)
1564 {
1565         int waited = 2;
1566         LASSERT(cfs_list_empty(&obd->obd_exports));
1567         spin_lock(&obd->obd_dev_lock);
1568         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1569                 spin_unlock(&obd->obd_dev_lock);
1570                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1571                                                    cfs_time_seconds(waited));
1572                 if (waited > 5 && IS_PO2(waited)) {
1573                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1574                                       "more than %d seconds. "
1575                                       "The obd refcount = %d. Is it stuck?\n",
1576                                       obd->obd_name, waited,
1577                                       cfs_atomic_read(&obd->obd_refcount));
1578                         dump_exports(obd, 1);
1579                 }
1580                 waited *= 2;
1581                 spin_lock(&obd->obd_dev_lock);
1582         }
1583         spin_unlock(&obd->obd_dev_lock);
1584 }
1585 EXPORT_SYMBOL(obd_exports_barrier);
1586
/* Number of zombie imports and exports still waiting to be destroyed;
 * modified under obd_zombie_impexp_lock. */
static int zombies_count = 0;
1589
1590 /**
1591  * kill zombie imports and exports
1592  */
1593 void obd_zombie_impexp_cull(void)
1594 {
1595         struct obd_import *import;
1596         struct obd_export *export;
1597         ENTRY;
1598
1599         do {
1600                 spin_lock(&obd_zombie_impexp_lock);
1601
1602                 import = NULL;
1603                 if (!cfs_list_empty(&obd_zombie_imports)) {
1604                         import = cfs_list_entry(obd_zombie_imports.next,
1605                                                 struct obd_import,
1606                                                 imp_zombie_chain);
1607                         cfs_list_del_init(&import->imp_zombie_chain);
1608                 }
1609
1610                 export = NULL;
1611                 if (!cfs_list_empty(&obd_zombie_exports)) {
1612                         export = cfs_list_entry(obd_zombie_exports.next,
1613                                                 struct obd_export,
1614                                                 exp_obd_chain);
1615                         cfs_list_del_init(&export->exp_obd_chain);
1616                 }
1617
1618                 spin_unlock(&obd_zombie_impexp_lock);
1619
1620                 if (import != NULL) {
1621                         class_import_destroy(import);
1622                         spin_lock(&obd_zombie_impexp_lock);
1623                         zombies_count--;
1624                         spin_unlock(&obd_zombie_impexp_lock);
1625                 }
1626
1627                 if (export != NULL) {
1628                         class_export_destroy(export);
1629                         spin_lock(&obd_zombie_impexp_lock);
1630                         zombies_count--;
1631                         spin_unlock(&obd_zombie_impexp_lock);
1632                 }
1633
1634                 cond_resched();
1635         } while (import != NULL || export != NULL);
1636         EXIT;
1637 }
1638
1639 static struct completion        obd_zombie_start;
1640 static struct completion        obd_zombie_stop;
1641 static unsigned long            obd_zombie_flags;
1642 static wait_queue_head_t        obd_zombie_waitq;
1643 static pid_t                    obd_zombie_pid;
1644
1645 enum {
1646         OBD_ZOMBIE_STOP         = 0x0001,
1647 };
1648
1649 /**
1650  * check for work for kill zombie import/export thread.
1651  */
1652 static int obd_zombie_impexp_check(void *arg)
1653 {
1654         int rc;
1655
1656         spin_lock(&obd_zombie_impexp_lock);
1657         rc = (zombies_count == 0) &&
1658              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1659         spin_unlock(&obd_zombie_impexp_lock);
1660
1661         RETURN(rc);
1662 }
1663
1664 /**
1665  * Add export to the obd_zombe thread and notify it.
1666  */
1667 static void obd_zombie_export_add(struct obd_export *exp) {
1668         spin_lock(&exp->exp_obd->obd_dev_lock);
1669         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1670         cfs_list_del_init(&exp->exp_obd_chain);
1671         spin_unlock(&exp->exp_obd->obd_dev_lock);
1672         spin_lock(&obd_zombie_impexp_lock);
1673         zombies_count++;
1674         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1675         spin_unlock(&obd_zombie_impexp_lock);
1676
1677         obd_zombie_impexp_notify();
1678 }
1679
1680 /**
1681  * Add import to the obd_zombe thread and notify it.
1682  */
1683 static void obd_zombie_import_add(struct obd_import *imp) {
1684         LASSERT(imp->imp_sec == NULL);
1685         LASSERT(imp->imp_rq_pool == NULL);
1686         spin_lock(&obd_zombie_impexp_lock);
1687         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1688         zombies_count++;
1689         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1690         spin_unlock(&obd_zombie_impexp_lock);
1691
1692         obd_zombie_impexp_notify();
1693 }
1694
1695 /**
1696  * notify import/export destroy thread about new zombie.
1697  */
1698 static void obd_zombie_impexp_notify(void)
1699 {
1700         /*
1701          * Make sure obd_zomebie_impexp_thread get this notification.
1702          * It is possible this signal only get by obd_zombie_barrier, and
1703          * barrier gulps this notification and sleeps away and hangs ensues
1704          */
1705         wake_up_all(&obd_zombie_waitq);
1706 }
1707
1708 /**
1709  * check whether obd_zombie is idle
1710  */
1711 static int obd_zombie_is_idle(void)
1712 {
1713         int rc;
1714
1715         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1716         spin_lock(&obd_zombie_impexp_lock);
1717         rc = (zombies_count == 0);
1718         spin_unlock(&obd_zombie_impexp_lock);
1719         return rc;
1720 }
1721
1722 /**
1723  * wait when obd_zombie import/export queues become empty
1724  */
1725 void obd_zombie_barrier(void)
1726 {
1727         struct l_wait_info lwi = { 0 };
1728
1729         if (obd_zombie_pid == current_pid())
1730                 /* don't wait for myself */
1731                 return;
1732         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1733 }
1734 EXPORT_SYMBOL(obd_zombie_barrier);
1735
1736 #ifdef __KERNEL__
1737
1738 /**
1739  * destroy zombie export/import thread.
1740  */
1741 static int obd_zombie_impexp_thread(void *unused)
1742 {
1743         unshare_fs_struct();
1744         complete(&obd_zombie_start);
1745
1746         obd_zombie_pid = current_pid();
1747
1748         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1749                 struct l_wait_info lwi = { 0 };
1750
1751                 l_wait_event(obd_zombie_waitq,
1752                              !obd_zombie_impexp_check(NULL), &lwi);
1753                 obd_zombie_impexp_cull();
1754
1755                 /*
1756                  * Notify obd_zombie_barrier callers that queues
1757                  * may be empty.
1758                  */
1759                 wake_up(&obd_zombie_waitq);
1760         }
1761
1762         complete(&obd_zombie_stop);
1763
1764         RETURN(0);
1765 }
1766
1767 #else /* ! KERNEL */
1768
1769 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1770 static void *obd_zombie_impexp_work_cb;
1771 static void *obd_zombie_impexp_idle_cb;
1772
1773 int obd_zombie_impexp_kill(void *arg)
1774 {
1775         int rc = 0;
1776
1777         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1778                 obd_zombie_impexp_cull();
1779                 rc = 1;
1780         }
1781         cfs_atomic_dec(&zombie_recur);
1782         return rc;
1783 }
1784
1785 #endif
1786
1787 /**
1788  * start destroy zombie import/export thread
1789  */
1790 int obd_zombie_impexp_init(void)
1791 {
1792 #ifdef __KERNEL__
1793         struct task_struct *task;
1794 #endif
1795
1796         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1797         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1798         spin_lock_init(&obd_zombie_impexp_lock);
1799         init_completion(&obd_zombie_start);
1800         init_completion(&obd_zombie_stop);
1801         init_waitqueue_head(&obd_zombie_waitq);
1802         obd_zombie_pid = 0;
1803
1804 #ifdef __KERNEL__
1805         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1806         if (IS_ERR(task))
1807                 RETURN(PTR_ERR(task));
1808
1809         wait_for_completion(&obd_zombie_start);
1810 #else
1811
1812         obd_zombie_impexp_work_cb =
1813                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1814                                                  &obd_zombie_impexp_kill, NULL);
1815
1816         obd_zombie_impexp_idle_cb =
1817                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1818                                                  &obd_zombie_impexp_check, NULL);
1819 #endif
1820         RETURN(0);
1821 }
1822 /**
1823  * stop destroy zombie import/export thread
1824  */
1825 void obd_zombie_impexp_stop(void)
1826 {
1827         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1828         obd_zombie_impexp_notify();
1829 #ifdef __KERNEL__
1830         wait_for_completion(&obd_zombie_stop);
1831 #else
1832         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1833         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1834 #endif
1835 }
1836
1837 /***** Kernel-userspace comm helpers *******/
1838
1839 /* Get length of entire message, including header */
1840 int kuc_len(int payload_len)
1841 {
1842         return sizeof(struct kuc_hdr) + payload_len;
1843 }
1844 EXPORT_SYMBOL(kuc_len);
1845
1846 /* Get a pointer to kuc header, given a ptr to the payload
1847  * @param p Pointer to payload area
1848  * @returns Pointer to kuc header
1849  */
1850 struct kuc_hdr * kuc_ptr(void *p)
1851 {
1852         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1853         LASSERT(lh->kuc_magic == KUC_MAGIC);
1854         return lh;
1855 }
1856 EXPORT_SYMBOL(kuc_ptr);
1857
1858 /* Test if payload is part of kuc message
1859  * @param p Pointer to payload area
1860  * @returns boolean
1861  */
1862 int kuc_ispayload(void *p)
1863 {
1864         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1865
1866         if (kh->kuc_magic == KUC_MAGIC)
1867                 return 1;
1868         else
1869                 return 0;
1870 }
1871 EXPORT_SYMBOL(kuc_ispayload);
1872
1873 /* Alloc space for a message, and fill in header
1874  * @return Pointer to payload area
1875  */
1876 void *kuc_alloc(int payload_len, int transport, int type)
1877 {
1878         struct kuc_hdr *lh;
1879         int len = kuc_len(payload_len);
1880
1881         OBD_ALLOC(lh, len);
1882         if (lh == NULL)
1883                 return ERR_PTR(-ENOMEM);
1884
1885         lh->kuc_magic = KUC_MAGIC;
1886         lh->kuc_transport = transport;
1887         lh->kuc_msgtype = type;
1888         lh->kuc_msglen = len;
1889
1890         return (void *)(lh + 1);
1891 }
1892 EXPORT_SYMBOL(kuc_alloc);
1893
/* Free a kuc message
 * @param p Pointer to payload area
 * @param payload_len Length of the payload area; the size handed to
 *        OBD_FREE is recomputed from it with kuc_len(), so it should
 *        be the value the message was allocated with
 *
 * Not marked "inline": the function is exported, so an out-of-line
 * definition with external linkage has to exist.  A plain "inline"
 * definition does not guarantee one under C99 inline semantics.
 */
void kuc_free(void *p, int payload_len)
{
        /* kuc_ptr() asserts that p is preceded by a valid kuc header */
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1901
1902
1903