Whamcloud - gitweb
LU-2446 build: Update Whamcloud copyright messages for Intel
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
129                         modname = LUSTRE_MDT_NAME;
130
131                 if (!cfs_request_module("%s", modname)) {
132                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
133                         type = class_search_type(name);
134                 } else {
135                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
136                                            modname);
137                 }
138         }
139 #endif
140         if (type) {
141                 spin_lock(&type->obd_type_lock);
142                 type->typ_refcnt++;
143                 cfs_try_module_get(type->typ_dt_ops->o_owner);
144                 spin_unlock(&type->obd_type_lock);
145         }
146         return type;
147 }
148 EXPORT_SYMBOL(class_get_type);
149
150 void class_put_type(struct obd_type *type)
151 {
152         LASSERT(type);
153         spin_lock(&type->obd_type_lock);
154         type->typ_refcnt--;
155         cfs_module_put(type->typ_dt_ops->o_owner);
156         spin_unlock(&type->obd_type_lock);
157 }
158 EXPORT_SYMBOL(class_put_type);
159
160 #define CLASS_MAX_NAME 1024
161
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163                         struct lprocfs_vars *vars, const char *name,
164                         struct lu_device_type *ldt)
165 {
166         struct obd_type *type;
167         int rc = 0;
168         ENTRY;
169
170         /* sanity check */
171         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
172
173         if (class_search_type(name)) {
174                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
175                 RETURN(-EEXIST);
176         }
177
178         rc = -ENOMEM;
179         OBD_ALLOC(type, sizeof(*type));
180         if (type == NULL)
181                 RETURN(rc);
182
183         OBD_ALLOC_PTR(type->typ_dt_ops);
184         OBD_ALLOC_PTR(type->typ_md_ops);
185         OBD_ALLOC(type->typ_name, strlen(name) + 1);
186
187         if (type->typ_dt_ops == NULL ||
188             type->typ_md_ops == NULL ||
189             type->typ_name == NULL)
190                 GOTO (failed, rc);
191
192         *(type->typ_dt_ops) = *dt_ops;
193         /* md_ops is optional */
194         if (md_ops)
195                 *(type->typ_md_ops) = *md_ops;
196         strcpy(type->typ_name, name);
197         spin_lock_init(&type->obd_type_lock);
198
199 #ifdef LPROCFS
200         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
201                                               vars, type);
202         if (IS_ERR(type->typ_procroot)) {
203                 rc = PTR_ERR(type->typ_procroot);
204                 type->typ_procroot = NULL;
205                 GOTO (failed, rc);
206         }
207 #endif
208         if (ldt != NULL) {
209                 type->typ_lu = ldt;
210                 rc = lu_device_type_init(ldt);
211                 if (rc != 0)
212                         GOTO (failed, rc);
213         }
214
215         spin_lock(&obd_types_lock);
216         cfs_list_add(&type->typ_chain, &obd_types);
217         spin_unlock(&obd_types_lock);
218
219         RETURN (0);
220
221  failed:
222         if (type->typ_name != NULL)
223                 OBD_FREE(type->typ_name, strlen(name) + 1);
224         if (type->typ_md_ops != NULL)
225                 OBD_FREE_PTR(type->typ_md_ops);
226         if (type->typ_dt_ops != NULL)
227                 OBD_FREE_PTR(type->typ_dt_ops);
228         OBD_FREE(type, sizeof(*type));
229         RETURN(rc);
230 }
231 EXPORT_SYMBOL(class_register_type);
232
233 int class_unregister_type(const char *name)
234 {
235         struct obd_type *type = class_search_type(name);
236         ENTRY;
237
238         if (!type) {
239                 CERROR("unknown obd type\n");
240                 RETURN(-EINVAL);
241         }
242
243         if (type->typ_refcnt) {
244                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
245                 /* This is a bad situation, let's make the best of it */
246                 /* Remove ops, but leave the name for debugging */
247                 OBD_FREE_PTR(type->typ_dt_ops);
248                 OBD_FREE_PTR(type->typ_md_ops);
249                 RETURN(-EBUSY);
250         }
251
252         /* we do not use type->typ_procroot as for compatibility purposes
253          * other modules can share names (i.e. lod can use lov entry). so
254          * we can't reference pointer as it can get invalided when another
255          * module removes the entry */
256         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
257
258         if (type->typ_lu)
259                 lu_device_type_fini(type->typ_lu);
260
261         spin_lock(&obd_types_lock);
262         cfs_list_del(&type->typ_chain);
263         spin_unlock(&obd_types_lock);
264         OBD_FREE(type->typ_name, strlen(name) + 1);
265         if (type->typ_dt_ops != NULL)
266                 OBD_FREE_PTR(type->typ_dt_ops);
267         if (type->typ_md_ops != NULL)
268                 OBD_FREE_PTR(type->typ_md_ops);
269         OBD_FREE(type, sizeof(*type));
270         RETURN(0);
271 } /* class_unregister_type */
272 EXPORT_SYMBOL(class_unregister_type);
273
274 /**
275  * Create a new obd device.
276  *
277  * Find an empty slot in ::obd_devs[], create a new obd device in it.
278  *
279  * \param[in] type_name obd device type string.
280  * \param[in] name      obd device name.
281  *
282  * \retval NULL if create fails, otherwise return the obd device
283  *         pointer created.
284  */
285 struct obd_device *class_newdev(const char *type_name, const char *name)
286 {
287         struct obd_device *result = NULL;
288         struct obd_device *newdev;
289         struct obd_type *type = NULL;
290         int i;
291         int new_obd_minor = 0;
292         ENTRY;
293
294         if (strlen(name) >= MAX_OBD_NAME) {
295                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
296                 RETURN(ERR_PTR(-EINVAL));
297         }
298
299         type = class_get_type(type_name);
300         if (type == NULL){
301                 CERROR("OBD: unknown type: %s\n", type_name);
302                 RETURN(ERR_PTR(-ENODEV));
303         }
304
305         newdev = obd_device_alloc();
306         if (newdev == NULL)
307                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
308
309         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
310
311         write_lock(&obd_dev_lock);
312         for (i = 0; i < class_devno_max(); i++) {
313                 struct obd_device *obd = class_num2obd(i);
314
315                 if (obd && obd->obd_name &&
316                     (strcmp(name, obd->obd_name) == 0)) {
317                         CERROR("Device %s already exists at %d, won't add\n",
318                                name, i);
319                         if (result) {
320                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
321                                          "%p obd_magic %08x != %08x\n", result,
322                                          result->obd_magic, OBD_DEVICE_MAGIC);
323                                 LASSERTF(result->obd_minor == new_obd_minor,
324                                          "%p obd_minor %d != %d\n", result,
325                                          result->obd_minor, new_obd_minor);
326
327                                 obd_devs[result->obd_minor] = NULL;
328                                 result->obd_name[0]='\0';
329                          }
330                         result = ERR_PTR(-EEXIST);
331                         break;
332                 }
333                 if (!result && !obd) {
334                         result = newdev;
335                         result->obd_minor = i;
336                         new_obd_minor = i;
337                         result->obd_type = type;
338                         strncpy(result->obd_name, name,
339                                 sizeof(result->obd_name) - 1);
340                         obd_devs[i] = result;
341                 }
342         }
343         write_unlock(&obd_dev_lock);
344
345         if (result == NULL && i >= class_devno_max()) {
346                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
347                        class_devno_max());
348                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
349         }
350
351         if (IS_ERR(result))
352                 GOTO(out, result);
353
354         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
355                result->obd_name, result);
356
357         RETURN(result);
358 out:
359         obd_device_free(newdev);
360 out_type:
361         class_put_type(type);
362         return result;
363 }
364
365 void class_release_dev(struct obd_device *obd)
366 {
367         struct obd_type *obd_type = obd->obd_type;
368
369         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
370                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
371         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
372                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
373         LASSERT(obd_type != NULL);
374
375         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
376                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
377
378         write_lock(&obd_dev_lock);
379         obd_devs[obd->obd_minor] = NULL;
380         write_unlock(&obd_dev_lock);
381         obd_device_free(obd);
382
383         class_put_type(obd_type);
384 }
385
386 int class_name2dev(const char *name)
387 {
388         int i;
389
390         if (!name)
391                 return -1;
392
393         read_lock(&obd_dev_lock);
394         for (i = 0; i < class_devno_max(); i++) {
395                 struct obd_device *obd = class_num2obd(i);
396
397                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
398                         /* Make sure we finished attaching before we give
399                            out any references */
400                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
401                         if (obd->obd_attached) {
402                                 read_unlock(&obd_dev_lock);
403                                 return i;
404                         }
405                         break;
406                 }
407         }
408         read_unlock(&obd_dev_lock);
409
410         return -1;
411 }
412 EXPORT_SYMBOL(class_name2dev);
413
414 struct obd_device *class_name2obd(const char *name)
415 {
416         int dev = class_name2dev(name);
417
418         if (dev < 0 || dev > class_devno_max())
419                 return NULL;
420         return class_num2obd(dev);
421 }
422 EXPORT_SYMBOL(class_name2obd);
423
424 int class_uuid2dev(struct obd_uuid *uuid)
425 {
426         int i;
427
428         read_lock(&obd_dev_lock);
429         for (i = 0; i < class_devno_max(); i++) {
430                 struct obd_device *obd = class_num2obd(i);
431
432                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
433                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
434                         read_unlock(&obd_dev_lock);
435                         return i;
436                 }
437         }
438         read_unlock(&obd_dev_lock);
439
440         return -1;
441 }
442 EXPORT_SYMBOL(class_uuid2dev);
443
444 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
445 {
446         int dev = class_uuid2dev(uuid);
447         if (dev < 0)
448                 return NULL;
449         return class_num2obd(dev);
450 }
451 EXPORT_SYMBOL(class_uuid2obd);
452
453 /**
454  * Get obd device from ::obd_devs[]
455  *
456  * \param num [in] array index
457  *
458  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
459  *         otherwise return the obd device there.
460  */
461 struct obd_device *class_num2obd(int num)
462 {
463         struct obd_device *obd = NULL;
464
465         if (num < class_devno_max()) {
466                 obd = obd_devs[num];
467                 if (obd == NULL)
468                         return NULL;
469
470                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
471                          "%p obd_magic %08x != %08x\n",
472                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
473                 LASSERTF(obd->obd_minor == num,
474                          "%p obd_minor %0d != %0d\n",
475                          obd, obd->obd_minor, num);
476         }
477
478         return obd;
479 }
480 EXPORT_SYMBOL(class_num2obd);
481
482 void class_obd_list(void)
483 {
484         char *status;
485         int i;
486
487         read_lock(&obd_dev_lock);
488         for (i = 0; i < class_devno_max(); i++) {
489                 struct obd_device *obd = class_num2obd(i);
490
491                 if (obd == NULL)
492                         continue;
493                 if (obd->obd_stopping)
494                         status = "ST";
495                 else if (obd->obd_set_up)
496                         status = "UP";
497                 else if (obd->obd_attached)
498                         status = "AT";
499                 else
500                         status = "--";
501                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
502                          i, status, obd->obd_type->typ_name,
503                          obd->obd_name, obd->obd_uuid.uuid,
504                          cfs_atomic_read(&obd->obd_refcount));
505         }
506         read_unlock(&obd_dev_lock);
507         return;
508 }
509
510 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
511    specified, then only the client with that uuid is returned,
512    otherwise any client connected to the tgt is returned. */
513 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
514                                           const char * typ_name,
515                                           struct obd_uuid *grp_uuid)
516 {
517         int i;
518
519         read_lock(&obd_dev_lock);
520         for (i = 0; i < class_devno_max(); i++) {
521                 struct obd_device *obd = class_num2obd(i);
522
523                 if (obd == NULL)
524                         continue;
525                 if ((strncmp(obd->obd_type->typ_name, typ_name,
526                              strlen(typ_name)) == 0)) {
527                         if (obd_uuid_equals(tgt_uuid,
528                                             &obd->u.cli.cl_target_uuid) &&
529                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
530                                                          &obd->obd_uuid) : 1)) {
531                                 read_unlock(&obd_dev_lock);
532                                 return obd;
533                         }
534                 }
535         }
536         read_unlock(&obd_dev_lock);
537
538         return NULL;
539 }
540 EXPORT_SYMBOL(class_find_client_obd);
541
542 /* Iterate the obd_device list looking devices have grp_uuid. Start
543    searching at *next, and if a device is found, the next index to look
544    at is saved in *next. If next is NULL, then the first matching device
545    will always be returned. */
546 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
547 {
548         int i;
549
550         if (next == NULL)
551                 i = 0;
552         else if (*next >= 0 && *next < class_devno_max())
553                 i = *next;
554         else
555                 return NULL;
556
557         read_lock(&obd_dev_lock);
558         for (; i < class_devno_max(); i++) {
559                 struct obd_device *obd = class_num2obd(i);
560
561                 if (obd == NULL)
562                         continue;
563                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
564                         if (next != NULL)
565                                 *next = i+1;
566                         read_unlock(&obd_dev_lock);
567                         return obd;
568                 }
569         }
570         read_unlock(&obd_dev_lock);
571
572         return NULL;
573 }
574 EXPORT_SYMBOL(class_devices_in_group);
575
576 /**
577  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
578  * adjust sptlrpc settings accordingly.
579  */
580 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
581 {
582         struct obd_device  *obd;
583         const char         *type;
584         int                 i, rc = 0, rc2;
585
586         LASSERT(namelen > 0);
587
588         read_lock(&obd_dev_lock);
589         for (i = 0; i < class_devno_max(); i++) {
590                 obd = class_num2obd(i);
591
592                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
593                         continue;
594
595                 /* only notify mdc, osc, mdt, ost */
596                 type = obd->obd_type->typ_name;
597                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
598                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
599                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
600                     strcmp(type, LUSTRE_OST_NAME) != 0)
601                         continue;
602
603                 if (strncmp(obd->obd_name, fsname, namelen))
604                         continue;
605
606                 class_incref(obd, __FUNCTION__, obd);
607                 read_unlock(&obd_dev_lock);
608                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
609                                          sizeof(KEY_SPTLRPC_CONF),
610                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
611                 rc = rc ? rc : rc2;
612                 class_decref(obd, __FUNCTION__, obd);
613                 read_lock(&obd_dev_lock);
614         }
615         read_unlock(&obd_dev_lock);
616         return rc;
617 }
618 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
619
620 void obd_cleanup_caches(void)
621 {
622         int rc;
623
624         ENTRY;
625         if (obd_device_cachep) {
626                 rc = cfs_mem_cache_destroy(obd_device_cachep);
627                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
628                 obd_device_cachep = NULL;
629         }
630         if (obdo_cachep) {
631                 rc = cfs_mem_cache_destroy(obdo_cachep);
632                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
633                 obdo_cachep = NULL;
634         }
635         if (import_cachep) {
636                 rc = cfs_mem_cache_destroy(import_cachep);
637                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
638                 import_cachep = NULL;
639         }
640         if (capa_cachep) {
641                 rc = cfs_mem_cache_destroy(capa_cachep);
642                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
643                 capa_cachep = NULL;
644         }
645         EXIT;
646 }
647
648 int obd_init_caches(void)
649 {
650         ENTRY;
651
652         LASSERT(obd_device_cachep == NULL);
653         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
654                                                  sizeof(struct obd_device),
655                                                  0, 0);
656         if (!obd_device_cachep)
657                 GOTO(out, -ENOMEM);
658
659         LASSERT(obdo_cachep == NULL);
660         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
661                                            0, 0);
662         if (!obdo_cachep)
663                 GOTO(out, -ENOMEM);
664
665         LASSERT(import_cachep == NULL);
666         import_cachep = cfs_mem_cache_create("ll_import_cache",
667                                              sizeof(struct obd_import),
668                                              0, 0);
669         if (!import_cachep)
670                 GOTO(out, -ENOMEM);
671
672         LASSERT(capa_cachep == NULL);
673         capa_cachep = cfs_mem_cache_create("capa_cache",
674                                            sizeof(struct obd_capa), 0, 0);
675         if (!capa_cachep)
676                 GOTO(out, -ENOMEM);
677
678         RETURN(0);
679  out:
680         obd_cleanup_caches();
681         RETURN(-ENOMEM);
682
683 }
684
685 /* map connection to client */
686 struct obd_export *class_conn2export(struct lustre_handle *conn)
687 {
688         struct obd_export *export;
689         ENTRY;
690
691         if (!conn) {
692                 CDEBUG(D_CACHE, "looking for null handle\n");
693                 RETURN(NULL);
694         }
695
696         if (conn->cookie == -1) {  /* this means assign a new connection */
697                 CDEBUG(D_CACHE, "want a new connection\n");
698                 RETURN(NULL);
699         }
700
701         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
702         export = class_handle2object(conn->cookie);
703         RETURN(export);
704 }
705 EXPORT_SYMBOL(class_conn2export);
706
707 struct obd_device *class_exp2obd(struct obd_export *exp)
708 {
709         if (exp)
710                 return exp->exp_obd;
711         return NULL;
712 }
713 EXPORT_SYMBOL(class_exp2obd);
714
715 struct obd_device *class_conn2obd(struct lustre_handle *conn)
716 {
717         struct obd_export *export;
718         export = class_conn2export(conn);
719         if (export) {
720                 struct obd_device *obd = export->exp_obd;
721                 class_export_put(export);
722                 return obd;
723         }
724         return NULL;
725 }
726 EXPORT_SYMBOL(class_conn2obd);
727
728 struct obd_import *class_exp2cliimp(struct obd_export *exp)
729 {
730         struct obd_device *obd = exp->exp_obd;
731         if (obd == NULL)
732                 return NULL;
733         return obd->u.cli.cl_import;
734 }
735 EXPORT_SYMBOL(class_exp2cliimp);
736
737 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
738 {
739         struct obd_device *obd = class_conn2obd(conn);
740         if (obd == NULL)
741                 return NULL;
742         return obd->u.cli.cl_import;
743 }
744 EXPORT_SYMBOL(class_conn2cliimp);
745
746 /* Export management functions */
747 static void class_export_destroy(struct obd_export *exp)
748 {
749         struct obd_device *obd = exp->exp_obd;
750         ENTRY;
751
752         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
753         LASSERT(obd != NULL);
754
755         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
756                exp->exp_client_uuid.uuid, obd->obd_name);
757
758         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
759         if (exp->exp_connection)
760                 ptlrpc_put_connection_superhack(exp->exp_connection);
761
762         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
763         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
764         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
765         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
766         obd_destroy_export(exp);
767         class_decref(obd, "export", exp);
768
769         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
770         EXIT;
771 }
772
773 static void export_handle_addref(void *export)
774 {
775         class_export_get(export);
776 }
777
778 static struct portals_handle_ops export_handle_ops = {
779         .hop_addref = export_handle_addref,
780         .hop_free   = NULL,
781 };
782
783 struct obd_export *class_export_get(struct obd_export *exp)
784 {
785         cfs_atomic_inc(&exp->exp_refcount);
786         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
787                cfs_atomic_read(&exp->exp_refcount));
788         return exp;
789 }
790 EXPORT_SYMBOL(class_export_get);
791
792 void class_export_put(struct obd_export *exp)
793 {
794         LASSERT(exp != NULL);
795         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
796         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
797                cfs_atomic_read(&exp->exp_refcount) - 1);
798
799         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
800                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
801                 CDEBUG(D_IOCTL, "final put %p/%s\n",
802                        exp, exp->exp_client_uuid.uuid);
803
804                 /* release nid stat refererence */
805                 lprocfs_exp_cleanup(exp);
806
807                 obd_zombie_export_add(exp);
808         }
809 }
810 EXPORT_SYMBOL(class_export_put);
811
812 /* Creates a new export, adds it to the hash table, and returns a
813  * pointer to it. The refcount is 2: one for the hash reference, and
814  * one for the pointer returned by this function. */
815 struct obd_export *class_new_export(struct obd_device *obd,
816                                     struct obd_uuid *cluuid)
817 {
818         struct obd_export *export;
819         cfs_hash_t *hash = NULL;
820         int rc = 0;
821         ENTRY;
822
823         OBD_ALLOC_PTR(export);
824         if (!export)
825                 return ERR_PTR(-ENOMEM);
826
827         export->exp_conn_cnt = 0;
828         export->exp_lock_hash = NULL;
829         export->exp_flock_hash = NULL;
830         cfs_atomic_set(&export->exp_refcount, 2);
831         cfs_atomic_set(&export->exp_rpc_count, 0);
832         cfs_atomic_set(&export->exp_cb_count, 0);
833         cfs_atomic_set(&export->exp_locks_count, 0);
834 #if LUSTRE_TRACKS_LOCK_EXP_REFS
835         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
836         spin_lock_init(&export->exp_locks_list_guard);
837 #endif
838         cfs_atomic_set(&export->exp_replay_count, 0);
839         export->exp_obd = obd;
840         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
841         spin_lock_init(&export->exp_uncommitted_replies_lock);
842         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
843         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
844         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
845         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
846         class_handle_hash(&export->exp_handle, &export_handle_ops);
847         export->exp_last_request_time = cfs_time_current_sec();
848         spin_lock_init(&export->exp_lock);
849         spin_lock_init(&export->exp_rpc_lock);
850         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
851         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
852         spin_lock_init(&export->exp_bl_list_lock);
853         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
854
855         export->exp_sp_peer = LUSTRE_SP_ANY;
856         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
857         export->exp_client_uuid = *cluuid;
858         obd_init_export(export);
859
860         spin_lock(&obd->obd_dev_lock);
861         /* shouldn't happen, but might race */
862         if (obd->obd_stopping)
863                 GOTO(exit_unlock, rc = -ENODEV);
864
865         hash = cfs_hash_getref(obd->obd_uuid_hash);
866         if (hash == NULL)
867                 GOTO(exit_unlock, rc = -ENODEV);
868         spin_unlock(&obd->obd_dev_lock);
869
870         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
871                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
872                 if (rc != 0) {
873                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
874                                       obd->obd_name, cluuid->uuid, rc);
875                         GOTO(exit_err, rc = -EALREADY);
876                 }
877         }
878
879         spin_lock(&obd->obd_dev_lock);
880         if (obd->obd_stopping) {
881                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
882                 GOTO(exit_unlock, rc = -ENODEV);
883         }
884
885         class_incref(obd, "export", export);
886         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
887         cfs_list_add_tail(&export->exp_obd_chain_timed,
888                           &export->exp_obd->obd_exports_timed);
889         export->exp_obd->obd_num_exports++;
890         spin_unlock(&obd->obd_dev_lock);
891         cfs_hash_putref(hash);
892         RETURN(export);
893
894 exit_unlock:
895         spin_unlock(&obd->obd_dev_lock);
896 exit_err:
897         if (hash)
898                 cfs_hash_putref(hash);
899         class_handle_unhash(&export->exp_handle);
900         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
901         obd_destroy_export(export);
902         OBD_FREE_PTR(export);
903         return ERR_PTR(rc);
904 }
905 EXPORT_SYMBOL(class_new_export);
906
907 void class_unlink_export(struct obd_export *exp)
908 {
909         class_handle_unhash(&exp->exp_handle);
910
911         spin_lock(&exp->exp_obd->obd_dev_lock);
912         /* delete an uuid-export hashitem from hashtables */
913         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
914                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
915                              &exp->exp_client_uuid,
916                              &exp->exp_uuid_hash);
917
918         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
919         cfs_list_del_init(&exp->exp_obd_chain_timed);
920         exp->exp_obd->obd_num_exports--;
921         spin_unlock(&exp->exp_obd->obd_dev_lock);
922         class_export_put(exp);
923 }
924 EXPORT_SYMBOL(class_unlink_export);
925
926 /* Import management functions */
927 void class_import_destroy(struct obd_import *imp)
928 {
929         ENTRY;
930
931         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
932                 imp->imp_obd->obd_name);
933
934         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
935
936         ptlrpc_put_connection_superhack(imp->imp_connection);
937
938         while (!cfs_list_empty(&imp->imp_conn_list)) {
939                 struct obd_import_conn *imp_conn;
940
941                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
942                                           struct obd_import_conn, oic_item);
943                 cfs_list_del_init(&imp_conn->oic_item);
944                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
945                 OBD_FREE(imp_conn, sizeof(*imp_conn));
946         }
947
948         LASSERT(imp->imp_sec == NULL);
949         class_decref(imp->imp_obd, "import", imp);
950         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
951         EXIT;
952 }
953
954 static void import_handle_addref(void *import)
955 {
956         class_import_get(import);
957 }
958
959 static struct portals_handle_ops import_handle_ops = {
960         .hop_addref = import_handle_addref,
961         .hop_free   = NULL,
962 };
963
964 struct obd_import *class_import_get(struct obd_import *import)
965 {
966         cfs_atomic_inc(&import->imp_refcount);
967         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
968                cfs_atomic_read(&import->imp_refcount),
969                import->imp_obd->obd_name);
970         return import;
971 }
972 EXPORT_SYMBOL(class_import_get);
973
974 void class_import_put(struct obd_import *imp)
975 {
976         ENTRY;
977
978         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
979         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
980
981         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
982                cfs_atomic_read(&imp->imp_refcount) - 1,
983                imp->imp_obd->obd_name);
984
985         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
986                 CDEBUG(D_INFO, "final put import %p\n", imp);
987                 obd_zombie_import_add(imp);
988         }
989
990         /* catch possible import put race */
991         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
992         EXIT;
993 }
994 EXPORT_SYMBOL(class_import_put);
995
996 static void init_imp_at(struct imp_at *at) {
997         int i;
998         at_init(&at->iat_net_latency, 0, 0);
999         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1000                 /* max service estimates are tracked on the server side, so
1001                    don't use the AT history here, just use the last reported
1002                    val. (But keep hist for proc histogram, worst_ever) */
1003                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1004                         AT_FLG_NOHIST);
1005         }
1006 }
1007
1008 struct obd_import *class_new_import(struct obd_device *obd)
1009 {
1010         struct obd_import *imp;
1011
1012         OBD_ALLOC(imp, sizeof(*imp));
1013         if (imp == NULL)
1014                 return NULL;
1015
1016         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1017         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1018         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1019         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1020         spin_lock_init(&imp->imp_lock);
1021         imp->imp_last_success_conn = 0;
1022         imp->imp_state = LUSTRE_IMP_NEW;
1023         imp->imp_obd = class_incref(obd, "import", imp);
1024         mutex_init(&imp->imp_sec_mutex);
1025         cfs_waitq_init(&imp->imp_recovery_waitq);
1026
1027         cfs_atomic_set(&imp->imp_refcount, 2);
1028         cfs_atomic_set(&imp->imp_unregistering, 0);
1029         cfs_atomic_set(&imp->imp_inflight, 0);
1030         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1031         cfs_atomic_set(&imp->imp_inval_count, 0);
1032         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1033         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1034         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1035         init_imp_at(&imp->imp_at);
1036
1037         /* the default magic is V2, will be used in connect RPC, and
1038          * then adjusted according to the flags in request/reply. */
1039         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1040
1041         return imp;
1042 }
1043 EXPORT_SYMBOL(class_new_import);
1044
1045 void class_destroy_import(struct obd_import *import)
1046 {
1047         LASSERT(import != NULL);
1048         LASSERT(import != LP_POISON);
1049
1050         class_handle_unhash(&import->imp_handle);
1051
1052         spin_lock(&import->imp_lock);
1053         import->imp_generation++;
1054         spin_unlock(&import->imp_lock);
1055         class_import_put(import);
1056 }
1057 EXPORT_SYMBOL(class_destroy_import);
1058
1059 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1060
1061 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1062 {
1063         spin_lock(&exp->exp_locks_list_guard);
1064
1065         LASSERT(lock->l_exp_refs_nr >= 0);
1066
1067         if (lock->l_exp_refs_target != NULL &&
1068             lock->l_exp_refs_target != exp) {
1069                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1070                               exp, lock, lock->l_exp_refs_target);
1071         }
1072         if ((lock->l_exp_refs_nr ++) == 0) {
1073                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1074                 lock->l_exp_refs_target = exp;
1075         }
1076         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1077                lock, exp, lock->l_exp_refs_nr);
1078         spin_unlock(&exp->exp_locks_list_guard);
1079 }
1080 EXPORT_SYMBOL(__class_export_add_lock_ref);
1081
1082 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1083 {
1084         spin_lock(&exp->exp_locks_list_guard);
1085         LASSERT(lock->l_exp_refs_nr > 0);
1086         if (lock->l_exp_refs_target != exp) {
1087                 LCONSOLE_WARN("lock %p, "
1088                               "mismatching export pointers: %p, %p\n",
1089                               lock, lock->l_exp_refs_target, exp);
1090         }
1091         if (-- lock->l_exp_refs_nr == 0) {
1092                 cfs_list_del_init(&lock->l_exp_refs_link);
1093                 lock->l_exp_refs_target = NULL;
1094         }
1095         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1096                lock, exp, lock->l_exp_refs_nr);
1097         spin_unlock(&exp->exp_locks_list_guard);
1098 }
1099 EXPORT_SYMBOL(__class_export_del_lock_ref);
1100 #endif
1101
1102 /* A connection defines an export context in which preallocation can
1103    be managed. This releases the export pointer reference, and returns
1104    the export handle, so the export refcount is 1 when this function
1105    returns. */
1106 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1107                   struct obd_uuid *cluuid)
1108 {
1109         struct obd_export *export;
1110         LASSERT(conn != NULL);
1111         LASSERT(obd != NULL);
1112         LASSERT(cluuid != NULL);
1113         ENTRY;
1114
1115         export = class_new_export(obd, cluuid);
1116         if (IS_ERR(export))
1117                 RETURN(PTR_ERR(export));
1118
1119         conn->cookie = export->exp_handle.h_cookie;
1120         class_export_put(export);
1121
1122         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1123                cluuid->uuid, conn->cookie);
1124         RETURN(0);
1125 }
1126 EXPORT_SYMBOL(class_connect);
1127
1128 /* if export is involved in recovery then clean up related things */
1129 void class_export_recovery_cleanup(struct obd_export *exp)
1130 {
1131         struct obd_device *obd = exp->exp_obd;
1132
1133         spin_lock(&obd->obd_recovery_task_lock);
1134         if (exp->exp_delayed)
1135                 obd->obd_delayed_clients--;
1136         if (obd->obd_recovering) {
1137                 if (exp->exp_in_recovery) {
1138                         spin_lock(&exp->exp_lock);
1139                         exp->exp_in_recovery = 0;
1140                         spin_unlock(&exp->exp_lock);
1141                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1142                         cfs_atomic_dec(&obd->obd_connected_clients);
1143                 }
1144
1145                 /* if called during recovery then should update
1146                  * obd_stale_clients counter,
1147                  * lightweight exports are not counted */
1148                 if (exp->exp_failed &&
1149                     (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) == 0)
1150                         exp->exp_obd->obd_stale_clients++;
1151         }
1152         spin_unlock(&obd->obd_recovery_task_lock);
1153         /** Cleanup req replay fields */
1154         if (exp->exp_req_replay_needed) {
1155                 spin_lock(&exp->exp_lock);
1156                 exp->exp_req_replay_needed = 0;
1157                 spin_unlock(&exp->exp_lock);
1158                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1159                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1160         }
1161         /** Cleanup lock replay data */
1162         if (exp->exp_lock_replay_needed) {
1163                 spin_lock(&exp->exp_lock);
1164                 exp->exp_lock_replay_needed = 0;
1165                 spin_unlock(&exp->exp_lock);
1166                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1167                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1168         }
1169 }
1170
1171 /* This function removes 1-3 references from the export:
1172  * 1 - for export pointer passed
1173  * and if disconnect really need
1174  * 2 - removing from hash
1175  * 3 - in client_unlink_export
1176  * The export pointer passed to this function can destroyed */
1177 int class_disconnect(struct obd_export *export)
1178 {
1179         int already_disconnected;
1180         ENTRY;
1181
1182         if (export == NULL) {
1183                 CWARN("attempting to free NULL export %p\n", export);
1184                 RETURN(-EINVAL);
1185         }
1186
1187         spin_lock(&export->exp_lock);
1188         already_disconnected = export->exp_disconnected;
1189         export->exp_disconnected = 1;
1190         spin_unlock(&export->exp_lock);
1191
1192         /* class_cleanup(), abort_recovery(), and class_fail_export()
1193          * all end up in here, and if any of them race we shouldn't
1194          * call extra class_export_puts(). */
1195         if (already_disconnected) {
1196                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1197                 GOTO(no_disconn, already_disconnected);
1198         }
1199
1200         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1201                export->exp_handle.h_cookie);
1202
1203         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1204                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1205                              &export->exp_connection->c_peer.nid,
1206                              &export->exp_nid_hash);
1207
1208         class_export_recovery_cleanup(export);
1209         class_unlink_export(export);
1210 no_disconn:
1211         class_export_put(export);
1212         RETURN(0);
1213 }
1214 EXPORT_SYMBOL(class_disconnect);
1215
1216 /* Return non-zero for a fully connected export */
1217 int class_connected_export(struct obd_export *exp)
1218 {
1219         if (exp) {
1220                 int connected;
1221                 spin_lock(&exp->exp_lock);
1222                 connected = (exp->exp_conn_cnt > 0);
1223                 spin_unlock(&exp->exp_lock);
1224                 return connected;
1225         }
1226         return 0;
1227 }
1228 EXPORT_SYMBOL(class_connected_export);
1229
1230 static void class_disconnect_export_list(cfs_list_t *list,
1231                                          enum obd_option flags)
1232 {
1233         int rc;
1234         struct obd_export *exp;
1235         ENTRY;
1236
1237         /* It's possible that an export may disconnect itself, but
1238          * nothing else will be added to this list. */
1239         while (!cfs_list_empty(list)) {
1240                 exp = cfs_list_entry(list->next, struct obd_export,
1241                                      exp_obd_chain);
1242                 /* need for safe call CDEBUG after obd_disconnect */
1243                 class_export_get(exp);
1244
1245                 spin_lock(&exp->exp_lock);
1246                 exp->exp_flags = flags;
1247                 spin_unlock(&exp->exp_lock);
1248
1249                 if (obd_uuid_equals(&exp->exp_client_uuid,
1250                                     &exp->exp_obd->obd_uuid)) {
1251                         CDEBUG(D_HA,
1252                                "exp %p export uuid == obd uuid, don't discon\n",
1253                                exp);
1254                         /* Need to delete this now so we don't end up pointing
1255                          * to work_list later when this export is cleaned up. */
1256                         cfs_list_del_init(&exp->exp_obd_chain);
1257                         class_export_put(exp);
1258                         continue;
1259                 }
1260
1261                 class_export_get(exp);
1262                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1263                        "last request at "CFS_TIME_T"\n",
1264                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1265                        exp, exp->exp_last_request_time);
1266                 /* release one export reference anyway */
1267                 rc = obd_disconnect(exp);
1268
1269                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1270                        obd_export_nid2str(exp), exp, rc);
1271                 class_export_put(exp);
1272         }
1273         EXIT;
1274 }
1275
1276 void class_disconnect_exports(struct obd_device *obd)
1277 {
1278         cfs_list_t work_list;
1279         ENTRY;
1280
1281         /* Move all of the exports from obd_exports to a work list, en masse. */
1282         CFS_INIT_LIST_HEAD(&work_list);
1283         spin_lock(&obd->obd_dev_lock);
1284         cfs_list_splice_init(&obd->obd_exports, &work_list);
1285         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1286         spin_unlock(&obd->obd_dev_lock);
1287
1288         if (!cfs_list_empty(&work_list)) {
1289                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1290                        "disconnecting them\n", obd->obd_minor, obd);
1291                 class_disconnect_export_list(&work_list,
1292                                              exp_flags_from_obd(obd));
1293         } else
1294                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1295                        obd->obd_minor, obd);
1296         EXIT;
1297 }
1298 EXPORT_SYMBOL(class_disconnect_exports);
1299
1300 /* Remove exports that have not completed recovery.
1301  */
1302 void class_disconnect_stale_exports(struct obd_device *obd,
1303                                     int (*test_export)(struct obd_export *))
1304 {
1305         cfs_list_t work_list;
1306         struct obd_export *exp, *n;
1307         int evicted = 0;
1308         ENTRY;
1309
1310         CFS_INIT_LIST_HEAD(&work_list);
1311         spin_lock(&obd->obd_dev_lock);
1312         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1313                                      exp_obd_chain) {
1314                 /* don't count self-export as client */
1315                 if (obd_uuid_equals(&exp->exp_client_uuid,
1316                                     &exp->exp_obd->obd_uuid))
1317                         continue;
1318
1319                 /* don't evict clients which have no slot in last_rcvd
1320                  * (e.g. lightweight connection) */
1321                 if (exp->exp_target_data.ted_lr_idx == -1)
1322                         continue;
1323
1324                 spin_lock(&exp->exp_lock);
1325                 if (exp->exp_failed || test_export(exp)) {
1326                         spin_unlock(&exp->exp_lock);
1327                         continue;
1328                 }
1329                 exp->exp_failed = 1;
1330                 spin_unlock(&exp->exp_lock);
1331
1332                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1333                 evicted++;
1334                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1335                        obd->obd_name, exp->exp_client_uuid.uuid,
1336                        exp->exp_connection == NULL ? "<unknown>" :
1337                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1338                 print_export_data(exp, "EVICTING", 0);
1339         }
1340         spin_unlock(&obd->obd_dev_lock);
1341
1342         if (evicted)
1343                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1344                               obd->obd_name, evicted);
1345
1346         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1347                                                  OBD_OPT_ABORT_RECOV);
1348         EXIT;
1349 }
1350 EXPORT_SYMBOL(class_disconnect_stale_exports);
1351
1352 void class_fail_export(struct obd_export *exp)
1353 {
1354         int rc, already_failed;
1355
1356         spin_lock(&exp->exp_lock);
1357         already_failed = exp->exp_failed;
1358         exp->exp_failed = 1;
1359         spin_unlock(&exp->exp_lock);
1360
1361         if (already_failed) {
1362                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1363                        exp, exp->exp_client_uuid.uuid);
1364                 return;
1365         }
1366
1367         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1368                exp, exp->exp_client_uuid.uuid);
1369
1370         if (obd_dump_on_timeout)
1371                 libcfs_debug_dumplog();
1372
1373         /* need for safe call CDEBUG after obd_disconnect */
1374         class_export_get(exp);
1375
1376         /* Most callers into obd_disconnect are removing their own reference
1377          * (request, for example) in addition to the one from the hash table.
1378          * We don't have such a reference here, so make one. */
1379         class_export_get(exp);
1380         rc = obd_disconnect(exp);
1381         if (rc)
1382                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1383         else
1384                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1385                        exp, exp->exp_client_uuid.uuid);
1386         class_export_put(exp);
1387 }
1388 EXPORT_SYMBOL(class_fail_export);
1389
1390 char *obd_export_nid2str(struct obd_export *exp)
1391 {
1392         if (exp->exp_connection != NULL)
1393                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1394
1395         return "(no nid)";
1396 }
1397 EXPORT_SYMBOL(obd_export_nid2str);
1398
1399 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1400 {
1401         cfs_hash_t *nid_hash;
1402         struct obd_export *doomed_exp = NULL;
1403         int exports_evicted = 0;
1404
1405         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1406
1407         spin_lock(&obd->obd_dev_lock);
1408         /* umount has run already, so evict thread should leave
1409          * its task to umount thread now */
1410         if (obd->obd_stopping) {
1411                 spin_unlock(&obd->obd_dev_lock);
1412                 return exports_evicted;
1413         }
1414         nid_hash = obd->obd_nid_hash;
1415         cfs_hash_getref(nid_hash);
1416         spin_unlock(&obd->obd_dev_lock);
1417
1418         do {
1419                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1420                 if (doomed_exp == NULL)
1421                         break;
1422
1423                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1424                          "nid %s found, wanted nid %s, requested nid %s\n",
1425                          obd_export_nid2str(doomed_exp),
1426                          libcfs_nid2str(nid_key), nid);
1427                 LASSERTF(doomed_exp != obd->obd_self_export,
1428                          "self-export is hashed by NID?\n");
1429                 exports_evicted++;
1430                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1431                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1432                        exports_evicted);
1433                 class_fail_export(doomed_exp);
1434                 class_export_put(doomed_exp);
1435         } while (1);
1436
1437         cfs_hash_putref(nid_hash);
1438
1439         if (!exports_evicted)
1440                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1441                        obd->obd_name, nid);
1442         return exports_evicted;
1443 }
1444 EXPORT_SYMBOL(obd_export_evict_by_nid);
1445
1446 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1447 {
1448         cfs_hash_t *uuid_hash;
1449         struct obd_export *doomed_exp = NULL;
1450         struct obd_uuid doomed_uuid;
1451         int exports_evicted = 0;
1452
1453         spin_lock(&obd->obd_dev_lock);
1454         if (obd->obd_stopping) {
1455                 spin_unlock(&obd->obd_dev_lock);
1456                 return exports_evicted;
1457         }
1458         uuid_hash = obd->obd_uuid_hash;
1459         cfs_hash_getref(uuid_hash);
1460         spin_unlock(&obd->obd_dev_lock);
1461
1462         obd_str2uuid(&doomed_uuid, uuid);
1463         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1464                 CERROR("%s: can't evict myself\n", obd->obd_name);
1465                 cfs_hash_putref(uuid_hash);
1466                 return exports_evicted;
1467         }
1468
1469         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1470
1471         if (doomed_exp == NULL) {
1472                 CERROR("%s: can't disconnect %s: no exports found\n",
1473                        obd->obd_name, uuid);
1474         } else {
1475                 CWARN("%s: evicting %s at adminstrative request\n",
1476                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1477                 class_fail_export(doomed_exp);
1478                 class_export_put(doomed_exp);
1479                 exports_evicted++;
1480         }
1481         cfs_hash_putref(uuid_hash);
1482
1483         return exports_evicted;
1484 }
1485 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1486
1487 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1488 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1489 EXPORT_SYMBOL(class_export_dump_hook);
1490 #endif
1491
1492 static void print_export_data(struct obd_export *exp, const char *status,
1493                               int locks)
1494 {
1495         struct ptlrpc_reply_state *rs;
1496         struct ptlrpc_reply_state *first_reply = NULL;
1497         int nreplies = 0;
1498
1499         spin_lock(&exp->exp_lock);
1500         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1501                                 rs_exp_list) {
1502                 if (nreplies == 0)
1503                         first_reply = rs;
1504                 nreplies++;
1505         }
1506         spin_unlock(&exp->exp_lock);
1507
1508         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1509                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1510                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1511                cfs_atomic_read(&exp->exp_rpc_count),
1512                cfs_atomic_read(&exp->exp_cb_count),
1513                cfs_atomic_read(&exp->exp_locks_count),
1514                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1515                nreplies, first_reply, nreplies > 3 ? "..." : "",
1516                exp->exp_last_committed);
1517 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1518         if (locks && class_export_dump_hook != NULL)
1519                 class_export_dump_hook(exp);
1520 #endif
1521 }
1522
1523 void dump_exports(struct obd_device *obd, int locks)
1524 {
1525         struct obd_export *exp;
1526
1527         spin_lock(&obd->obd_dev_lock);
1528         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1529                 print_export_data(exp, "ACTIVE", locks);
1530         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1531                 print_export_data(exp, "UNLINKED", locks);
1532         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1533                 print_export_data(exp, "DELAYED", locks);
1534         spin_unlock(&obd->obd_dev_lock);
1535         spin_lock(&obd_zombie_impexp_lock);
1536         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1537                 print_export_data(exp, "ZOMBIE", locks);
1538         spin_unlock(&obd_zombie_impexp_lock);
1539 }
1540 EXPORT_SYMBOL(dump_exports);
1541
1542 void obd_exports_barrier(struct obd_device *obd)
1543 {
1544         int waited = 2;
1545         LASSERT(cfs_list_empty(&obd->obd_exports));
1546         spin_lock(&obd->obd_dev_lock);
1547         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1548                 spin_unlock(&obd->obd_dev_lock);
1549                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1550                                                    cfs_time_seconds(waited));
1551                 if (waited > 5 && IS_PO2(waited)) {
1552                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1553                                       "more than %d seconds. "
1554                                       "The obd refcount = %d. Is it stuck?\n",
1555                                       obd->obd_name, waited,
1556                                       cfs_atomic_read(&obd->obd_refcount));
1557                         dump_exports(obd, 1);
1558                 }
1559                 waited *= 2;
1560                 spin_lock(&obd->obd_dev_lock);
1561         }
1562         spin_unlock(&obd->obd_dev_lock);
1563 }
1564 EXPORT_SYMBOL(obd_exports_barrier);
1565
1566 /* Total amount of zombies to be destroyed */
1567 static int zombies_count = 0;
1568
1569 /**
1570  * kill zombie imports and exports
1571  */
1572 void obd_zombie_impexp_cull(void)
1573 {
1574         struct obd_import *import;
1575         struct obd_export *export;
1576         ENTRY;
1577
1578         do {
1579                 spin_lock(&obd_zombie_impexp_lock);
1580
1581                 import = NULL;
1582                 if (!cfs_list_empty(&obd_zombie_imports)) {
1583                         import = cfs_list_entry(obd_zombie_imports.next,
1584                                                 struct obd_import,
1585                                                 imp_zombie_chain);
1586                         cfs_list_del_init(&import->imp_zombie_chain);
1587                 }
1588
1589                 export = NULL;
1590                 if (!cfs_list_empty(&obd_zombie_exports)) {
1591                         export = cfs_list_entry(obd_zombie_exports.next,
1592                                                 struct obd_export,
1593                                                 exp_obd_chain);
1594                         cfs_list_del_init(&export->exp_obd_chain);
1595                 }
1596
1597                 spin_unlock(&obd_zombie_impexp_lock);
1598
1599                 if (import != NULL) {
1600                         class_import_destroy(import);
1601                         spin_lock(&obd_zombie_impexp_lock);
1602                         zombies_count--;
1603                         spin_unlock(&obd_zombie_impexp_lock);
1604                 }
1605
1606                 if (export != NULL) {
1607                         class_export_destroy(export);
1608                         spin_lock(&obd_zombie_impexp_lock);
1609                         zombies_count--;
1610                         spin_unlock(&obd_zombie_impexp_lock);
1611                 }
1612
1613                 cfs_cond_resched();
1614         } while (import != NULL || export != NULL);
1615         EXIT;
1616 }
1617
1618 static struct completion        obd_zombie_start;
1619 static struct completion        obd_zombie_stop;
1620 static unsigned long            obd_zombie_flags;
1621 static cfs_waitq_t              obd_zombie_waitq;
1622 static pid_t                    obd_zombie_pid;
1623
1624 enum {
1625         OBD_ZOMBIE_STOP         = 0x0001,
1626 };
1627
1628 /**
1629  * check for work for kill zombie import/export thread.
1630  */
1631 static int obd_zombie_impexp_check(void *arg)
1632 {
1633         int rc;
1634
1635         spin_lock(&obd_zombie_impexp_lock);
1636         rc = (zombies_count == 0) &&
1637              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1638         spin_unlock(&obd_zombie_impexp_lock);
1639
1640         RETURN(rc);
1641 }
1642
1643 /**
1644  * Add export to the obd_zombe thread and notify it.
1645  */
1646 static void obd_zombie_export_add(struct obd_export *exp) {
1647         spin_lock(&exp->exp_obd->obd_dev_lock);
1648         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1649         cfs_list_del_init(&exp->exp_obd_chain);
1650         spin_unlock(&exp->exp_obd->obd_dev_lock);
1651         spin_lock(&obd_zombie_impexp_lock);
1652         zombies_count++;
1653         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1654         spin_unlock(&obd_zombie_impexp_lock);
1655
1656         obd_zombie_impexp_notify();
1657 }
1658
1659 /**
1660  * Add import to the obd_zombe thread and notify it.
1661  */
1662 static void obd_zombie_import_add(struct obd_import *imp) {
1663         LASSERT(imp->imp_sec == NULL);
1664         LASSERT(imp->imp_rq_pool == NULL);
1665         spin_lock(&obd_zombie_impexp_lock);
1666         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1667         zombies_count++;
1668         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1669         spin_unlock(&obd_zombie_impexp_lock);
1670
1671         obd_zombie_impexp_notify();
1672 }
1673
1674 /**
1675  * notify import/export destroy thread about new zombie.
1676  */
1677 static void obd_zombie_impexp_notify(void)
1678 {
1679         /*
1680          * Make sure obd_zomebie_impexp_thread get this notification.
1681          * It is possible this signal only get by obd_zombie_barrier, and
1682          * barrier gulps this notification and sleeps away and hangs ensues
1683          */
1684         cfs_waitq_broadcast(&obd_zombie_waitq);
1685 }
1686
1687 /**
1688  * check whether obd_zombie is idle
1689  */
1690 static int obd_zombie_is_idle(void)
1691 {
1692         int rc;
1693
1694         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1695         spin_lock(&obd_zombie_impexp_lock);
1696         rc = (zombies_count == 0);
1697         spin_unlock(&obd_zombie_impexp_lock);
1698         return rc;
1699 }
1700
1701 /**
1702  * wait when obd_zombie import/export queues become empty
1703  */
1704 void obd_zombie_barrier(void)
1705 {
1706         struct l_wait_info lwi = { 0 };
1707
1708         if (obd_zombie_pid == cfs_curproc_pid())
1709                 /* don't wait for myself */
1710                 return;
1711         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1712 }
1713 EXPORT_SYMBOL(obd_zombie_barrier);
1714
1715 #ifdef __KERNEL__
1716
1717 /**
1718  * destroy zombie export/import thread.
1719  */
1720 static int obd_zombie_impexp_thread(void *unused)
1721 {
1722         int rc;
1723
1724         rc = cfs_daemonize_ctxt("obd_zombid");
1725         if (rc != 0) {
1726                 complete(&obd_zombie_start);
1727                 RETURN(rc);
1728         }
1729
1730         complete(&obd_zombie_start);
1731
1732         obd_zombie_pid = cfs_curproc_pid();
1733
1734         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1735                 struct l_wait_info lwi = { 0 };
1736
1737                 l_wait_event(obd_zombie_waitq,
1738                              !obd_zombie_impexp_check(NULL), &lwi);
1739                 obd_zombie_impexp_cull();
1740
1741                 /*
1742                  * Notify obd_zombie_barrier callers that queues
1743                  * may be empty.
1744                  */
1745                 cfs_waitq_signal(&obd_zombie_waitq);
1746         }
1747
1748         complete(&obd_zombie_stop);
1749
1750         RETURN(0);
1751 }
1752
1753 #else /* ! KERNEL */
1754
1755 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1756 static void *obd_zombie_impexp_work_cb;
1757 static void *obd_zombie_impexp_idle_cb;
1758
1759 int obd_zombie_impexp_kill(void *arg)
1760 {
1761         int rc = 0;
1762
1763         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1764                 obd_zombie_impexp_cull();
1765                 rc = 1;
1766         }
1767         cfs_atomic_dec(&zombie_recur);
1768         return rc;
1769 }
1770
1771 #endif
1772
1773 /**
1774  * start destroy zombie import/export thread
1775  */
1776 int obd_zombie_impexp_init(void)
1777 {
1778         int rc;
1779
1780         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1781         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1782         spin_lock_init(&obd_zombie_impexp_lock);
1783         init_completion(&obd_zombie_start);
1784         init_completion(&obd_zombie_stop);
1785         cfs_waitq_init(&obd_zombie_waitq);
1786         obd_zombie_pid = 0;
1787
1788 #ifdef __KERNEL__
1789         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1790         if (rc < 0)
1791                 RETURN(rc);
1792
1793         wait_for_completion(&obd_zombie_start);
1794 #else
1795
1796         obd_zombie_impexp_work_cb =
1797                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1798                                                  &obd_zombie_impexp_kill, NULL);
1799
1800         obd_zombie_impexp_idle_cb =
1801                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1802                                                  &obd_zombie_impexp_check, NULL);
1803         rc = 0;
1804 #endif
1805         RETURN(rc);
1806 }
1807 /**
1808  * stop destroy zombie import/export thread
1809  */
1810 void obd_zombie_impexp_stop(void)
1811 {
1812         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1813         obd_zombie_impexp_notify();
1814 #ifdef __KERNEL__
1815         wait_for_completion(&obd_zombie_stop);
1816 #else
1817         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1818         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1819 #endif
1820 }
1821
1822 /***** Kernel-userspace comm helpers *******/
1823
1824 /* Get length of entire message, including header */
1825 int kuc_len(int payload_len)
1826 {
1827         return sizeof(struct kuc_hdr) + payload_len;
1828 }
1829 EXPORT_SYMBOL(kuc_len);
1830
1831 /* Get a pointer to kuc header, given a ptr to the payload
1832  * @param p Pointer to payload area
1833  * @returns Pointer to kuc header
1834  */
1835 struct kuc_hdr * kuc_ptr(void *p)
1836 {
1837         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1838         LASSERT(lh->kuc_magic == KUC_MAGIC);
1839         return lh;
1840 }
1841 EXPORT_SYMBOL(kuc_ptr);
1842
1843 /* Test if payload is part of kuc message
1844  * @param p Pointer to payload area
1845  * @returns boolean
1846  */
1847 int kuc_ispayload(void *p)
1848 {
1849         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1850
1851         if (kh->kuc_magic == KUC_MAGIC)
1852                 return 1;
1853         else
1854                 return 0;
1855 }
1856 EXPORT_SYMBOL(kuc_ispayload);
1857
1858 /* Alloc space for a message, and fill in header
1859  * @return Pointer to payload area
1860  */
1861 void *kuc_alloc(int payload_len, int transport, int type)
1862 {
1863         struct kuc_hdr *lh;
1864         int len = kuc_len(payload_len);
1865
1866         OBD_ALLOC(lh, len);
1867         if (lh == NULL)
1868                 return ERR_PTR(-ENOMEM);
1869
1870         lh->kuc_magic = KUC_MAGIC;
1871         lh->kuc_transport = transport;
1872         lh->kuc_msgtype = type;
1873         lh->kuc_msglen = len;
1874
1875         return (void *)(lh + 1);
1876 }
1877 EXPORT_SYMBOL(kuc_alloc);
1878
1879 /* Takes pointer to payload area */
1880 inline void kuc_free(void *p, int payload_len)
1881 {
1882         struct kuc_hdr *lh = kuc_ptr(p);
1883         OBD_FREE(lh, kuc_len(payload_len));
1884 }
1885 EXPORT_SYMBOL(kuc_free);
1886
1887
1888