Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (!cfs_request_module("%s", modname)) {
129                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
130                         type = class_search_type(name);
131                 } else {
132                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
133                                            modname);
134                 }
135         }
136 #endif
137         if (type) {
138                 spin_lock(&type->obd_type_lock);
139                 type->typ_refcnt++;
140                 cfs_try_module_get(type->typ_dt_ops->o_owner);
141                 spin_unlock(&type->obd_type_lock);
142         }
143         return type;
144 }
145 EXPORT_SYMBOL(class_get_type);
146
147 void class_put_type(struct obd_type *type)
148 {
149         LASSERT(type);
150         spin_lock(&type->obd_type_lock);
151         type->typ_refcnt--;
152         cfs_module_put(type->typ_dt_ops->o_owner);
153         spin_unlock(&type->obd_type_lock);
154 }
155 EXPORT_SYMBOL(class_put_type);
156
157 #define CLASS_MAX_NAME 1024
158
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160                         struct lprocfs_vars *vars, const char *name,
161                         struct lu_device_type *ldt)
162 {
163         struct obd_type *type;
164         int rc = 0;
165         ENTRY;
166
167         /* sanity check */
168         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
169
170         if (class_search_type(name)) {
171                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
172                 RETURN(-EEXIST);
173         }
174
175         rc = -ENOMEM;
176         OBD_ALLOC(type, sizeof(*type));
177         if (type == NULL)
178                 RETURN(rc);
179
180         OBD_ALLOC_PTR(type->typ_dt_ops);
181         OBD_ALLOC_PTR(type->typ_md_ops);
182         OBD_ALLOC(type->typ_name, strlen(name) + 1);
183
184         if (type->typ_dt_ops == NULL ||
185             type->typ_md_ops == NULL ||
186             type->typ_name == NULL)
187                 GOTO (failed, rc);
188
189         *(type->typ_dt_ops) = *dt_ops;
190         /* md_ops is optional */
191         if (md_ops)
192                 *(type->typ_md_ops) = *md_ops;
193         strcpy(type->typ_name, name);
194         spin_lock_init(&type->obd_type_lock);
195
196 #ifdef LPROCFS
197         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
198                                               vars, type);
199         if (IS_ERR(type->typ_procroot)) {
200                 rc = PTR_ERR(type->typ_procroot);
201                 type->typ_procroot = NULL;
202                 GOTO (failed, rc);
203         }
204 #endif
205         if (ldt != NULL) {
206                 type->typ_lu = ldt;
207                 rc = lu_device_type_init(ldt);
208                 if (rc != 0)
209                         GOTO (failed, rc);
210         }
211
212         spin_lock(&obd_types_lock);
213         cfs_list_add(&type->typ_chain, &obd_types);
214         spin_unlock(&obd_types_lock);
215
216         RETURN (0);
217
218  failed:
219         if (type->typ_name != NULL)
220                 OBD_FREE(type->typ_name, strlen(name) + 1);
221         if (type->typ_md_ops != NULL)
222                 OBD_FREE_PTR(type->typ_md_ops);
223         if (type->typ_dt_ops != NULL)
224                 OBD_FREE_PTR(type->typ_dt_ops);
225         OBD_FREE(type, sizeof(*type));
226         RETURN(rc);
227 }
228 EXPORT_SYMBOL(class_register_type);
229
230 int class_unregister_type(const char *name)
231 {
232         struct obd_type *type = class_search_type(name);
233         ENTRY;
234
235         if (!type) {
236                 CERROR("unknown obd type\n");
237                 RETURN(-EINVAL);
238         }
239
240         if (type->typ_refcnt) {
241                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
242                 /* This is a bad situation, let's make the best of it */
243                 /* Remove ops, but leave the name for debugging */
244                 OBD_FREE_PTR(type->typ_dt_ops);
245                 OBD_FREE_PTR(type->typ_md_ops);
246                 RETURN(-EBUSY);
247         }
248
249         /* we do not use type->typ_procroot as for compatibility purposes
250          * other modules can share names (i.e. lod can use lov entry). so
251          * we can't reference pointer as it can get invalided when another
252          * module removes the entry */
253         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
254
255         if (type->typ_lu)
256                 lu_device_type_fini(type->typ_lu);
257
258         spin_lock(&obd_types_lock);
259         cfs_list_del(&type->typ_chain);
260         spin_unlock(&obd_types_lock);
261         OBD_FREE(type->typ_name, strlen(name) + 1);
262         if (type->typ_dt_ops != NULL)
263                 OBD_FREE_PTR(type->typ_dt_ops);
264         if (type->typ_md_ops != NULL)
265                 OBD_FREE_PTR(type->typ_md_ops);
266         OBD_FREE(type, sizeof(*type));
267         RETURN(0);
268 } /* class_unregister_type */
269 EXPORT_SYMBOL(class_unregister_type);
270
271 /**
272  * Create a new obd device.
273  *
274  * Find an empty slot in ::obd_devs[], create a new obd device in it.
275  *
276  * \param[in] type_name obd device type string.
277  * \param[in] name      obd device name.
278  *
279  * \retval NULL if create fails, otherwise return the obd device
280  *         pointer created.
281  */
282 struct obd_device *class_newdev(const char *type_name, const char *name)
283 {
284         struct obd_device *result = NULL;
285         struct obd_device *newdev;
286         struct obd_type *type = NULL;
287         int i;
288         int new_obd_minor = 0;
289         ENTRY;
290
291         if (strlen(name) >= MAX_OBD_NAME) {
292                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
293                 RETURN(ERR_PTR(-EINVAL));
294         }
295
296         type = class_get_type(type_name);
297         if (type == NULL){
298                 CERROR("OBD: unknown type: %s\n", type_name);
299                 RETURN(ERR_PTR(-ENODEV));
300         }
301
302         newdev = obd_device_alloc();
303         if (newdev == NULL)
304                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
305
306         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
307
308         write_lock(&obd_dev_lock);
309         for (i = 0; i < class_devno_max(); i++) {
310                 struct obd_device *obd = class_num2obd(i);
311
312                 if (obd && obd->obd_name &&
313                     (strcmp(name, obd->obd_name) == 0)) {
314                         CERROR("Device %s already exists at %d, won't add\n",
315                                name, i);
316                         if (result) {
317                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
318                                          "%p obd_magic %08x != %08x\n", result,
319                                          result->obd_magic, OBD_DEVICE_MAGIC);
320                                 LASSERTF(result->obd_minor == new_obd_minor,
321                                          "%p obd_minor %d != %d\n", result,
322                                          result->obd_minor, new_obd_minor);
323
324                                 obd_devs[result->obd_minor] = NULL;
325                                 result->obd_name[0]='\0';
326                          }
327                         result = ERR_PTR(-EEXIST);
328                         break;
329                 }
330                 if (!result && !obd) {
331                         result = newdev;
332                         result->obd_minor = i;
333                         new_obd_minor = i;
334                         result->obd_type = type;
335                         strncpy(result->obd_name, name,
336                                 sizeof(result->obd_name) - 1);
337                         obd_devs[i] = result;
338                 }
339         }
340         write_unlock(&obd_dev_lock);
341
342         if (result == NULL && i >= class_devno_max()) {
343                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
344                        class_devno_max());
345                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
346         }
347
348         if (IS_ERR(result))
349                 GOTO(out, result);
350
351         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
352                result->obd_name, result);
353
354         RETURN(result);
355 out:
356         obd_device_free(newdev);
357 out_type:
358         class_put_type(type);
359         return result;
360 }
361
362 void class_release_dev(struct obd_device *obd)
363 {
364         struct obd_type *obd_type = obd->obd_type;
365
366         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
367                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
368         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
369                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
370         LASSERT(obd_type != NULL);
371
372         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
373                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
374
375         write_lock(&obd_dev_lock);
376         obd_devs[obd->obd_minor] = NULL;
377         write_unlock(&obd_dev_lock);
378         obd_device_free(obd);
379
380         class_put_type(obd_type);
381 }
382
383 int class_name2dev(const char *name)
384 {
385         int i;
386
387         if (!name)
388                 return -1;
389
390         read_lock(&obd_dev_lock);
391         for (i = 0; i < class_devno_max(); i++) {
392                 struct obd_device *obd = class_num2obd(i);
393
394                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
395                         /* Make sure we finished attaching before we give
396                            out any references */
397                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
398                         if (obd->obd_attached) {
399                                 read_unlock(&obd_dev_lock);
400                                 return i;
401                         }
402                         break;
403                 }
404         }
405         read_unlock(&obd_dev_lock);
406
407         return -1;
408 }
409 EXPORT_SYMBOL(class_name2dev);
410
411 struct obd_device *class_name2obd(const char *name)
412 {
413         int dev = class_name2dev(name);
414
415         if (dev < 0 || dev > class_devno_max())
416                 return NULL;
417         return class_num2obd(dev);
418 }
419 EXPORT_SYMBOL(class_name2obd);
420
421 int class_uuid2dev(struct obd_uuid *uuid)
422 {
423         int i;
424
425         read_lock(&obd_dev_lock);
426         for (i = 0; i < class_devno_max(); i++) {
427                 struct obd_device *obd = class_num2obd(i);
428
429                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
430                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
431                         read_unlock(&obd_dev_lock);
432                         return i;
433                 }
434         }
435         read_unlock(&obd_dev_lock);
436
437         return -1;
438 }
439 EXPORT_SYMBOL(class_uuid2dev);
440
441 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
442 {
443         int dev = class_uuid2dev(uuid);
444         if (dev < 0)
445                 return NULL;
446         return class_num2obd(dev);
447 }
448 EXPORT_SYMBOL(class_uuid2obd);
449
450 /**
451  * Get obd device from ::obd_devs[]
452  *
453  * \param num [in] array index
454  *
455  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
456  *         otherwise return the obd device there.
457  */
458 struct obd_device *class_num2obd(int num)
459 {
460         struct obd_device *obd = NULL;
461
462         if (num < class_devno_max()) {
463                 obd = obd_devs[num];
464                 if (obd == NULL)
465                         return NULL;
466
467                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
468                          "%p obd_magic %08x != %08x\n",
469                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
470                 LASSERTF(obd->obd_minor == num,
471                          "%p obd_minor %0d != %0d\n",
472                          obd, obd->obd_minor, num);
473         }
474
475         return obd;
476 }
477 EXPORT_SYMBOL(class_num2obd);
478
479 void class_obd_list(void)
480 {
481         char *status;
482         int i;
483
484         read_lock(&obd_dev_lock);
485         for (i = 0; i < class_devno_max(); i++) {
486                 struct obd_device *obd = class_num2obd(i);
487
488                 if (obd == NULL)
489                         continue;
490                 if (obd->obd_stopping)
491                         status = "ST";
492                 else if (obd->obd_set_up)
493                         status = "UP";
494                 else if (obd->obd_attached)
495                         status = "AT";
496                 else
497                         status = "--";
498                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
499                          i, status, obd->obd_type->typ_name,
500                          obd->obd_name, obd->obd_uuid.uuid,
501                          cfs_atomic_read(&obd->obd_refcount));
502         }
503         read_unlock(&obd_dev_lock);
504         return;
505 }
506
507 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
508    specified, then only the client with that uuid is returned,
509    otherwise any client connected to the tgt is returned. */
510 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
511                                           const char * typ_name,
512                                           struct obd_uuid *grp_uuid)
513 {
514         int i;
515
516         read_lock(&obd_dev_lock);
517         for (i = 0; i < class_devno_max(); i++) {
518                 struct obd_device *obd = class_num2obd(i);
519
520                 if (obd == NULL)
521                         continue;
522                 if ((strncmp(obd->obd_type->typ_name, typ_name,
523                              strlen(typ_name)) == 0)) {
524                         if (obd_uuid_equals(tgt_uuid,
525                                             &obd->u.cli.cl_target_uuid) &&
526                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
527                                                          &obd->obd_uuid) : 1)) {
528                                 read_unlock(&obd_dev_lock);
529                                 return obd;
530                         }
531                 }
532         }
533         read_unlock(&obd_dev_lock);
534
535         return NULL;
536 }
537 EXPORT_SYMBOL(class_find_client_obd);
538
539 /* Iterate the obd_device list looking devices have grp_uuid. Start
540    searching at *next, and if a device is found, the next index to look
541    at is saved in *next. If next is NULL, then the first matching device
542    will always be returned. */
543 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
544 {
545         int i;
546
547         if (next == NULL)
548                 i = 0;
549         else if (*next >= 0 && *next < class_devno_max())
550                 i = *next;
551         else
552                 return NULL;
553
554         read_lock(&obd_dev_lock);
555         for (; i < class_devno_max(); i++) {
556                 struct obd_device *obd = class_num2obd(i);
557
558                 if (obd == NULL)
559                         continue;
560                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
561                         if (next != NULL)
562                                 *next = i+1;
563                         read_unlock(&obd_dev_lock);
564                         return obd;
565                 }
566         }
567         read_unlock(&obd_dev_lock);
568
569         return NULL;
570 }
571 EXPORT_SYMBOL(class_devices_in_group);
572
573 /**
574  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
575  * adjust sptlrpc settings accordingly.
576  */
577 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
578 {
579         struct obd_device  *obd;
580         const char         *type;
581         int                 i, rc = 0, rc2;
582
583         LASSERT(namelen > 0);
584
585         read_lock(&obd_dev_lock);
586         for (i = 0; i < class_devno_max(); i++) {
587                 obd = class_num2obd(i);
588
589                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
590                         continue;
591
592                 /* only notify mdc, osc, mdt, ost */
593                 type = obd->obd_type->typ_name;
594                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
595                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
596                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
597                     strcmp(type, LUSTRE_OST_NAME) != 0)
598                         continue;
599
600                 if (strncmp(obd->obd_name, fsname, namelen))
601                         continue;
602
603                 class_incref(obd, __FUNCTION__, obd);
604                 read_unlock(&obd_dev_lock);
605                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
606                                          sizeof(KEY_SPTLRPC_CONF),
607                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
608                 rc = rc ? rc : rc2;
609                 class_decref(obd, __FUNCTION__, obd);
610                 read_lock(&obd_dev_lock);
611         }
612         read_unlock(&obd_dev_lock);
613         return rc;
614 }
615 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
616
617 void obd_cleanup_caches(void)
618 {
619         int rc;
620
621         ENTRY;
622         if (obd_device_cachep) {
623                 rc = cfs_mem_cache_destroy(obd_device_cachep);
624                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
625                 obd_device_cachep = NULL;
626         }
627         if (obdo_cachep) {
628                 rc = cfs_mem_cache_destroy(obdo_cachep);
629                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
630                 obdo_cachep = NULL;
631         }
632         if (import_cachep) {
633                 rc = cfs_mem_cache_destroy(import_cachep);
634                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
635                 import_cachep = NULL;
636         }
637         if (capa_cachep) {
638                 rc = cfs_mem_cache_destroy(capa_cachep);
639                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
640                 capa_cachep = NULL;
641         }
642         EXIT;
643 }
644
645 int obd_init_caches(void)
646 {
647         ENTRY;
648
649         LASSERT(obd_device_cachep == NULL);
650         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
651                                                  sizeof(struct obd_device),
652                                                  0, 0);
653         if (!obd_device_cachep)
654                 GOTO(out, -ENOMEM);
655
656         LASSERT(obdo_cachep == NULL);
657         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
658                                            0, 0);
659         if (!obdo_cachep)
660                 GOTO(out, -ENOMEM);
661
662         LASSERT(import_cachep == NULL);
663         import_cachep = cfs_mem_cache_create("ll_import_cache",
664                                              sizeof(struct obd_import),
665                                              0, 0);
666         if (!import_cachep)
667                 GOTO(out, -ENOMEM);
668
669         LASSERT(capa_cachep == NULL);
670         capa_cachep = cfs_mem_cache_create("capa_cache",
671                                            sizeof(struct obd_capa), 0, 0);
672         if (!capa_cachep)
673                 GOTO(out, -ENOMEM);
674
675         RETURN(0);
676  out:
677         obd_cleanup_caches();
678         RETURN(-ENOMEM);
679
680 }
681
682 /* map connection to client */
683 struct obd_export *class_conn2export(struct lustre_handle *conn)
684 {
685         struct obd_export *export;
686         ENTRY;
687
688         if (!conn) {
689                 CDEBUG(D_CACHE, "looking for null handle\n");
690                 RETURN(NULL);
691         }
692
693         if (conn->cookie == -1) {  /* this means assign a new connection */
694                 CDEBUG(D_CACHE, "want a new connection\n");
695                 RETURN(NULL);
696         }
697
698         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
699         export = class_handle2object(conn->cookie);
700         RETURN(export);
701 }
702 EXPORT_SYMBOL(class_conn2export);
703
704 struct obd_device *class_exp2obd(struct obd_export *exp)
705 {
706         if (exp)
707                 return exp->exp_obd;
708         return NULL;
709 }
710 EXPORT_SYMBOL(class_exp2obd);
711
712 struct obd_device *class_conn2obd(struct lustre_handle *conn)
713 {
714         struct obd_export *export;
715         export = class_conn2export(conn);
716         if (export) {
717                 struct obd_device *obd = export->exp_obd;
718                 class_export_put(export);
719                 return obd;
720         }
721         return NULL;
722 }
723 EXPORT_SYMBOL(class_conn2obd);
724
725 struct obd_import *class_exp2cliimp(struct obd_export *exp)
726 {
727         struct obd_device *obd = exp->exp_obd;
728         if (obd == NULL)
729                 return NULL;
730         return obd->u.cli.cl_import;
731 }
732 EXPORT_SYMBOL(class_exp2cliimp);
733
734 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
735 {
736         struct obd_device *obd = class_conn2obd(conn);
737         if (obd == NULL)
738                 return NULL;
739         return obd->u.cli.cl_import;
740 }
741 EXPORT_SYMBOL(class_conn2cliimp);
742
743 /* Export management functions */
744 static void class_export_destroy(struct obd_export *exp)
745 {
746         struct obd_device *obd = exp->exp_obd;
747         ENTRY;
748
749         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
750         LASSERT(obd != NULL);
751
752         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
753                exp->exp_client_uuid.uuid, obd->obd_name);
754
755         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
756         if (exp->exp_connection)
757                 ptlrpc_put_connection_superhack(exp->exp_connection);
758
759         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
760         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
761         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
762         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
763         obd_destroy_export(exp);
764         class_decref(obd, "export", exp);
765
766         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
767         EXIT;
768 }
769
770 static void export_handle_addref(void *export)
771 {
772         class_export_get(export);
773 }
774
775 static struct portals_handle_ops export_handle_ops = {
776         .hop_addref = export_handle_addref,
777         .hop_free   = NULL,
778 };
779
780 struct obd_export *class_export_get(struct obd_export *exp)
781 {
782         cfs_atomic_inc(&exp->exp_refcount);
783         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
784                cfs_atomic_read(&exp->exp_refcount));
785         return exp;
786 }
787 EXPORT_SYMBOL(class_export_get);
788
789 void class_export_put(struct obd_export *exp)
790 {
791         LASSERT(exp != NULL);
792         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
793         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
794                cfs_atomic_read(&exp->exp_refcount) - 1);
795
796         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
797                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
798                 CDEBUG(D_IOCTL, "final put %p/%s\n",
799                        exp, exp->exp_client_uuid.uuid);
800
801                 /* release nid stat refererence */
802                 lprocfs_exp_cleanup(exp);
803
804                 obd_zombie_export_add(exp);
805         }
806 }
807 EXPORT_SYMBOL(class_export_put);
808
809 /* Creates a new export, adds it to the hash table, and returns a
810  * pointer to it. The refcount is 2: one for the hash reference, and
811  * one for the pointer returned by this function. */
812 struct obd_export *class_new_export(struct obd_device *obd,
813                                     struct obd_uuid *cluuid)
814 {
815         struct obd_export *export;
816         cfs_hash_t *hash = NULL;
817         int rc = 0;
818         ENTRY;
819
820         OBD_ALLOC_PTR(export);
821         if (!export)
822                 return ERR_PTR(-ENOMEM);
823
824         export->exp_conn_cnt = 0;
825         export->exp_lock_hash = NULL;
826         export->exp_flock_hash = NULL;
827         cfs_atomic_set(&export->exp_refcount, 2);
828         cfs_atomic_set(&export->exp_rpc_count, 0);
829         cfs_atomic_set(&export->exp_cb_count, 0);
830         cfs_atomic_set(&export->exp_locks_count, 0);
831 #if LUSTRE_TRACKS_LOCK_EXP_REFS
832         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
833         spin_lock_init(&export->exp_locks_list_guard);
834 #endif
835         cfs_atomic_set(&export->exp_replay_count, 0);
836         export->exp_obd = obd;
837         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
838         spin_lock_init(&export->exp_uncommitted_replies_lock);
839         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
840         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
841         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
842         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
843         class_handle_hash(&export->exp_handle, &export_handle_ops);
844         export->exp_last_request_time = cfs_time_current_sec();
845         spin_lock_init(&export->exp_lock);
846         spin_lock_init(&export->exp_rpc_lock);
847         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
848         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
849         spin_lock_init(&export->exp_bl_list_lock);
850         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
851
852         export->exp_sp_peer = LUSTRE_SP_ANY;
853         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
854         export->exp_client_uuid = *cluuid;
855         obd_init_export(export);
856
857         spin_lock(&obd->obd_dev_lock);
858         /* shouldn't happen, but might race */
859         if (obd->obd_stopping)
860                 GOTO(exit_unlock, rc = -ENODEV);
861
862         hash = cfs_hash_getref(obd->obd_uuid_hash);
863         if (hash == NULL)
864                 GOTO(exit_unlock, rc = -ENODEV);
865         spin_unlock(&obd->obd_dev_lock);
866
867         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
868                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
869                 if (rc != 0) {
870                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
871                                       obd->obd_name, cluuid->uuid, rc);
872                         GOTO(exit_err, rc = -EALREADY);
873                 }
874         }
875
876         spin_lock(&obd->obd_dev_lock);
877         if (obd->obd_stopping) {
878                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
879                 GOTO(exit_unlock, rc = -ENODEV);
880         }
881
882         class_incref(obd, "export", export);
883         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
884         cfs_list_add_tail(&export->exp_obd_chain_timed,
885                           &export->exp_obd->obd_exports_timed);
886         export->exp_obd->obd_num_exports++;
887         spin_unlock(&obd->obd_dev_lock);
888         cfs_hash_putref(hash);
889         RETURN(export);
890
891 exit_unlock:
892         spin_unlock(&obd->obd_dev_lock);
893 exit_err:
894         if (hash)
895                 cfs_hash_putref(hash);
896         class_handle_unhash(&export->exp_handle);
897         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
898         obd_destroy_export(export);
899         OBD_FREE_PTR(export);
900         return ERR_PTR(rc);
901 }
902 EXPORT_SYMBOL(class_new_export);
903
904 void class_unlink_export(struct obd_export *exp)
905 {
906         class_handle_unhash(&exp->exp_handle);
907
908         spin_lock(&exp->exp_obd->obd_dev_lock);
909         /* delete an uuid-export hashitem from hashtables */
910         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
911                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
912                              &exp->exp_client_uuid,
913                              &exp->exp_uuid_hash);
914
915         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
916         cfs_list_del_init(&exp->exp_obd_chain_timed);
917         exp->exp_obd->obd_num_exports--;
918         spin_unlock(&exp->exp_obd->obd_dev_lock);
919         class_export_put(exp);
920 }
921 EXPORT_SYMBOL(class_unlink_export);
922
923 /* Import management functions */
924 void class_import_destroy(struct obd_import *imp)
925 {
926         ENTRY;
927
928         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
929                 imp->imp_obd->obd_name);
930
931         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
932
933         ptlrpc_put_connection_superhack(imp->imp_connection);
934
935         while (!cfs_list_empty(&imp->imp_conn_list)) {
936                 struct obd_import_conn *imp_conn;
937
938                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
939                                           struct obd_import_conn, oic_item);
940                 cfs_list_del_init(&imp_conn->oic_item);
941                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
942                 OBD_FREE(imp_conn, sizeof(*imp_conn));
943         }
944
945         LASSERT(imp->imp_sec == NULL);
946         class_decref(imp->imp_obd, "import", imp);
947         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
948         EXIT;
949 }
950
951 static void import_handle_addref(void *import)
952 {
953         class_import_get(import);
954 }
955
956 static struct portals_handle_ops import_handle_ops = {
957         .hop_addref = import_handle_addref,
958         .hop_free   = NULL,
959 };
960
961 struct obd_import *class_import_get(struct obd_import *import)
962 {
963         cfs_atomic_inc(&import->imp_refcount);
964         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
965                cfs_atomic_read(&import->imp_refcount),
966                import->imp_obd->obd_name);
967         return import;
968 }
969 EXPORT_SYMBOL(class_import_get);
970
971 void class_import_put(struct obd_import *imp)
972 {
973         ENTRY;
974
975         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
976         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
977
978         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
979                cfs_atomic_read(&imp->imp_refcount) - 1,
980                imp->imp_obd->obd_name);
981
982         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
983                 CDEBUG(D_INFO, "final put import %p\n", imp);
984                 obd_zombie_import_add(imp);
985         }
986
987         /* catch possible import put race */
988         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
989         EXIT;
990 }
991 EXPORT_SYMBOL(class_import_put);
992
993 static void init_imp_at(struct imp_at *at) {
994         int i;
995         at_init(&at->iat_net_latency, 0, 0);
996         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
997                 /* max service estimates are tracked on the server side, so
998                    don't use the AT history here, just use the last reported
999                    val. (But keep hist for proc histogram, worst_ever) */
1000                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1001                         AT_FLG_NOHIST);
1002         }
1003 }
1004
1005 struct obd_import *class_new_import(struct obd_device *obd)
1006 {
1007         struct obd_import *imp;
1008
1009         OBD_ALLOC(imp, sizeof(*imp));
1010         if (imp == NULL)
1011                 return NULL;
1012
1013         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1014         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1015         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1016         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1017         spin_lock_init(&imp->imp_lock);
1018         imp->imp_last_success_conn = 0;
1019         imp->imp_state = LUSTRE_IMP_NEW;
1020         imp->imp_obd = class_incref(obd, "import", imp);
1021         mutex_init(&imp->imp_sec_mutex);
1022         cfs_waitq_init(&imp->imp_recovery_waitq);
1023
1024         cfs_atomic_set(&imp->imp_refcount, 2);
1025         cfs_atomic_set(&imp->imp_unregistering, 0);
1026         cfs_atomic_set(&imp->imp_inflight, 0);
1027         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1028         cfs_atomic_set(&imp->imp_inval_count, 0);
1029         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1030         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1031         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1032         init_imp_at(&imp->imp_at);
1033
1034         /* the default magic is V2, will be used in connect RPC, and
1035          * then adjusted according to the flags in request/reply. */
1036         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1037
1038         return imp;
1039 }
1040 EXPORT_SYMBOL(class_new_import);
1041
1042 void class_destroy_import(struct obd_import *import)
1043 {
1044         LASSERT(import != NULL);
1045         LASSERT(import != LP_POISON);
1046
1047         class_handle_unhash(&import->imp_handle);
1048
1049         spin_lock(&import->imp_lock);
1050         import->imp_generation++;
1051         spin_unlock(&import->imp_lock);
1052         class_import_put(import);
1053 }
1054 EXPORT_SYMBOL(class_destroy_import);
1055
1056 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1057
1058 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1059 {
1060         spin_lock(&exp->exp_locks_list_guard);
1061
1062         LASSERT(lock->l_exp_refs_nr >= 0);
1063
1064         if (lock->l_exp_refs_target != NULL &&
1065             lock->l_exp_refs_target != exp) {
1066                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1067                               exp, lock, lock->l_exp_refs_target);
1068         }
1069         if ((lock->l_exp_refs_nr ++) == 0) {
1070                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1071                 lock->l_exp_refs_target = exp;
1072         }
1073         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1074                lock, exp, lock->l_exp_refs_nr);
1075         spin_unlock(&exp->exp_locks_list_guard);
1076 }
1077 EXPORT_SYMBOL(__class_export_add_lock_ref);
1078
1079 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1080 {
1081         spin_lock(&exp->exp_locks_list_guard);
1082         LASSERT(lock->l_exp_refs_nr > 0);
1083         if (lock->l_exp_refs_target != exp) {
1084                 LCONSOLE_WARN("lock %p, "
1085                               "mismatching export pointers: %p, %p\n",
1086                               lock, lock->l_exp_refs_target, exp);
1087         }
1088         if (-- lock->l_exp_refs_nr == 0) {
1089                 cfs_list_del_init(&lock->l_exp_refs_link);
1090                 lock->l_exp_refs_target = NULL;
1091         }
1092         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1093                lock, exp, lock->l_exp_refs_nr);
1094         spin_unlock(&exp->exp_locks_list_guard);
1095 }
1096 EXPORT_SYMBOL(__class_export_del_lock_ref);
1097 #endif
1098
1099 /* A connection defines an export context in which preallocation can
1100    be managed. This releases the export pointer reference, and returns
1101    the export handle, so the export refcount is 1 when this function
1102    returns. */
1103 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1104                   struct obd_uuid *cluuid)
1105 {
1106         struct obd_export *export;
1107         LASSERT(conn != NULL);
1108         LASSERT(obd != NULL);
1109         LASSERT(cluuid != NULL);
1110         ENTRY;
1111
1112         export = class_new_export(obd, cluuid);
1113         if (IS_ERR(export))
1114                 RETURN(PTR_ERR(export));
1115
1116         conn->cookie = export->exp_handle.h_cookie;
1117         class_export_put(export);
1118
1119         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1120                cluuid->uuid, conn->cookie);
1121         RETURN(0);
1122 }
1123 EXPORT_SYMBOL(class_connect);
1124
1125 /* if export is involved in recovery then clean up related things */
1126 void class_export_recovery_cleanup(struct obd_export *exp)
1127 {
1128         struct obd_device *obd = exp->exp_obd;
1129
1130         spin_lock(&obd->obd_recovery_task_lock);
1131         if (exp->exp_delayed)
1132                 obd->obd_delayed_clients--;
1133         if (obd->obd_recovering && exp->exp_in_recovery) {
1134                 spin_lock(&exp->exp_lock);
1135                 exp->exp_in_recovery = 0;
1136                 spin_unlock(&exp->exp_lock);
1137                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1138                 cfs_atomic_dec(&obd->obd_connected_clients);
1139         }
1140         spin_unlock(&obd->obd_recovery_task_lock);
1141         /** Cleanup req replay fields */
1142         if (exp->exp_req_replay_needed) {
1143                 spin_lock(&exp->exp_lock);
1144                 exp->exp_req_replay_needed = 0;
1145                 spin_unlock(&exp->exp_lock);
1146                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1147                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1148         }
1149         /** Cleanup lock replay data */
1150         if (exp->exp_lock_replay_needed) {
1151                 spin_lock(&exp->exp_lock);
1152                 exp->exp_lock_replay_needed = 0;
1153                 spin_unlock(&exp->exp_lock);
1154                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1155                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1156         }
1157 }
1158
1159 /* This function removes 1-3 references from the export:
1160  * 1 - for export pointer passed
1161  * and if disconnect really need
1162  * 2 - removing from hash
1163  * 3 - in client_unlink_export
1164  * The export pointer passed to this function can destroyed */
1165 int class_disconnect(struct obd_export *export)
1166 {
1167         int already_disconnected;
1168         ENTRY;
1169
1170         if (export == NULL) {
1171                 CWARN("attempting to free NULL export %p\n", export);
1172                 RETURN(-EINVAL);
1173         }
1174
1175         spin_lock(&export->exp_lock);
1176         already_disconnected = export->exp_disconnected;
1177         export->exp_disconnected = 1;
1178         spin_unlock(&export->exp_lock);
1179
1180         /* class_cleanup(), abort_recovery(), and class_fail_export()
1181          * all end up in here, and if any of them race we shouldn't
1182          * call extra class_export_puts(). */
1183         if (already_disconnected) {
1184                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1185                 GOTO(no_disconn, already_disconnected);
1186         }
1187
1188         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1189                export->exp_handle.h_cookie);
1190
1191         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1192                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1193                              &export->exp_connection->c_peer.nid,
1194                              &export->exp_nid_hash);
1195
1196         class_export_recovery_cleanup(export);
1197         class_unlink_export(export);
1198 no_disconn:
1199         class_export_put(export);
1200         RETURN(0);
1201 }
1202 EXPORT_SYMBOL(class_disconnect);
1203
1204 /* Return non-zero for a fully connected export */
1205 int class_connected_export(struct obd_export *exp)
1206 {
1207         if (exp) {
1208                 int connected;
1209                 spin_lock(&exp->exp_lock);
1210                 connected = (exp->exp_conn_cnt > 0);
1211                 spin_unlock(&exp->exp_lock);
1212                 return connected;
1213         }
1214         return 0;
1215 }
1216 EXPORT_SYMBOL(class_connected_export);
1217
1218 static void class_disconnect_export_list(cfs_list_t *list,
1219                                          enum obd_option flags)
1220 {
1221         int rc;
1222         struct obd_export *exp;
1223         ENTRY;
1224
1225         /* It's possible that an export may disconnect itself, but
1226          * nothing else will be added to this list. */
1227         while (!cfs_list_empty(list)) {
1228                 exp = cfs_list_entry(list->next, struct obd_export,
1229                                      exp_obd_chain);
1230                 /* need for safe call CDEBUG after obd_disconnect */
1231                 class_export_get(exp);
1232
1233                 spin_lock(&exp->exp_lock);
1234                 exp->exp_flags = flags;
1235                 spin_unlock(&exp->exp_lock);
1236
1237                 if (obd_uuid_equals(&exp->exp_client_uuid,
1238                                     &exp->exp_obd->obd_uuid)) {
1239                         CDEBUG(D_HA,
1240                                "exp %p export uuid == obd uuid, don't discon\n",
1241                                exp);
1242                         /* Need to delete this now so we don't end up pointing
1243                          * to work_list later when this export is cleaned up. */
1244                         cfs_list_del_init(&exp->exp_obd_chain);
1245                         class_export_put(exp);
1246                         continue;
1247                 }
1248
1249                 class_export_get(exp);
1250                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1251                        "last request at "CFS_TIME_T"\n",
1252                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1253                        exp, exp->exp_last_request_time);
1254                 /* release one export reference anyway */
1255                 rc = obd_disconnect(exp);
1256
1257                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1258                        obd_export_nid2str(exp), exp, rc);
1259                 class_export_put(exp);
1260         }
1261         EXIT;
1262 }
1263
1264 void class_disconnect_exports(struct obd_device *obd)
1265 {
1266         cfs_list_t work_list;
1267         ENTRY;
1268
1269         /* Move all of the exports from obd_exports to a work list, en masse. */
1270         CFS_INIT_LIST_HEAD(&work_list);
1271         spin_lock(&obd->obd_dev_lock);
1272         cfs_list_splice_init(&obd->obd_exports, &work_list);
1273         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1274         spin_unlock(&obd->obd_dev_lock);
1275
1276         if (!cfs_list_empty(&work_list)) {
1277                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1278                        "disconnecting them\n", obd->obd_minor, obd);
1279                 class_disconnect_export_list(&work_list,
1280                                              exp_flags_from_obd(obd));
1281         } else
1282                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1283                        obd->obd_minor, obd);
1284         EXIT;
1285 }
1286 EXPORT_SYMBOL(class_disconnect_exports);
1287
1288 /* Remove exports that have not completed recovery.
1289  */
1290 void class_disconnect_stale_exports(struct obd_device *obd,
1291                                     int (*test_export)(struct obd_export *))
1292 {
1293         cfs_list_t work_list;
1294         struct obd_export *exp, *n;
1295         int evicted = 0;
1296         ENTRY;
1297
1298         CFS_INIT_LIST_HEAD(&work_list);
1299         spin_lock(&obd->obd_dev_lock);
1300         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1301                                      exp_obd_chain) {
1302                 /* don't count self-export as client */
1303                 if (obd_uuid_equals(&exp->exp_client_uuid,
1304                                     &exp->exp_obd->obd_uuid))
1305                         continue;
1306
1307                 /* don't evict clients which have no slot in last_rcvd
1308                  * (e.g. lightweight connection) */
1309                 if (exp->exp_target_data.ted_lr_idx == -1)
1310                         continue;
1311
1312                 spin_lock(&exp->exp_lock);
1313                 if (test_export(exp)) {
1314                         spin_unlock(&exp->exp_lock);
1315                         continue;
1316                 }
1317                 exp->exp_failed = 1;
1318                 spin_unlock(&exp->exp_lock);
1319
1320                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1321                 evicted++;
1322                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1323                        obd->obd_name, exp->exp_client_uuid.uuid,
1324                        exp->exp_connection == NULL ? "<unknown>" :
1325                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1326                 print_export_data(exp, "EVICTING", 0);
1327         }
1328         spin_unlock(&obd->obd_dev_lock);
1329
1330         if (evicted) {
1331                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1332                               obd->obd_name, evicted);
1333                 obd->obd_stale_clients += evicted;
1334         }
1335         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1336                                                  OBD_OPT_ABORT_RECOV);
1337         EXIT;
1338 }
1339 EXPORT_SYMBOL(class_disconnect_stale_exports);
1340
1341 void class_fail_export(struct obd_export *exp)
1342 {
1343         int rc, already_failed;
1344
1345         spin_lock(&exp->exp_lock);
1346         already_failed = exp->exp_failed;
1347         exp->exp_failed = 1;
1348         spin_unlock(&exp->exp_lock);
1349
1350         if (already_failed) {
1351                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1352                        exp, exp->exp_client_uuid.uuid);
1353                 return;
1354         }
1355
1356         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1357                exp, exp->exp_client_uuid.uuid);
1358
1359         if (obd_dump_on_timeout)
1360                 libcfs_debug_dumplog();
1361
1362         /* need for safe call CDEBUG after obd_disconnect */
1363         class_export_get(exp);
1364
1365         /* Most callers into obd_disconnect are removing their own reference
1366          * (request, for example) in addition to the one from the hash table.
1367          * We don't have such a reference here, so make one. */
1368         class_export_get(exp);
1369         rc = obd_disconnect(exp);
1370         if (rc)
1371                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1372         else
1373                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1374                        exp, exp->exp_client_uuid.uuid);
1375         class_export_put(exp);
1376 }
1377 EXPORT_SYMBOL(class_fail_export);
1378
1379 char *obd_export_nid2str(struct obd_export *exp)
1380 {
1381         if (exp->exp_connection != NULL)
1382                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1383
1384         return "(no nid)";
1385 }
1386 EXPORT_SYMBOL(obd_export_nid2str);
1387
1388 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1389 {
1390         cfs_hash_t *nid_hash;
1391         struct obd_export *doomed_exp = NULL;
1392         int exports_evicted = 0;
1393
1394         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1395
1396         spin_lock(&obd->obd_dev_lock);
1397         /* umount has run already, so evict thread should leave
1398          * its task to umount thread now */
1399         if (obd->obd_stopping) {
1400                 spin_unlock(&obd->obd_dev_lock);
1401                 return exports_evicted;
1402         }
1403         nid_hash = obd->obd_nid_hash;
1404         cfs_hash_getref(nid_hash);
1405         spin_unlock(&obd->obd_dev_lock);
1406
1407         do {
1408                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1409                 if (doomed_exp == NULL)
1410                         break;
1411
1412                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1413                          "nid %s found, wanted nid %s, requested nid %s\n",
1414                          obd_export_nid2str(doomed_exp),
1415                          libcfs_nid2str(nid_key), nid);
1416                 LASSERTF(doomed_exp != obd->obd_self_export,
1417                          "self-export is hashed by NID?\n");
1418                 exports_evicted++;
1419                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1420                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1421                        exports_evicted);
1422                 class_fail_export(doomed_exp);
1423                 class_export_put(doomed_exp);
1424         } while (1);
1425
1426         cfs_hash_putref(nid_hash);
1427
1428         if (!exports_evicted)
1429                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1430                        obd->obd_name, nid);
1431         return exports_evicted;
1432 }
1433 EXPORT_SYMBOL(obd_export_evict_by_nid);
1434
1435 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1436 {
1437         cfs_hash_t *uuid_hash;
1438         struct obd_export *doomed_exp = NULL;
1439         struct obd_uuid doomed_uuid;
1440         int exports_evicted = 0;
1441
1442         spin_lock(&obd->obd_dev_lock);
1443         if (obd->obd_stopping) {
1444                 spin_unlock(&obd->obd_dev_lock);
1445                 return exports_evicted;
1446         }
1447         uuid_hash = obd->obd_uuid_hash;
1448         cfs_hash_getref(uuid_hash);
1449         spin_unlock(&obd->obd_dev_lock);
1450
1451         obd_str2uuid(&doomed_uuid, uuid);
1452         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1453                 CERROR("%s: can't evict myself\n", obd->obd_name);
1454                 cfs_hash_putref(uuid_hash);
1455                 return exports_evicted;
1456         }
1457
1458         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1459
1460         if (doomed_exp == NULL) {
1461                 CERROR("%s: can't disconnect %s: no exports found\n",
1462                        obd->obd_name, uuid);
1463         } else {
1464                 CWARN("%s: evicting %s at adminstrative request\n",
1465                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1466                 class_fail_export(doomed_exp);
1467                 class_export_put(doomed_exp);
1468                 exports_evicted++;
1469         }
1470         cfs_hash_putref(uuid_hash);
1471
1472         return exports_evicted;
1473 }
1474 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1475
1476 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1477 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1478 EXPORT_SYMBOL(class_export_dump_hook);
1479 #endif
1480
1481 static void print_export_data(struct obd_export *exp, const char *status,
1482                               int locks)
1483 {
1484         struct ptlrpc_reply_state *rs;
1485         struct ptlrpc_reply_state *first_reply = NULL;
1486         int nreplies = 0;
1487
1488         spin_lock(&exp->exp_lock);
1489         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1490                                 rs_exp_list) {
1491                 if (nreplies == 0)
1492                         first_reply = rs;
1493                 nreplies++;
1494         }
1495         spin_unlock(&exp->exp_lock);
1496
1497         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1498                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1499                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1500                cfs_atomic_read(&exp->exp_rpc_count),
1501                cfs_atomic_read(&exp->exp_cb_count),
1502                cfs_atomic_read(&exp->exp_locks_count),
1503                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1504                nreplies, first_reply, nreplies > 3 ? "..." : "",
1505                exp->exp_last_committed);
1506 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1507         if (locks && class_export_dump_hook != NULL)
1508                 class_export_dump_hook(exp);
1509 #endif
1510 }
1511
1512 void dump_exports(struct obd_device *obd, int locks)
1513 {
1514         struct obd_export *exp;
1515
1516         spin_lock(&obd->obd_dev_lock);
1517         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1518                 print_export_data(exp, "ACTIVE", locks);
1519         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1520                 print_export_data(exp, "UNLINKED", locks);
1521         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1522                 print_export_data(exp, "DELAYED", locks);
1523         spin_unlock(&obd->obd_dev_lock);
1524         spin_lock(&obd_zombie_impexp_lock);
1525         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1526                 print_export_data(exp, "ZOMBIE", locks);
1527         spin_unlock(&obd_zombie_impexp_lock);
1528 }
1529 EXPORT_SYMBOL(dump_exports);
1530
1531 void obd_exports_barrier(struct obd_device *obd)
1532 {
1533         int waited = 2;
1534         LASSERT(cfs_list_empty(&obd->obd_exports));
1535         spin_lock(&obd->obd_dev_lock);
1536         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1537                 spin_unlock(&obd->obd_dev_lock);
1538                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1539                                                    cfs_time_seconds(waited));
1540                 if (waited > 5 && IS_PO2(waited)) {
1541                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1542                                       "more than %d seconds. "
1543                                       "The obd refcount = %d. Is it stuck?\n",
1544                                       obd->obd_name, waited,
1545                                       cfs_atomic_read(&obd->obd_refcount));
1546                         dump_exports(obd, 1);
1547                 }
1548                 waited *= 2;
1549                 spin_lock(&obd->obd_dev_lock);
1550         }
1551         spin_unlock(&obd->obd_dev_lock);
1552 }
1553 EXPORT_SYMBOL(obd_exports_barrier);
1554
1555 /* Total amount of zombies to be destroyed */
1556 static int zombies_count = 0;
1557
1558 /**
1559  * kill zombie imports and exports
1560  */
1561 void obd_zombie_impexp_cull(void)
1562 {
1563         struct obd_import *import;
1564         struct obd_export *export;
1565         ENTRY;
1566
1567         do {
1568                 spin_lock(&obd_zombie_impexp_lock);
1569
1570                 import = NULL;
1571                 if (!cfs_list_empty(&obd_zombie_imports)) {
1572                         import = cfs_list_entry(obd_zombie_imports.next,
1573                                                 struct obd_import,
1574                                                 imp_zombie_chain);
1575                         cfs_list_del_init(&import->imp_zombie_chain);
1576                 }
1577
1578                 export = NULL;
1579                 if (!cfs_list_empty(&obd_zombie_exports)) {
1580                         export = cfs_list_entry(obd_zombie_exports.next,
1581                                                 struct obd_export,
1582                                                 exp_obd_chain);
1583                         cfs_list_del_init(&export->exp_obd_chain);
1584                 }
1585
1586                 spin_unlock(&obd_zombie_impexp_lock);
1587
1588                 if (import != NULL) {
1589                         class_import_destroy(import);
1590                         spin_lock(&obd_zombie_impexp_lock);
1591                         zombies_count--;
1592                         spin_unlock(&obd_zombie_impexp_lock);
1593                 }
1594
1595                 if (export != NULL) {
1596                         class_export_destroy(export);
1597                         spin_lock(&obd_zombie_impexp_lock);
1598                         zombies_count--;
1599                         spin_unlock(&obd_zombie_impexp_lock);
1600                 }
1601
1602                 cfs_cond_resched();
1603         } while (import != NULL || export != NULL);
1604         EXIT;
1605 }
1606
1607 static struct completion        obd_zombie_start;
1608 static struct completion        obd_zombie_stop;
1609 static unsigned long            obd_zombie_flags;
1610 static cfs_waitq_t              obd_zombie_waitq;
1611 static pid_t                    obd_zombie_pid;
1612
1613 enum {
1614         OBD_ZOMBIE_STOP         = 0x0001,
1615 };
1616
1617 /**
1618  * check for work for kill zombie import/export thread.
1619  */
1620 static int obd_zombie_impexp_check(void *arg)
1621 {
1622         int rc;
1623
1624         spin_lock(&obd_zombie_impexp_lock);
1625         rc = (zombies_count == 0) &&
1626              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1627         spin_unlock(&obd_zombie_impexp_lock);
1628
1629         RETURN(rc);
1630 }
1631
1632 /**
1633  * Add export to the obd_zombe thread and notify it.
1634  */
1635 static void obd_zombie_export_add(struct obd_export *exp) {
1636         spin_lock(&exp->exp_obd->obd_dev_lock);
1637         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1638         cfs_list_del_init(&exp->exp_obd_chain);
1639         spin_unlock(&exp->exp_obd->obd_dev_lock);
1640         spin_lock(&obd_zombie_impexp_lock);
1641         zombies_count++;
1642         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1643         spin_unlock(&obd_zombie_impexp_lock);
1644
1645         obd_zombie_impexp_notify();
1646 }
1647
1648 /**
1649  * Add import to the obd_zombe thread and notify it.
1650  */
1651 static void obd_zombie_import_add(struct obd_import *imp) {
1652         LASSERT(imp->imp_sec == NULL);
1653         LASSERT(imp->imp_rq_pool == NULL);
1654         spin_lock(&obd_zombie_impexp_lock);
1655         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1656         zombies_count++;
1657         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1658         spin_unlock(&obd_zombie_impexp_lock);
1659
1660         obd_zombie_impexp_notify();
1661 }
1662
1663 /**
1664  * notify import/export destroy thread about new zombie.
1665  */
1666 static void obd_zombie_impexp_notify(void)
1667 {
1668         /*
1669          * Make sure obd_zomebie_impexp_thread get this notification.
1670          * It is possible this signal only get by obd_zombie_barrier, and
1671          * barrier gulps this notification and sleeps away and hangs ensues
1672          */
1673         cfs_waitq_broadcast(&obd_zombie_waitq);
1674 }
1675
1676 /**
1677  * check whether obd_zombie is idle
1678  */
1679 static int obd_zombie_is_idle(void)
1680 {
1681         int rc;
1682
1683         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1684         spin_lock(&obd_zombie_impexp_lock);
1685         rc = (zombies_count == 0);
1686         spin_unlock(&obd_zombie_impexp_lock);
1687         return rc;
1688 }
1689
1690 /**
1691  * wait when obd_zombie import/export queues become empty
1692  */
1693 void obd_zombie_barrier(void)
1694 {
1695         struct l_wait_info lwi = { 0 };
1696
1697         if (obd_zombie_pid == cfs_curproc_pid())
1698                 /* don't wait for myself */
1699                 return;
1700         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1701 }
1702 EXPORT_SYMBOL(obd_zombie_barrier);
1703
1704 #ifdef __KERNEL__
1705
1706 /**
1707  * destroy zombie export/import thread.
1708  */
1709 static int obd_zombie_impexp_thread(void *unused)
1710 {
1711         int rc;
1712
1713         rc = cfs_daemonize_ctxt("obd_zombid");
1714         if (rc != 0) {
1715                 complete(&obd_zombie_start);
1716                 RETURN(rc);
1717         }
1718
1719         complete(&obd_zombie_start);
1720
1721         obd_zombie_pid = cfs_curproc_pid();
1722
1723         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1724                 struct l_wait_info lwi = { 0 };
1725
1726                 l_wait_event(obd_zombie_waitq,
1727                              !obd_zombie_impexp_check(NULL), &lwi);
1728                 obd_zombie_impexp_cull();
1729
1730                 /*
1731                  * Notify obd_zombie_barrier callers that queues
1732                  * may be empty.
1733                  */
1734                 cfs_waitq_signal(&obd_zombie_waitq);
1735         }
1736
1737         complete(&obd_zombie_stop);
1738
1739         RETURN(0);
1740 }
1741
1742 #else /* ! KERNEL */
1743
1744 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1745 static void *obd_zombie_impexp_work_cb;
1746 static void *obd_zombie_impexp_idle_cb;
1747
1748 int obd_zombie_impexp_kill(void *arg)
1749 {
1750         int rc = 0;
1751
1752         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1753                 obd_zombie_impexp_cull();
1754                 rc = 1;
1755         }
1756         cfs_atomic_dec(&zombie_recur);
1757         return rc;
1758 }
1759
1760 #endif
1761
1762 /**
1763  * start destroy zombie import/export thread
1764  */
1765 int obd_zombie_impexp_init(void)
1766 {
1767         int rc;
1768
1769         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1770         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1771         spin_lock_init(&obd_zombie_impexp_lock);
1772         init_completion(&obd_zombie_start);
1773         init_completion(&obd_zombie_stop);
1774         cfs_waitq_init(&obd_zombie_waitq);
1775         obd_zombie_pid = 0;
1776
1777 #ifdef __KERNEL__
1778         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1779         if (rc < 0)
1780                 RETURN(rc);
1781
1782         wait_for_completion(&obd_zombie_start);
1783 #else
1784
1785         obd_zombie_impexp_work_cb =
1786                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1787                                                  &obd_zombie_impexp_kill, NULL);
1788
1789         obd_zombie_impexp_idle_cb =
1790                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1791                                                  &obd_zombie_impexp_check, NULL);
1792         rc = 0;
1793 #endif
1794         RETURN(rc);
1795 }
1796 /**
1797  * stop destroy zombie import/export thread
1798  */
1799 void obd_zombie_impexp_stop(void)
1800 {
1801         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1802         obd_zombie_impexp_notify();
1803 #ifdef __KERNEL__
1804         wait_for_completion(&obd_zombie_stop);
1805 #else
1806         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1807         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1808 #endif
1809 }
1810
1811 /***** Kernel-userspace comm helpers *******/
1812
1813 /* Get length of entire message, including header */
1814 int kuc_len(int payload_len)
1815 {
1816         return sizeof(struct kuc_hdr) + payload_len;
1817 }
1818 EXPORT_SYMBOL(kuc_len);
1819
1820 /* Get a pointer to kuc header, given a ptr to the payload
1821  * @param p Pointer to payload area
1822  * @returns Pointer to kuc header
1823  */
1824 struct kuc_hdr * kuc_ptr(void *p)
1825 {
1826         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1827         LASSERT(lh->kuc_magic == KUC_MAGIC);
1828         return lh;
1829 }
1830 EXPORT_SYMBOL(kuc_ptr);
1831
1832 /* Test if payload is part of kuc message
1833  * @param p Pointer to payload area
1834  * @returns boolean
1835  */
1836 int kuc_ispayload(void *p)
1837 {
1838         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1839
1840         if (kh->kuc_magic == KUC_MAGIC)
1841                 return 1;
1842         else
1843                 return 0;
1844 }
1845 EXPORT_SYMBOL(kuc_ispayload);
1846
1847 /* Alloc space for a message, and fill in header
1848  * @return Pointer to payload area
1849  */
1850 void *kuc_alloc(int payload_len, int transport, int type)
1851 {
1852         struct kuc_hdr *lh;
1853         int len = kuc_len(payload_len);
1854
1855         OBD_ALLOC(lh, len);
1856         if (lh == NULL)
1857                 return ERR_PTR(-ENOMEM);
1858
1859         lh->kuc_magic = KUC_MAGIC;
1860         lh->kuc_transport = transport;
1861         lh->kuc_msgtype = type;
1862         lh->kuc_msglen = len;
1863
1864         return (void *)(lh + 1);
1865 }
1866 EXPORT_SYMBOL(kuc_alloc);
1867
1868 /* Takes pointer to payload area */
1869 inline void kuc_free(void *p, int payload_len)
1870 {
1871         struct kuc_hdr *lh = kuc_ptr(p);
1872         OBD_FREE(lh, kuc_len(payload_len));
1873 }
1874 EXPORT_SYMBOL(kuc_free);
1875
1876
1877