Whamcloud - gitweb
75f19fdbe37f76d5732ee3c423c2ca1d993a48c4
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
52
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
57
58 cfs_list_t      obd_zombie_imports;
59 cfs_list_t      obd_zombie_exports;
60 spinlock_t  obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
129                         modname = LUSTRE_MDT_NAME;
130
131                 if (!cfs_request_module("%s", modname)) {
132                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
133                         type = class_search_type(name);
134                 } else {
135                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
136                                            modname);
137                 }
138         }
139 #endif
140         if (type) {
141                 spin_lock(&type->obd_type_lock);
142                 type->typ_refcnt++;
143                 cfs_try_module_get(type->typ_dt_ops->o_owner);
144                 spin_unlock(&type->obd_type_lock);
145         }
146         return type;
147 }
148 EXPORT_SYMBOL(class_get_type);
149
150 void class_put_type(struct obd_type *type)
151 {
152         LASSERT(type);
153         spin_lock(&type->obd_type_lock);
154         type->typ_refcnt--;
155         cfs_module_put(type->typ_dt_ops->o_owner);
156         spin_unlock(&type->obd_type_lock);
157 }
158 EXPORT_SYMBOL(class_put_type);
159
160 #define CLASS_MAX_NAME 1024
161
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163                         struct lprocfs_vars *vars, const char *name,
164                         struct lu_device_type *ldt)
165 {
166         struct obd_type *type;
167         int rc = 0;
168         ENTRY;
169
170         /* sanity check */
171         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
172
173         if (class_search_type(name)) {
174                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
175                 RETURN(-EEXIST);
176         }
177
178         rc = -ENOMEM;
179         OBD_ALLOC(type, sizeof(*type));
180         if (type == NULL)
181                 RETURN(rc);
182
183         OBD_ALLOC_PTR(type->typ_dt_ops);
184         OBD_ALLOC_PTR(type->typ_md_ops);
185         OBD_ALLOC(type->typ_name, strlen(name) + 1);
186
187         if (type->typ_dt_ops == NULL ||
188             type->typ_md_ops == NULL ||
189             type->typ_name == NULL)
190                 GOTO (failed, rc);
191
192         *(type->typ_dt_ops) = *dt_ops;
193         /* md_ops is optional */
194         if (md_ops)
195                 *(type->typ_md_ops) = *md_ops;
196         strcpy(type->typ_name, name);
197         spin_lock_init(&type->obd_type_lock);
198
199 #ifdef LPROCFS
200         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
201                                               vars, type);
202         if (IS_ERR(type->typ_procroot)) {
203                 rc = PTR_ERR(type->typ_procroot);
204                 type->typ_procroot = NULL;
205                 GOTO (failed, rc);
206         }
207 #endif
208         if (ldt != NULL) {
209                 type->typ_lu = ldt;
210                 rc = lu_device_type_init(ldt);
211                 if (rc != 0)
212                         GOTO (failed, rc);
213         }
214
215         spin_lock(&obd_types_lock);
216         cfs_list_add(&type->typ_chain, &obd_types);
217         spin_unlock(&obd_types_lock);
218
219         RETURN (0);
220
221  failed:
222         if (type->typ_name != NULL)
223                 OBD_FREE(type->typ_name, strlen(name) + 1);
224         if (type->typ_md_ops != NULL)
225                 OBD_FREE_PTR(type->typ_md_ops);
226         if (type->typ_dt_ops != NULL)
227                 OBD_FREE_PTR(type->typ_dt_ops);
228         OBD_FREE(type, sizeof(*type));
229         RETURN(rc);
230 }
231 EXPORT_SYMBOL(class_register_type);
232
233 int class_unregister_type(const char *name)
234 {
235         struct obd_type *type = class_search_type(name);
236         ENTRY;
237
238         if (!type) {
239                 CERROR("unknown obd type\n");
240                 RETURN(-EINVAL);
241         }
242
243         if (type->typ_refcnt) {
244                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
245                 /* This is a bad situation, let's make the best of it */
246                 /* Remove ops, but leave the name for debugging */
247                 OBD_FREE_PTR(type->typ_dt_ops);
248                 OBD_FREE_PTR(type->typ_md_ops);
249                 RETURN(-EBUSY);
250         }
251
252         /* we do not use type->typ_procroot as for compatibility purposes
253          * other modules can share names (i.e. lod can use lov entry). so
254          * we can't reference pointer as it can get invalided when another
255          * module removes the entry */
256         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
257
258         if (type->typ_lu)
259                 lu_device_type_fini(type->typ_lu);
260
261         spin_lock(&obd_types_lock);
262         cfs_list_del(&type->typ_chain);
263         spin_unlock(&obd_types_lock);
264         OBD_FREE(type->typ_name, strlen(name) + 1);
265         if (type->typ_dt_ops != NULL)
266                 OBD_FREE_PTR(type->typ_dt_ops);
267         if (type->typ_md_ops != NULL)
268                 OBD_FREE_PTR(type->typ_md_ops);
269         OBD_FREE(type, sizeof(*type));
270         RETURN(0);
271 } /* class_unregister_type */
272 EXPORT_SYMBOL(class_unregister_type);
273
274 /**
275  * Create a new obd device.
276  *
277  * Find an empty slot in ::obd_devs[], create a new obd device in it.
278  *
279  * \param[in] type_name obd device type string.
280  * \param[in] name      obd device name.
281  *
282  * \retval NULL if create fails, otherwise return the obd device
283  *         pointer created.
284  */
285 struct obd_device *class_newdev(const char *type_name, const char *name)
286 {
287         struct obd_device *result = NULL;
288         struct obd_device *newdev;
289         struct obd_type *type = NULL;
290         int i;
291         int new_obd_minor = 0;
292         ENTRY;
293
294         if (strlen(name) >= MAX_OBD_NAME) {
295                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
296                 RETURN(ERR_PTR(-EINVAL));
297         }
298
299         type = class_get_type(type_name);
300         if (type == NULL){
301                 CERROR("OBD: unknown type: %s\n", type_name);
302                 RETURN(ERR_PTR(-ENODEV));
303         }
304
305         newdev = obd_device_alloc();
306         if (newdev == NULL)
307                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
308
309         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
310
311         write_lock(&obd_dev_lock);
312         for (i = 0; i < class_devno_max(); i++) {
313                 struct obd_device *obd = class_num2obd(i);
314
315                 if (obd && obd->obd_name &&
316                     (strcmp(name, obd->obd_name) == 0)) {
317                         CERROR("Device %s already exists at %d, won't add\n",
318                                name, i);
319                         if (result) {
320                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
321                                          "%p obd_magic %08x != %08x\n", result,
322                                          result->obd_magic, OBD_DEVICE_MAGIC);
323                                 LASSERTF(result->obd_minor == new_obd_minor,
324                                          "%p obd_minor %d != %d\n", result,
325                                          result->obd_minor, new_obd_minor);
326
327                                 obd_devs[result->obd_minor] = NULL;
328                                 result->obd_name[0]='\0';
329                          }
330                         result = ERR_PTR(-EEXIST);
331                         break;
332                 }
333                 if (!result && !obd) {
334                         result = newdev;
335                         result->obd_minor = i;
336                         new_obd_minor = i;
337                         result->obd_type = type;
338                         strncpy(result->obd_name, name,
339                                 sizeof(result->obd_name) - 1);
340                         obd_devs[i] = result;
341                 }
342         }
343         write_unlock(&obd_dev_lock);
344
345         if (result == NULL && i >= class_devno_max()) {
346                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
347                        class_devno_max());
348                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
349         }
350
351         if (IS_ERR(result))
352                 GOTO(out, result);
353
354         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
355                result->obd_name, result);
356
357         RETURN(result);
358 out:
359         obd_device_free(newdev);
360 out_type:
361         class_put_type(type);
362         return result;
363 }
364
365 void class_release_dev(struct obd_device *obd)
366 {
367         struct obd_type *obd_type = obd->obd_type;
368
369         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
370                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
371         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
372                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
373         LASSERT(obd_type != NULL);
374
375         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
376                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
377
378         write_lock(&obd_dev_lock);
379         obd_devs[obd->obd_minor] = NULL;
380         write_unlock(&obd_dev_lock);
381         obd_device_free(obd);
382
383         class_put_type(obd_type);
384 }
385
386 int class_name2dev(const char *name)
387 {
388         int i;
389
390         if (!name)
391                 return -1;
392
393         read_lock(&obd_dev_lock);
394         for (i = 0; i < class_devno_max(); i++) {
395                 struct obd_device *obd = class_num2obd(i);
396
397                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
398                         /* Make sure we finished attaching before we give
399                            out any references */
400                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
401                         if (obd->obd_attached) {
402                                 read_unlock(&obd_dev_lock);
403                                 return i;
404                         }
405                         break;
406                 }
407         }
408         read_unlock(&obd_dev_lock);
409
410         return -1;
411 }
412 EXPORT_SYMBOL(class_name2dev);
413
414 struct obd_device *class_name2obd(const char *name)
415 {
416         int dev = class_name2dev(name);
417
418         if (dev < 0 || dev > class_devno_max())
419                 return NULL;
420         return class_num2obd(dev);
421 }
422 EXPORT_SYMBOL(class_name2obd);
423
424 int class_uuid2dev(struct obd_uuid *uuid)
425 {
426         int i;
427
428         read_lock(&obd_dev_lock);
429         for (i = 0; i < class_devno_max(); i++) {
430                 struct obd_device *obd = class_num2obd(i);
431
432                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
433                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
434                         read_unlock(&obd_dev_lock);
435                         return i;
436                 }
437         }
438         read_unlock(&obd_dev_lock);
439
440         return -1;
441 }
442 EXPORT_SYMBOL(class_uuid2dev);
443
444 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
445 {
446         int dev = class_uuid2dev(uuid);
447         if (dev < 0)
448                 return NULL;
449         return class_num2obd(dev);
450 }
451 EXPORT_SYMBOL(class_uuid2obd);
452
453 /**
454  * Get obd device from ::obd_devs[]
455  *
456  * \param num [in] array index
457  *
458  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
459  *         otherwise return the obd device there.
460  */
461 struct obd_device *class_num2obd(int num)
462 {
463         struct obd_device *obd = NULL;
464
465         if (num < class_devno_max()) {
466                 obd = obd_devs[num];
467                 if (obd == NULL)
468                         return NULL;
469
470                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
471                          "%p obd_magic %08x != %08x\n",
472                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
473                 LASSERTF(obd->obd_minor == num,
474                          "%p obd_minor %0d != %0d\n",
475                          obd, obd->obd_minor, num);
476         }
477
478         return obd;
479 }
480 EXPORT_SYMBOL(class_num2obd);
481
482 void class_obd_list(void)
483 {
484         char *status;
485         int i;
486
487         read_lock(&obd_dev_lock);
488         for (i = 0; i < class_devno_max(); i++) {
489                 struct obd_device *obd = class_num2obd(i);
490
491                 if (obd == NULL)
492                         continue;
493                 if (obd->obd_stopping)
494                         status = "ST";
495                 else if (obd->obd_set_up)
496                         status = "UP";
497                 else if (obd->obd_attached)
498                         status = "AT";
499                 else
500                         status = "--";
501                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
502                          i, status, obd->obd_type->typ_name,
503                          obd->obd_name, obd->obd_uuid.uuid,
504                          cfs_atomic_read(&obd->obd_refcount));
505         }
506         read_unlock(&obd_dev_lock);
507         return;
508 }
509
510 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
511    specified, then only the client with that uuid is returned,
512    otherwise any client connected to the tgt is returned. */
513 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
514                                           const char * typ_name,
515                                           struct obd_uuid *grp_uuid)
516 {
517         int i;
518
519         read_lock(&obd_dev_lock);
520         for (i = 0; i < class_devno_max(); i++) {
521                 struct obd_device *obd = class_num2obd(i);
522
523                 if (obd == NULL)
524                         continue;
525                 if ((strncmp(obd->obd_type->typ_name, typ_name,
526                              strlen(typ_name)) == 0)) {
527                         if (obd_uuid_equals(tgt_uuid,
528                                             &obd->u.cli.cl_target_uuid) &&
529                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
530                                                          &obd->obd_uuid) : 1)) {
531                                 read_unlock(&obd_dev_lock);
532                                 return obd;
533                         }
534                 }
535         }
536         read_unlock(&obd_dev_lock);
537
538         return NULL;
539 }
540 EXPORT_SYMBOL(class_find_client_obd);
541
542 /* Iterate the obd_device list looking devices have grp_uuid. Start
543    searching at *next, and if a device is found, the next index to look
544    at is saved in *next. If next is NULL, then the first matching device
545    will always be returned. */
546 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
547 {
548         int i;
549
550         if (next == NULL)
551                 i = 0;
552         else if (*next >= 0 && *next < class_devno_max())
553                 i = *next;
554         else
555                 return NULL;
556
557         read_lock(&obd_dev_lock);
558         for (; i < class_devno_max(); i++) {
559                 struct obd_device *obd = class_num2obd(i);
560
561                 if (obd == NULL)
562                         continue;
563                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
564                         if (next != NULL)
565                                 *next = i+1;
566                         read_unlock(&obd_dev_lock);
567                         return obd;
568                 }
569         }
570         read_unlock(&obd_dev_lock);
571
572         return NULL;
573 }
574 EXPORT_SYMBOL(class_devices_in_group);
575
576 /**
577  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
578  * adjust sptlrpc settings accordingly.
579  */
580 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
581 {
582         struct obd_device  *obd;
583         const char         *type;
584         int                 i, rc = 0, rc2;
585
586         LASSERT(namelen > 0);
587
588         read_lock(&obd_dev_lock);
589         for (i = 0; i < class_devno_max(); i++) {
590                 obd = class_num2obd(i);
591
592                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
593                         continue;
594
595                 /* only notify mdc, osc, mdt, ost */
596                 type = obd->obd_type->typ_name;
597                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
598                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
599                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
600                     strcmp(type, LUSTRE_OST_NAME) != 0)
601                         continue;
602
603                 if (strncmp(obd->obd_name, fsname, namelen))
604                         continue;
605
606                 class_incref(obd, __FUNCTION__, obd);
607                 read_unlock(&obd_dev_lock);
608                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
609                                          sizeof(KEY_SPTLRPC_CONF),
610                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
611                 rc = rc ? rc : rc2;
612                 class_decref(obd, __FUNCTION__, obd);
613                 read_lock(&obd_dev_lock);
614         }
615         read_unlock(&obd_dev_lock);
616         return rc;
617 }
618 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
619
620 void obd_cleanup_caches(void)
621 {
622         int rc;
623
624         ENTRY;
625         if (obd_device_cachep) {
626                 rc = cfs_mem_cache_destroy(obd_device_cachep);
627                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
628                 obd_device_cachep = NULL;
629         }
630         if (obdo_cachep) {
631                 rc = cfs_mem_cache_destroy(obdo_cachep);
632                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
633                 obdo_cachep = NULL;
634         }
635         if (import_cachep) {
636                 rc = cfs_mem_cache_destroy(import_cachep);
637                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
638                 import_cachep = NULL;
639         }
640         if (capa_cachep) {
641                 rc = cfs_mem_cache_destroy(capa_cachep);
642                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
643                 capa_cachep = NULL;
644         }
645         EXIT;
646 }
647
648 int obd_init_caches(void)
649 {
650         ENTRY;
651
652         LASSERT(obd_device_cachep == NULL);
653         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
654                                                  sizeof(struct obd_device),
655                                                  0, 0);
656         if (!obd_device_cachep)
657                 GOTO(out, -ENOMEM);
658
659         LASSERT(obdo_cachep == NULL);
660         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
661                                            0, 0);
662         if (!obdo_cachep)
663                 GOTO(out, -ENOMEM);
664
665         LASSERT(import_cachep == NULL);
666         import_cachep = cfs_mem_cache_create("ll_import_cache",
667                                              sizeof(struct obd_import),
668                                              0, 0);
669         if (!import_cachep)
670                 GOTO(out, -ENOMEM);
671
672         LASSERT(capa_cachep == NULL);
673         capa_cachep = cfs_mem_cache_create("capa_cache",
674                                            sizeof(struct obd_capa), 0, 0);
675         if (!capa_cachep)
676                 GOTO(out, -ENOMEM);
677
678         RETURN(0);
679  out:
680         obd_cleanup_caches();
681         RETURN(-ENOMEM);
682
683 }
684
685 /* map connection to client */
686 struct obd_export *class_conn2export(struct lustre_handle *conn)
687 {
688         struct obd_export *export;
689         ENTRY;
690
691         if (!conn) {
692                 CDEBUG(D_CACHE, "looking for null handle\n");
693                 RETURN(NULL);
694         }
695
696         if (conn->cookie == -1) {  /* this means assign a new connection */
697                 CDEBUG(D_CACHE, "want a new connection\n");
698                 RETURN(NULL);
699         }
700
701         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
702         export = class_handle2object(conn->cookie);
703         RETURN(export);
704 }
705 EXPORT_SYMBOL(class_conn2export);
706
707 struct obd_device *class_exp2obd(struct obd_export *exp)
708 {
709         if (exp)
710                 return exp->exp_obd;
711         return NULL;
712 }
713 EXPORT_SYMBOL(class_exp2obd);
714
715 struct obd_device *class_conn2obd(struct lustre_handle *conn)
716 {
717         struct obd_export *export;
718         export = class_conn2export(conn);
719         if (export) {
720                 struct obd_device *obd = export->exp_obd;
721                 class_export_put(export);
722                 return obd;
723         }
724         return NULL;
725 }
726 EXPORT_SYMBOL(class_conn2obd);
727
728 struct obd_import *class_exp2cliimp(struct obd_export *exp)
729 {
730         struct obd_device *obd = exp->exp_obd;
731         if (obd == NULL)
732                 return NULL;
733         return obd->u.cli.cl_import;
734 }
735 EXPORT_SYMBOL(class_exp2cliimp);
736
737 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
738 {
739         struct obd_device *obd = class_conn2obd(conn);
740         if (obd == NULL)
741                 return NULL;
742         return obd->u.cli.cl_import;
743 }
744 EXPORT_SYMBOL(class_conn2cliimp);
745
746 /* Export management functions */
747 static void class_export_destroy(struct obd_export *exp)
748 {
749         struct obd_device *obd = exp->exp_obd;
750         ENTRY;
751
752         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
753         LASSERT(obd != NULL);
754
755         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
756                exp->exp_client_uuid.uuid, obd->obd_name);
757
758         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
759         if (exp->exp_connection)
760                 ptlrpc_put_connection_superhack(exp->exp_connection);
761
762         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
763         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
764         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
765         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
766         obd_destroy_export(exp);
767         class_decref(obd, "export", exp);
768
769         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
770         EXIT;
771 }
772
773 static void export_handle_addref(void *export)
774 {
775         class_export_get(export);
776 }
777
778 static struct portals_handle_ops export_handle_ops = {
779         .hop_addref = export_handle_addref,
780         .hop_free   = NULL,
781 };
782
783 struct obd_export *class_export_get(struct obd_export *exp)
784 {
785         cfs_atomic_inc(&exp->exp_refcount);
786         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
787                cfs_atomic_read(&exp->exp_refcount));
788         return exp;
789 }
790 EXPORT_SYMBOL(class_export_get);
791
792 void class_export_put(struct obd_export *exp)
793 {
794         LASSERT(exp != NULL);
795         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
796         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
797                cfs_atomic_read(&exp->exp_refcount) - 1);
798
799         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
800                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
801                 CDEBUG(D_IOCTL, "final put %p/%s\n",
802                        exp, exp->exp_client_uuid.uuid);
803
804                 /* release nid stat refererence */
805                 lprocfs_exp_cleanup(exp);
806
807                 obd_zombie_export_add(exp);
808         }
809 }
810 EXPORT_SYMBOL(class_export_put);
811
812 /* Creates a new export, adds it to the hash table, and returns a
813  * pointer to it. The refcount is 2: one for the hash reference, and
814  * one for the pointer returned by this function. */
815 struct obd_export *class_new_export(struct obd_device *obd,
816                                     struct obd_uuid *cluuid)
817 {
818         struct obd_export *export;
819         cfs_hash_t *hash = NULL;
820         int rc = 0;
821         ENTRY;
822
823         OBD_ALLOC_PTR(export);
824         if (!export)
825                 return ERR_PTR(-ENOMEM);
826
827         export->exp_conn_cnt = 0;
828         export->exp_lock_hash = NULL;
829         export->exp_flock_hash = NULL;
830         cfs_atomic_set(&export->exp_refcount, 2);
831         cfs_atomic_set(&export->exp_rpc_count, 0);
832         cfs_atomic_set(&export->exp_cb_count, 0);
833         cfs_atomic_set(&export->exp_locks_count, 0);
834 #if LUSTRE_TRACKS_LOCK_EXP_REFS
835         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
836         spin_lock_init(&export->exp_locks_list_guard);
837 #endif
838         cfs_atomic_set(&export->exp_replay_count, 0);
839         export->exp_obd = obd;
840         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
841         spin_lock_init(&export->exp_uncommitted_replies_lock);
842         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
843         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
844         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
845         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
846         class_handle_hash(&export->exp_handle, &export_handle_ops);
847         export->exp_last_request_time = cfs_time_current_sec();
848         spin_lock_init(&export->exp_lock);
849         spin_lock_init(&export->exp_rpc_lock);
850         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
851         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
852         spin_lock_init(&export->exp_bl_list_lock);
853         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
854
855         export->exp_sp_peer = LUSTRE_SP_ANY;
856         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
857         export->exp_client_uuid = *cluuid;
858         obd_init_export(export);
859
860         spin_lock(&obd->obd_dev_lock);
861         /* shouldn't happen, but might race */
862         if (obd->obd_stopping)
863                 GOTO(exit_unlock, rc = -ENODEV);
864
865         hash = cfs_hash_getref(obd->obd_uuid_hash);
866         if (hash == NULL)
867                 GOTO(exit_unlock, rc = -ENODEV);
868         spin_unlock(&obd->obd_dev_lock);
869
870         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
871                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
872                 if (rc != 0) {
873                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
874                                       obd->obd_name, cluuid->uuid, rc);
875                         GOTO(exit_err, rc = -EALREADY);
876                 }
877         }
878
879         spin_lock(&obd->obd_dev_lock);
880         if (obd->obd_stopping) {
881                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
882                 GOTO(exit_unlock, rc = -ENODEV);
883         }
884
885         class_incref(obd, "export", export);
886         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
887         cfs_list_add_tail(&export->exp_obd_chain_timed,
888                           &export->exp_obd->obd_exports_timed);
889         export->exp_obd->obd_num_exports++;
890         spin_unlock(&obd->obd_dev_lock);
891         cfs_hash_putref(hash);
892         RETURN(export);
893
894 exit_unlock:
895         spin_unlock(&obd->obd_dev_lock);
896 exit_err:
897         if (hash)
898                 cfs_hash_putref(hash);
899         class_handle_unhash(&export->exp_handle);
900         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
901         obd_destroy_export(export);
902         OBD_FREE_PTR(export);
903         return ERR_PTR(rc);
904 }
905 EXPORT_SYMBOL(class_new_export);
906
907 void class_unlink_export(struct obd_export *exp)
908 {
909         class_handle_unhash(&exp->exp_handle);
910
911         spin_lock(&exp->exp_obd->obd_dev_lock);
912         /* delete an uuid-export hashitem from hashtables */
913         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
914                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
915                              &exp->exp_client_uuid,
916                              &exp->exp_uuid_hash);
917
918         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
919         cfs_list_del_init(&exp->exp_obd_chain_timed);
920         exp->exp_obd->obd_num_exports--;
921         spin_unlock(&exp->exp_obd->obd_dev_lock);
922         class_export_put(exp);
923 }
924 EXPORT_SYMBOL(class_unlink_export);
925
926 /* Import management functions */
927 void class_import_destroy(struct obd_import *imp)
928 {
929         ENTRY;
930
931         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
932                 imp->imp_obd->obd_name);
933
934         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
935
936         ptlrpc_put_connection_superhack(imp->imp_connection);
937
938         while (!cfs_list_empty(&imp->imp_conn_list)) {
939                 struct obd_import_conn *imp_conn;
940
941                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
942                                           struct obd_import_conn, oic_item);
943                 cfs_list_del_init(&imp_conn->oic_item);
944                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
945                 OBD_FREE(imp_conn, sizeof(*imp_conn));
946         }
947
948         LASSERT(imp->imp_sec == NULL);
949         class_decref(imp->imp_obd, "import", imp);
950         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
951         EXIT;
952 }
953
/* hop_addref callback for import handles: take one reference on the import. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
958
959 static struct portals_handle_ops import_handle_ops = {
960         .hop_addref = import_handle_addref,
961         .hop_free   = NULL,
962 };
963
964 struct obd_import *class_import_get(struct obd_import *import)
965 {
966         cfs_atomic_inc(&import->imp_refcount);
967         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
968                cfs_atomic_read(&import->imp_refcount),
969                import->imp_obd->obd_name);
970         return import;
971 }
972 EXPORT_SYMBOL(class_import_get);
973
974 void class_import_put(struct obd_import *imp)
975 {
976         ENTRY;
977
978         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
979         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
980
981         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
982                cfs_atomic_read(&imp->imp_refcount) - 1,
983                imp->imp_obd->obd_name);
984
985         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
986                 CDEBUG(D_INFO, "final put import %p\n", imp);
987                 obd_zombie_import_add(imp);
988         }
989
990         /* catch possible import put race */
991         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
992         EXIT;
993 }
994 EXPORT_SYMBOL(class_import_put);
995
996 static void init_imp_at(struct imp_at *at) {
997         int i;
998         at_init(&at->iat_net_latency, 0, 0);
999         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1000                 /* max service estimates are tracked on the server side, so
1001                    don't use the AT history here, just use the last reported
1002                    val. (But keep hist for proc histogram, worst_ever) */
1003                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1004                         AT_FLG_NOHIST);
1005         }
1006 }
1007
1008 struct obd_import *class_new_import(struct obd_device *obd)
1009 {
1010         struct obd_import *imp;
1011
1012         OBD_ALLOC(imp, sizeof(*imp));
1013         if (imp == NULL)
1014                 return NULL;
1015
1016         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1017         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1018         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1019         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1020         spin_lock_init(&imp->imp_lock);
1021         imp->imp_last_success_conn = 0;
1022         imp->imp_state = LUSTRE_IMP_NEW;
1023         imp->imp_obd = class_incref(obd, "import", imp);
1024         mutex_init(&imp->imp_sec_mutex);
1025         cfs_waitq_init(&imp->imp_recovery_waitq);
1026
1027         cfs_atomic_set(&imp->imp_refcount, 2);
1028         cfs_atomic_set(&imp->imp_unregistering, 0);
1029         cfs_atomic_set(&imp->imp_inflight, 0);
1030         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1031         cfs_atomic_set(&imp->imp_inval_count, 0);
1032         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1033         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1034         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1035         init_imp_at(&imp->imp_at);
1036
1037         /* the default magic is V2, will be used in connect RPC, and
1038          * then adjusted according to the flags in request/reply. */
1039         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1040
1041         return imp;
1042 }
1043 EXPORT_SYMBOL(class_new_import);
1044
1045 void class_destroy_import(struct obd_import *import)
1046 {
1047         LASSERT(import != NULL);
1048         LASSERT(import != LP_POISON);
1049
1050         class_handle_unhash(&import->imp_handle);
1051
1052         spin_lock(&import->imp_lock);
1053         import->imp_generation++;
1054         spin_unlock(&import->imp_lock);
1055         class_import_put(import);
1056 }
1057 EXPORT_SYMBOL(class_destroy_import);
1058
1059 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1060
1061 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1062 {
1063         spin_lock(&exp->exp_locks_list_guard);
1064
1065         LASSERT(lock->l_exp_refs_nr >= 0);
1066
1067         if (lock->l_exp_refs_target != NULL &&
1068             lock->l_exp_refs_target != exp) {
1069                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1070                               exp, lock, lock->l_exp_refs_target);
1071         }
1072         if ((lock->l_exp_refs_nr ++) == 0) {
1073                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1074                 lock->l_exp_refs_target = exp;
1075         }
1076         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1077                lock, exp, lock->l_exp_refs_nr);
1078         spin_unlock(&exp->exp_locks_list_guard);
1079 }
1080 EXPORT_SYMBOL(__class_export_add_lock_ref);
1081
1082 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1083 {
1084         spin_lock(&exp->exp_locks_list_guard);
1085         LASSERT(lock->l_exp_refs_nr > 0);
1086         if (lock->l_exp_refs_target != exp) {
1087                 LCONSOLE_WARN("lock %p, "
1088                               "mismatching export pointers: %p, %p\n",
1089                               lock, lock->l_exp_refs_target, exp);
1090         }
1091         if (-- lock->l_exp_refs_nr == 0) {
1092                 cfs_list_del_init(&lock->l_exp_refs_link);
1093                 lock->l_exp_refs_target = NULL;
1094         }
1095         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1096                lock, exp, lock->l_exp_refs_nr);
1097         spin_unlock(&exp->exp_locks_list_guard);
1098 }
1099 EXPORT_SYMBOL(__class_export_del_lock_ref);
1100 #endif
1101
1102 /* A connection defines an export context in which preallocation can
1103    be managed. This releases the export pointer reference, and returns
1104    the export handle, so the export refcount is 1 when this function
1105    returns. */
1106 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1107                   struct obd_uuid *cluuid)
1108 {
1109         struct obd_export *export;
1110         LASSERT(conn != NULL);
1111         LASSERT(obd != NULL);
1112         LASSERT(cluuid != NULL);
1113         ENTRY;
1114
1115         export = class_new_export(obd, cluuid);
1116         if (IS_ERR(export))
1117                 RETURN(PTR_ERR(export));
1118
1119         conn->cookie = export->exp_handle.h_cookie;
1120         class_export_put(export);
1121
1122         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1123                cluuid->uuid, conn->cookie);
1124         RETURN(0);
1125 }
1126 EXPORT_SYMBOL(class_connect);
1127
1128 /* if export is involved in recovery then clean up related things */
1129 void class_export_recovery_cleanup(struct obd_export *exp)
1130 {
1131         struct obd_device *obd = exp->exp_obd;
1132
1133         spin_lock(&obd->obd_recovery_task_lock);
1134         if (exp->exp_delayed)
1135                 obd->obd_delayed_clients--;
1136         if (obd->obd_recovering && exp->exp_in_recovery) {
1137                 spin_lock(&exp->exp_lock);
1138                 exp->exp_in_recovery = 0;
1139                 spin_unlock(&exp->exp_lock);
1140                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1141                 cfs_atomic_dec(&obd->obd_connected_clients);
1142         }
1143         spin_unlock(&obd->obd_recovery_task_lock);
1144         /** Cleanup req replay fields */
1145         if (exp->exp_req_replay_needed) {
1146                 spin_lock(&exp->exp_lock);
1147                 exp->exp_req_replay_needed = 0;
1148                 spin_unlock(&exp->exp_lock);
1149                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1150                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1151         }
1152         /** Cleanup lock replay data */
1153         if (exp->exp_lock_replay_needed) {
1154                 spin_lock(&exp->exp_lock);
1155                 exp->exp_lock_replay_needed = 0;
1156                 spin_unlock(&exp->exp_lock);
1157                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1158                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1159         }
1160 }
1161
1162 /* This function removes 1-3 references from the export:
1163  * 1 - for export pointer passed
1164  * and if disconnect really need
1165  * 2 - removing from hash
1166  * 3 - in client_unlink_export
1167  * The export pointer passed to this function can destroyed */
1168 int class_disconnect(struct obd_export *export)
1169 {
1170         int already_disconnected;
1171         ENTRY;
1172
1173         if (export == NULL) {
1174                 CWARN("attempting to free NULL export %p\n", export);
1175                 RETURN(-EINVAL);
1176         }
1177
1178         spin_lock(&export->exp_lock);
1179         already_disconnected = export->exp_disconnected;
1180         export->exp_disconnected = 1;
1181         spin_unlock(&export->exp_lock);
1182
1183         /* class_cleanup(), abort_recovery(), and class_fail_export()
1184          * all end up in here, and if any of them race we shouldn't
1185          * call extra class_export_puts(). */
1186         if (already_disconnected) {
1187                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1188                 GOTO(no_disconn, already_disconnected);
1189         }
1190
1191         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1192                export->exp_handle.h_cookie);
1193
1194         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1195                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1196                              &export->exp_connection->c_peer.nid,
1197                              &export->exp_nid_hash);
1198
1199         class_export_recovery_cleanup(export);
1200         class_unlink_export(export);
1201 no_disconn:
1202         class_export_put(export);
1203         RETURN(0);
1204 }
1205 EXPORT_SYMBOL(class_disconnect);
1206
1207 /* Return non-zero for a fully connected export */
1208 int class_connected_export(struct obd_export *exp)
1209 {
1210         if (exp) {
1211                 int connected;
1212                 spin_lock(&exp->exp_lock);
1213                 connected = (exp->exp_conn_cnt > 0);
1214                 spin_unlock(&exp->exp_lock);
1215                 return connected;
1216         }
1217         return 0;
1218 }
1219 EXPORT_SYMBOL(class_connected_export);
1220
1221 static void class_disconnect_export_list(cfs_list_t *list,
1222                                          enum obd_option flags)
1223 {
1224         int rc;
1225         struct obd_export *exp;
1226         ENTRY;
1227
1228         /* It's possible that an export may disconnect itself, but
1229          * nothing else will be added to this list. */
1230         while (!cfs_list_empty(list)) {
1231                 exp = cfs_list_entry(list->next, struct obd_export,
1232                                      exp_obd_chain);
1233                 /* need for safe call CDEBUG after obd_disconnect */
1234                 class_export_get(exp);
1235
1236                 spin_lock(&exp->exp_lock);
1237                 exp->exp_flags = flags;
1238                 spin_unlock(&exp->exp_lock);
1239
1240                 if (obd_uuid_equals(&exp->exp_client_uuid,
1241                                     &exp->exp_obd->obd_uuid)) {
1242                         CDEBUG(D_HA,
1243                                "exp %p export uuid == obd uuid, don't discon\n",
1244                                exp);
1245                         /* Need to delete this now so we don't end up pointing
1246                          * to work_list later when this export is cleaned up. */
1247                         cfs_list_del_init(&exp->exp_obd_chain);
1248                         class_export_put(exp);
1249                         continue;
1250                 }
1251
1252                 class_export_get(exp);
1253                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1254                        "last request at "CFS_TIME_T"\n",
1255                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1256                        exp, exp->exp_last_request_time);
1257                 /* release one export reference anyway */
1258                 rc = obd_disconnect(exp);
1259
1260                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1261                        obd_export_nid2str(exp), exp, rc);
1262                 class_export_put(exp);
1263         }
1264         EXIT;
1265 }
1266
1267 void class_disconnect_exports(struct obd_device *obd)
1268 {
1269         cfs_list_t work_list;
1270         ENTRY;
1271
1272         /* Move all of the exports from obd_exports to a work list, en masse. */
1273         CFS_INIT_LIST_HEAD(&work_list);
1274         spin_lock(&obd->obd_dev_lock);
1275         cfs_list_splice_init(&obd->obd_exports, &work_list);
1276         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1277         spin_unlock(&obd->obd_dev_lock);
1278
1279         if (!cfs_list_empty(&work_list)) {
1280                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1281                        "disconnecting them\n", obd->obd_minor, obd);
1282                 class_disconnect_export_list(&work_list,
1283                                              exp_flags_from_obd(obd));
1284         } else
1285                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1286                        obd->obd_minor, obd);
1287         EXIT;
1288 }
1289 EXPORT_SYMBOL(class_disconnect_exports);
1290
1291 /* Remove exports that have not completed recovery.
1292  */
1293 void class_disconnect_stale_exports(struct obd_device *obd,
1294                                     int (*test_export)(struct obd_export *))
1295 {
1296         cfs_list_t work_list;
1297         struct obd_export *exp, *n;
1298         int evicted = 0;
1299         ENTRY;
1300
1301         CFS_INIT_LIST_HEAD(&work_list);
1302         spin_lock(&obd->obd_dev_lock);
1303         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1304                                      exp_obd_chain) {
1305                 /* don't count self-export as client */
1306                 if (obd_uuid_equals(&exp->exp_client_uuid,
1307                                     &exp->exp_obd->obd_uuid))
1308                         continue;
1309
1310                 /* don't evict clients which have no slot in last_rcvd
1311                  * (e.g. lightweight connection) */
1312                 if (exp->exp_target_data.ted_lr_idx == -1)
1313                         continue;
1314
1315                 spin_lock(&exp->exp_lock);
1316                 if (test_export(exp)) {
1317                         spin_unlock(&exp->exp_lock);
1318                         continue;
1319                 }
1320                 exp->exp_failed = 1;
1321                 spin_unlock(&exp->exp_lock);
1322
1323                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1324                 evicted++;
1325                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1326                        obd->obd_name, exp->exp_client_uuid.uuid,
1327                        exp->exp_connection == NULL ? "<unknown>" :
1328                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1329                 print_export_data(exp, "EVICTING", 0);
1330         }
1331         spin_unlock(&obd->obd_dev_lock);
1332
1333         if (evicted) {
1334                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1335                               obd->obd_name, evicted);
1336                 obd->obd_stale_clients += evicted;
1337         }
1338         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1339                                                  OBD_OPT_ABORT_RECOV);
1340         EXIT;
1341 }
1342 EXPORT_SYMBOL(class_disconnect_stale_exports);
1343
1344 void class_fail_export(struct obd_export *exp)
1345 {
1346         int rc, already_failed;
1347
1348         spin_lock(&exp->exp_lock);
1349         already_failed = exp->exp_failed;
1350         exp->exp_failed = 1;
1351         spin_unlock(&exp->exp_lock);
1352
1353         if (already_failed) {
1354                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1355                        exp, exp->exp_client_uuid.uuid);
1356                 return;
1357         }
1358
1359         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1360                exp, exp->exp_client_uuid.uuid);
1361
1362         if (obd_dump_on_timeout)
1363                 libcfs_debug_dumplog();
1364
1365         /* need for safe call CDEBUG after obd_disconnect */
1366         class_export_get(exp);
1367
1368         /* Most callers into obd_disconnect are removing their own reference
1369          * (request, for example) in addition to the one from the hash table.
1370          * We don't have such a reference here, so make one. */
1371         class_export_get(exp);
1372         rc = obd_disconnect(exp);
1373         if (rc)
1374                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1375         else
1376                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1377                        exp, exp->exp_client_uuid.uuid);
1378         class_export_put(exp);
1379 }
1380 EXPORT_SYMBOL(class_fail_export);
1381
1382 char *obd_export_nid2str(struct obd_export *exp)
1383 {
1384         if (exp->exp_connection != NULL)
1385                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1386
1387         return "(no nid)";
1388 }
1389 EXPORT_SYMBOL(obd_export_nid2str);
1390
1391 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1392 {
1393         cfs_hash_t *nid_hash;
1394         struct obd_export *doomed_exp = NULL;
1395         int exports_evicted = 0;
1396
1397         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1398
1399         spin_lock(&obd->obd_dev_lock);
1400         /* umount has run already, so evict thread should leave
1401          * its task to umount thread now */
1402         if (obd->obd_stopping) {
1403                 spin_unlock(&obd->obd_dev_lock);
1404                 return exports_evicted;
1405         }
1406         nid_hash = obd->obd_nid_hash;
1407         cfs_hash_getref(nid_hash);
1408         spin_unlock(&obd->obd_dev_lock);
1409
1410         do {
1411                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1412                 if (doomed_exp == NULL)
1413                         break;
1414
1415                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1416                          "nid %s found, wanted nid %s, requested nid %s\n",
1417                          obd_export_nid2str(doomed_exp),
1418                          libcfs_nid2str(nid_key), nid);
1419                 LASSERTF(doomed_exp != obd->obd_self_export,
1420                          "self-export is hashed by NID?\n");
1421                 exports_evicted++;
1422                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1423                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1424                        exports_evicted);
1425                 class_fail_export(doomed_exp);
1426                 class_export_put(doomed_exp);
1427         } while (1);
1428
1429         cfs_hash_putref(nid_hash);
1430
1431         if (!exports_evicted)
1432                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1433                        obd->obd_name, nid);
1434         return exports_evicted;
1435 }
1436 EXPORT_SYMBOL(obd_export_evict_by_nid);
1437
1438 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1439 {
1440         cfs_hash_t *uuid_hash;
1441         struct obd_export *doomed_exp = NULL;
1442         struct obd_uuid doomed_uuid;
1443         int exports_evicted = 0;
1444
1445         spin_lock(&obd->obd_dev_lock);
1446         if (obd->obd_stopping) {
1447                 spin_unlock(&obd->obd_dev_lock);
1448                 return exports_evicted;
1449         }
1450         uuid_hash = obd->obd_uuid_hash;
1451         cfs_hash_getref(uuid_hash);
1452         spin_unlock(&obd->obd_dev_lock);
1453
1454         obd_str2uuid(&doomed_uuid, uuid);
1455         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1456                 CERROR("%s: can't evict myself\n", obd->obd_name);
1457                 cfs_hash_putref(uuid_hash);
1458                 return exports_evicted;
1459         }
1460
1461         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1462
1463         if (doomed_exp == NULL) {
1464                 CERROR("%s: can't disconnect %s: no exports found\n",
1465                        obd->obd_name, uuid);
1466         } else {
1467                 CWARN("%s: evicting %s at adminstrative request\n",
1468                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1469                 class_fail_export(doomed_exp);
1470                 class_export_put(doomed_exp);
1471                 exports_evicted++;
1472         }
1473         cfs_hash_putref(uuid_hash);
1474
1475         return exports_evicted;
1476 }
1477 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1478
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback invoked by print_export_data() for each export when lock
 * dumping is requested.  NULL until something installs a hook.
 */
void (*class_export_dump_hook)(struct obd_export *exp) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1483
1484 static void print_export_data(struct obd_export *exp, const char *status,
1485                               int locks)
1486 {
1487         struct ptlrpc_reply_state *rs;
1488         struct ptlrpc_reply_state *first_reply = NULL;
1489         int nreplies = 0;
1490
1491         spin_lock(&exp->exp_lock);
1492         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1493                                 rs_exp_list) {
1494                 if (nreplies == 0)
1495                         first_reply = rs;
1496                 nreplies++;
1497         }
1498         spin_unlock(&exp->exp_lock);
1499
1500         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1501                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1502                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1503                cfs_atomic_read(&exp->exp_rpc_count),
1504                cfs_atomic_read(&exp->exp_cb_count),
1505                cfs_atomic_read(&exp->exp_locks_count),
1506                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1507                nreplies, first_reply, nreplies > 3 ? "..." : "",
1508                exp->exp_last_committed);
1509 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1510         if (locks && class_export_dump_hook != NULL)
1511                 class_export_dump_hook(exp);
1512 #endif
1513 }
1514
1515 void dump_exports(struct obd_device *obd, int locks)
1516 {
1517         struct obd_export *exp;
1518
1519         spin_lock(&obd->obd_dev_lock);
1520         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1521                 print_export_data(exp, "ACTIVE", locks);
1522         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1523                 print_export_data(exp, "UNLINKED", locks);
1524         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1525                 print_export_data(exp, "DELAYED", locks);
1526         spin_unlock(&obd->obd_dev_lock);
1527         spin_lock(&obd_zombie_impexp_lock);
1528         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1529                 print_export_data(exp, "ZOMBIE", locks);
1530         spin_unlock(&obd_zombie_impexp_lock);
1531 }
1532 EXPORT_SYMBOL(dump_exports);
1533
1534 void obd_exports_barrier(struct obd_device *obd)
1535 {
1536         int waited = 2;
1537         LASSERT(cfs_list_empty(&obd->obd_exports));
1538         spin_lock(&obd->obd_dev_lock);
1539         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1540                 spin_unlock(&obd->obd_dev_lock);
1541                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1542                                                    cfs_time_seconds(waited));
1543                 if (waited > 5 && IS_PO2(waited)) {
1544                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1545                                       "more than %d seconds. "
1546                                       "The obd refcount = %d. Is it stuck?\n",
1547                                       obd->obd_name, waited,
1548                                       cfs_atomic_read(&obd->obd_refcount));
1549                         dump_exports(obd, 1);
1550                 }
1551                 waited *= 2;
1552                 spin_lock(&obd->obd_dev_lock);
1553         }
1554         spin_unlock(&obd->obd_dev_lock);
1555 }
1556 EXPORT_SYMBOL(obd_exports_barrier);
1557
/* Number of zombie imports and exports queued and not yet destroyed;
 * modified under obd_zombie_impexp_lock.  Starts at zero. */
static int zombies_count;
1560
1561 /**
1562  * kill zombie imports and exports
1563  */
1564 void obd_zombie_impexp_cull(void)
1565 {
1566         struct obd_import *import;
1567         struct obd_export *export;
1568         ENTRY;
1569
1570         do {
1571                 spin_lock(&obd_zombie_impexp_lock);
1572
1573                 import = NULL;
1574                 if (!cfs_list_empty(&obd_zombie_imports)) {
1575                         import = cfs_list_entry(obd_zombie_imports.next,
1576                                                 struct obd_import,
1577                                                 imp_zombie_chain);
1578                         cfs_list_del_init(&import->imp_zombie_chain);
1579                 }
1580
1581                 export = NULL;
1582                 if (!cfs_list_empty(&obd_zombie_exports)) {
1583                         export = cfs_list_entry(obd_zombie_exports.next,
1584                                                 struct obd_export,
1585                                                 exp_obd_chain);
1586                         cfs_list_del_init(&export->exp_obd_chain);
1587                 }
1588
1589                 spin_unlock(&obd_zombie_impexp_lock);
1590
1591                 if (import != NULL) {
1592                         class_import_destroy(import);
1593                         spin_lock(&obd_zombie_impexp_lock);
1594                         zombies_count--;
1595                         spin_unlock(&obd_zombie_impexp_lock);
1596                 }
1597
1598                 if (export != NULL) {
1599                         class_export_destroy(export);
1600                         spin_lock(&obd_zombie_impexp_lock);
1601                         zombies_count--;
1602                         spin_unlock(&obd_zombie_impexp_lock);
1603                 }
1604
1605                 cfs_cond_resched();
1606         } while (import != NULL || export != NULL);
1607         EXIT;
1608 }
1609
1610 static struct completion        obd_zombie_start;
1611 static struct completion        obd_zombie_stop;
1612 static unsigned long            obd_zombie_flags;
1613 static cfs_waitq_t              obd_zombie_waitq;
1614 static pid_t                    obd_zombie_pid;
1615
1616 enum {
1617         OBD_ZOMBIE_STOP         = 0x0001,
1618 };
1619
1620 /**
1621  * check for work for kill zombie import/export thread.
1622  */
1623 static int obd_zombie_impexp_check(void *arg)
1624 {
1625         int rc;
1626
1627         spin_lock(&obd_zombie_impexp_lock);
1628         rc = (zombies_count == 0) &&
1629              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1630         spin_unlock(&obd_zombie_impexp_lock);
1631
1632         RETURN(rc);
1633 }
1634
1635 /**
1636  * Add export to the obd_zombe thread and notify it.
1637  */
1638 static void obd_zombie_export_add(struct obd_export *exp) {
1639         spin_lock(&exp->exp_obd->obd_dev_lock);
1640         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1641         cfs_list_del_init(&exp->exp_obd_chain);
1642         spin_unlock(&exp->exp_obd->obd_dev_lock);
1643         spin_lock(&obd_zombie_impexp_lock);
1644         zombies_count++;
1645         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1646         spin_unlock(&obd_zombie_impexp_lock);
1647
1648         obd_zombie_impexp_notify();
1649 }
1650
1651 /**
1652  * Add import to the obd_zombe thread and notify it.
1653  */
1654 static void obd_zombie_import_add(struct obd_import *imp) {
1655         LASSERT(imp->imp_sec == NULL);
1656         LASSERT(imp->imp_rq_pool == NULL);
1657         spin_lock(&obd_zombie_impexp_lock);
1658         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1659         zombies_count++;
1660         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1661         spin_unlock(&obd_zombie_impexp_lock);
1662
1663         obd_zombie_impexp_notify();
1664 }
1665
1666 /**
1667  * notify import/export destroy thread about new zombie.
1668  */
1669 static void obd_zombie_impexp_notify(void)
1670 {
1671         /*
1672          * Make sure obd_zomebie_impexp_thread get this notification.
1673          * It is possible this signal only get by obd_zombie_barrier, and
1674          * barrier gulps this notification and sleeps away and hangs ensues
1675          */
1676         cfs_waitq_broadcast(&obd_zombie_waitq);
1677 }
1678
1679 /**
1680  * check whether obd_zombie is idle
1681  */
1682 static int obd_zombie_is_idle(void)
1683 {
1684         int rc;
1685
1686         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1687         spin_lock(&obd_zombie_impexp_lock);
1688         rc = (zombies_count == 0);
1689         spin_unlock(&obd_zombie_impexp_lock);
1690         return rc;
1691 }
1692
1693 /**
1694  * wait when obd_zombie import/export queues become empty
1695  */
1696 void obd_zombie_barrier(void)
1697 {
1698         struct l_wait_info lwi = { 0 };
1699
1700         if (obd_zombie_pid == cfs_curproc_pid())
1701                 /* don't wait for myself */
1702                 return;
1703         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1704 }
1705 EXPORT_SYMBOL(obd_zombie_barrier);
1706
1707 #ifdef __KERNEL__
1708
1709 /**
1710  * destroy zombie export/import thread.
1711  */
1712 static int obd_zombie_impexp_thread(void *unused)
1713 {
1714         int rc;
1715
1716         rc = cfs_daemonize_ctxt("obd_zombid");
1717         if (rc != 0) {
1718                 complete(&obd_zombie_start);
1719                 RETURN(rc);
1720         }
1721
1722         complete(&obd_zombie_start);
1723
1724         obd_zombie_pid = cfs_curproc_pid();
1725
1726         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1727                 struct l_wait_info lwi = { 0 };
1728
1729                 l_wait_event(obd_zombie_waitq,
1730                              !obd_zombie_impexp_check(NULL), &lwi);
1731                 obd_zombie_impexp_cull();
1732
1733                 /*
1734                  * Notify obd_zombie_barrier callers that queues
1735                  * may be empty.
1736                  */
1737                 cfs_waitq_signal(&obd_zombie_waitq);
1738         }
1739
1740         complete(&obd_zombie_stop);
1741
1742         RETURN(0);
1743 }
1744
1745 #else /* ! KERNEL */
1746
1747 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1748 static void *obd_zombie_impexp_work_cb;
1749 static void *obd_zombie_impexp_idle_cb;
1750
1751 int obd_zombie_impexp_kill(void *arg)
1752 {
1753         int rc = 0;
1754
1755         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1756                 obd_zombie_impexp_cull();
1757                 rc = 1;
1758         }
1759         cfs_atomic_dec(&zombie_recur);
1760         return rc;
1761 }
1762
1763 #endif
1764
1765 /**
1766  * start destroy zombie import/export thread
1767  */
1768 int obd_zombie_impexp_init(void)
1769 {
1770         int rc;
1771
1772         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1773         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1774         spin_lock_init(&obd_zombie_impexp_lock);
1775         init_completion(&obd_zombie_start);
1776         init_completion(&obd_zombie_stop);
1777         cfs_waitq_init(&obd_zombie_waitq);
1778         obd_zombie_pid = 0;
1779
1780 #ifdef __KERNEL__
1781         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1782         if (rc < 0)
1783                 RETURN(rc);
1784
1785         wait_for_completion(&obd_zombie_start);
1786 #else
1787
1788         obd_zombie_impexp_work_cb =
1789                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1790                                                  &obd_zombie_impexp_kill, NULL);
1791
1792         obd_zombie_impexp_idle_cb =
1793                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1794                                                  &obd_zombie_impexp_check, NULL);
1795         rc = 0;
1796 #endif
1797         RETURN(rc);
1798 }
1799 /**
1800  * stop destroy zombie import/export thread
1801  */
1802 void obd_zombie_impexp_stop(void)
1803 {
1804         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1805         obd_zombie_impexp_notify();
1806 #ifdef __KERNEL__
1807         wait_for_completion(&obd_zombie_stop);
1808 #else
1809         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1810         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1811 #endif
1812 }
1813
1814 /***** Kernel-userspace comm helpers *******/
1815
1816 /* Get length of entire message, including header */
1817 int kuc_len(int payload_len)
1818 {
1819         return sizeof(struct kuc_hdr) + payload_len;
1820 }
1821 EXPORT_SYMBOL(kuc_len);
1822
1823 /* Get a pointer to kuc header, given a ptr to the payload
1824  * @param p Pointer to payload area
1825  * @returns Pointer to kuc header
1826  */
1827 struct kuc_hdr * kuc_ptr(void *p)
1828 {
1829         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1830         LASSERT(lh->kuc_magic == KUC_MAGIC);
1831         return lh;
1832 }
1833 EXPORT_SYMBOL(kuc_ptr);
1834
1835 /* Test if payload is part of kuc message
1836  * @param p Pointer to payload area
1837  * @returns boolean
1838  */
1839 int kuc_ispayload(void *p)
1840 {
1841         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1842
1843         if (kh->kuc_magic == KUC_MAGIC)
1844                 return 1;
1845         else
1846                 return 0;
1847 }
1848 EXPORT_SYMBOL(kuc_ispayload);
1849
1850 /* Alloc space for a message, and fill in header
1851  * @return Pointer to payload area
1852  */
1853 void *kuc_alloc(int payload_len, int transport, int type)
1854 {
1855         struct kuc_hdr *lh;
1856         int len = kuc_len(payload_len);
1857
1858         OBD_ALLOC(lh, len);
1859         if (lh == NULL)
1860                 return ERR_PTR(-ENOMEM);
1861
1862         lh->kuc_magic = KUC_MAGIC;
1863         lh->kuc_transport = transport;
1864         lh->kuc_msgtype = type;
1865         lh->kuc_msglen = len;
1866
1867         return (void *)(lh + 1);
1868 }
1869 EXPORT_SYMBOL(kuc_alloc);
1870
/* Free a message obtained from kuc_alloc().
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload size that was passed to kuc_alloc()
 *
 * Deliberately not declared "inline": the function is exported, and
 * under C99/C11 semantics a file-scope inline definition without
 * "extern" does not provide the external symbol that other modules
 * link against.
 */
void kuc_free(void *p, int payload_len)
{
        /* kuc_ptr() asserts the header magic before we free anything */
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1878
1879
1880