Whamcloud - gitweb
a48f887e7ca6fbb31ced57a8ee435592960503ed
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/kthread.h>
41 #include <obd_class.h>
42 #include <lprocfs_status.h>
43 #include <lustre_disk.h>
44 #include <lustre_kernelcomm.h>
45
/* Held by class_search_type() while walking the obd_types list, and by the
 * type register/unregister paths while linking/unlinking an entry. */
spinlock_t obd_types_lock;

/* Slab caches; created in obd_init_caches() and destroyed in
 * obd_cleanup_caches(). */
static struct kmem_cache *obd_device_cachep;
struct kmem_cache *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
static struct kmem_cache *import_cachep;

/* Zombie import/export bookkeeping.  class_export_put() hands an export to
 * obd_zombie_export_add() on its final put; the consumers of these lists
 * are outside this part of the file. */
static struct list_head obd_zombie_imports;
static struct list_head obd_zombie_exports;
static spinlock_t  obd_zombie_impexp_lock;

/* Forward declarations for helpers defined later in the file. */
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* Stale export tracking (list, its lock and an element counter).
 * NOTE(review): presumably linked through obd_export::exp_stale_list --
 * confirm against the users further down the file. */
struct list_head obd_stale_exports;
spinlock_t       obd_stale_export_lock;
atomic_t         obd_stale_export_num;

/* Hook used by class_export_destroy() to drop an export's connection.
 * It is only declared and exported here; the assignment happens elsewhere. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         struct list_head *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         list_for_each(tmp, &obd_types) {
106                 type = list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
161 #define CLASS_MAX_NAME 1024
162
163 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
164                         bool enable_proc, struct lprocfs_vars *vars,
165                         const char *name, struct lu_device_type *ldt)
166 {
167         struct obd_type *type;
168         int rc = 0;
169         ENTRY;
170
171         /* sanity check */
172         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
173
174         if (class_search_type(name)) {
175                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
176                 RETURN(-EEXIST);
177         }
178
179         rc = -ENOMEM;
180         OBD_ALLOC(type, sizeof(*type));
181         if (type == NULL)
182                 RETURN(rc);
183
184         OBD_ALLOC_PTR(type->typ_dt_ops);
185         OBD_ALLOC_PTR(type->typ_md_ops);
186         OBD_ALLOC(type->typ_name, strlen(name) + 1);
187
188         if (type->typ_dt_ops == NULL ||
189             type->typ_md_ops == NULL ||
190             type->typ_name == NULL)
191                 GOTO (failed, rc);
192
193         *(type->typ_dt_ops) = *dt_ops;
194         /* md_ops is optional */
195         if (md_ops)
196                 *(type->typ_md_ops) = *md_ops;
197         strcpy(type->typ_name, name);
198         spin_lock_init(&type->obd_type_lock);
199
200 #ifdef CONFIG_PROC_FS
201         if (enable_proc) {
202                 type->typ_procroot = lprocfs_register(type->typ_name,
203                                                       proc_lustre_root,
204                                                       vars, type);
205                 if (IS_ERR(type->typ_procroot)) {
206                         rc = PTR_ERR(type->typ_procroot);
207                         type->typ_procroot = NULL;
208                         GOTO(failed, rc);
209                 }
210         }
211 #endif
212         if (ldt != NULL) {
213                 type->typ_lu = ldt;
214                 rc = lu_device_type_init(ldt);
215                 if (rc != 0)
216                         GOTO (failed, rc);
217         }
218
219         spin_lock(&obd_types_lock);
220         list_add(&type->typ_chain, &obd_types);
221         spin_unlock(&obd_types_lock);
222
223         RETURN (0);
224
225 failed:
226         if (type->typ_name != NULL) {
227 #ifdef CONFIG_PROC_FS
228                 if (type->typ_procroot != NULL)
229                         remove_proc_subtree(type->typ_name, proc_lustre_root);
230 #endif
231                 OBD_FREE(type->typ_name, strlen(name) + 1);
232         }
233         if (type->typ_md_ops != NULL)
234                 OBD_FREE_PTR(type->typ_md_ops);
235         if (type->typ_dt_ops != NULL)
236                 OBD_FREE_PTR(type->typ_dt_ops);
237         OBD_FREE(type, sizeof(*type));
238         RETURN(rc);
239 }
240 EXPORT_SYMBOL(class_register_type);
241
242 int class_unregister_type(const char *name)
243 {
244         struct obd_type *type = class_search_type(name);
245         ENTRY;
246
247         if (!type) {
248                 CERROR("unknown obd type\n");
249                 RETURN(-EINVAL);
250         }
251
252         if (type->typ_refcnt) {
253                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
254                 /* This is a bad situation, let's make the best of it */
255                 /* Remove ops, but leave the name for debugging */
256                 OBD_FREE_PTR(type->typ_dt_ops);
257                 OBD_FREE_PTR(type->typ_md_ops);
258                 RETURN(-EBUSY);
259         }
260
261         /* we do not use type->typ_procroot as for compatibility purposes
262          * other modules can share names (i.e. lod can use lov entry). so
263          * we can't reference pointer as it can get invalided when another
264          * module removes the entry */
265 #ifdef CONFIG_PROC_FS
266         if (type->typ_procroot != NULL)
267                 remove_proc_subtree(type->typ_name, proc_lustre_root);
268         if (type->typ_procsym != NULL)
269                 lprocfs_remove(&type->typ_procsym);
270 #endif
271         if (type->typ_lu)
272                 lu_device_type_fini(type->typ_lu);
273
274         spin_lock(&obd_types_lock);
275         list_del(&type->typ_chain);
276         spin_unlock(&obd_types_lock);
277         OBD_FREE(type->typ_name, strlen(name) + 1);
278         if (type->typ_dt_ops != NULL)
279                 OBD_FREE_PTR(type->typ_dt_ops);
280         if (type->typ_md_ops != NULL)
281                 OBD_FREE_PTR(type->typ_md_ops);
282         OBD_FREE(type, sizeof(*type));
283         RETURN(0);
284 } /* class_unregister_type */
285 EXPORT_SYMBOL(class_unregister_type);
286
287 /**
288  * Create a new obd device.
289  *
290  * Find an empty slot in ::obd_devs[], create a new obd device in it.
291  *
292  * \param[in] type_name obd device type string.
293  * \param[in] name      obd device name.
294  *
295  * \retval NULL if create fails, otherwise return the obd device
296  *         pointer created.
297  */
298 struct obd_device *class_newdev(const char *type_name, const char *name)
299 {
300         struct obd_device *result = NULL;
301         struct obd_device *newdev;
302         struct obd_type *type = NULL;
303         int i;
304         int new_obd_minor = 0;
305         ENTRY;
306
307         if (strlen(name) >= MAX_OBD_NAME) {
308                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
309                 RETURN(ERR_PTR(-EINVAL));
310         }
311
312         type = class_get_type(type_name);
313         if (type == NULL){
314                 CERROR("OBD: unknown type: %s\n", type_name);
315                 RETURN(ERR_PTR(-ENODEV));
316         }
317
318         newdev = obd_device_alloc();
319         if (newdev == NULL)
320                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
321
322         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
323
324         write_lock(&obd_dev_lock);
325         for (i = 0; i < class_devno_max(); i++) {
326                 struct obd_device *obd = class_num2obd(i);
327
328                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
329                         CERROR("Device %s already exists at %d, won't add\n",
330                                name, i);
331                         if (result) {
332                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
333                                          "%p obd_magic %08x != %08x\n", result,
334                                          result->obd_magic, OBD_DEVICE_MAGIC);
335                                 LASSERTF(result->obd_minor == new_obd_minor,
336                                          "%p obd_minor %d != %d\n", result,
337                                          result->obd_minor, new_obd_minor);
338
339                                 obd_devs[result->obd_minor] = NULL;
340                                 result->obd_name[0]='\0';
341                          }
342                         result = ERR_PTR(-EEXIST);
343                         break;
344                 }
345                 if (!result && !obd) {
346                         result = newdev;
347                         result->obd_minor = i;
348                         new_obd_minor = i;
349                         result->obd_type = type;
350                         strncpy(result->obd_name, name,
351                                 sizeof(result->obd_name) - 1);
352                         obd_devs[i] = result;
353                 }
354         }
355         write_unlock(&obd_dev_lock);
356
357         if (result == NULL && i >= class_devno_max()) {
358                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
359                        class_devno_max());
360                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
361         }
362
363         if (IS_ERR(result))
364                 GOTO(out, result);
365
366         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
367                result->obd_name, result);
368
369         RETURN(result);
370 out:
371         obd_device_free(newdev);
372 out_type:
373         class_put_type(type);
374         return result;
375 }
376
377 void class_release_dev(struct obd_device *obd)
378 {
379         struct obd_type *obd_type = obd->obd_type;
380
381         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
382                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
383         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
384                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
385         LASSERT(obd_type != NULL);
386
387         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
388                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
389
390         write_lock(&obd_dev_lock);
391         obd_devs[obd->obd_minor] = NULL;
392         write_unlock(&obd_dev_lock);
393         obd_device_free(obd);
394
395         class_put_type(obd_type);
396 }
397
398 int class_name2dev(const char *name)
399 {
400         int i;
401
402         if (!name)
403                 return -1;
404
405         read_lock(&obd_dev_lock);
406         for (i = 0; i < class_devno_max(); i++) {
407                 struct obd_device *obd = class_num2obd(i);
408
409                 if (obd && strcmp(name, obd->obd_name) == 0) {
410                         /* Make sure we finished attaching before we give
411                            out any references */
412                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
413                         if (obd->obd_attached) {
414                                 read_unlock(&obd_dev_lock);
415                                 return i;
416                         }
417                         break;
418                 }
419         }
420         read_unlock(&obd_dev_lock);
421
422         return -1;
423 }
424
425 struct obd_device *class_name2obd(const char *name)
426 {
427         int dev = class_name2dev(name);
428
429         if (dev < 0 || dev > class_devno_max())
430                 return NULL;
431         return class_num2obd(dev);
432 }
433 EXPORT_SYMBOL(class_name2obd);
434
435 int class_uuid2dev(struct obd_uuid *uuid)
436 {
437         int i;
438
439         read_lock(&obd_dev_lock);
440         for (i = 0; i < class_devno_max(); i++) {
441                 struct obd_device *obd = class_num2obd(i);
442
443                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
444                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
445                         read_unlock(&obd_dev_lock);
446                         return i;
447                 }
448         }
449         read_unlock(&obd_dev_lock);
450
451         return -1;
452 }
453
454 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
455 {
456         int dev = class_uuid2dev(uuid);
457         if (dev < 0)
458                 return NULL;
459         return class_num2obd(dev);
460 }
461 EXPORT_SYMBOL(class_uuid2obd);
462
463 /**
464  * Get obd device from ::obd_devs[]
465  *
466  * \param num [in] array index
467  *
468  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
469  *         otherwise return the obd device there.
470  */
471 struct obd_device *class_num2obd(int num)
472 {
473         struct obd_device *obd = NULL;
474
475         if (num < class_devno_max()) {
476                 obd = obd_devs[num];
477                 if (obd == NULL)
478                         return NULL;
479
480                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
481                          "%p obd_magic %08x != %08x\n",
482                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
483                 LASSERTF(obd->obd_minor == num,
484                          "%p obd_minor %0d != %0d\n",
485                          obd, obd->obd_minor, num);
486         }
487
488         return obd;
489 }
490
491 /**
492  * Get obd devices count. Device in any
493  *    state are counted
494  * \retval obd device count
495  */
496 int get_devices_count(void)
497 {
498         int index, max_index = class_devno_max(), dev_count = 0;
499
500         read_lock(&obd_dev_lock);
501         for (index = 0; index <= max_index; index++) {
502                 struct obd_device *obd = class_num2obd(index);
503                 if (obd != NULL)
504                         dev_count++;
505         }
506         read_unlock(&obd_dev_lock);
507
508         return dev_count;
509 }
510 EXPORT_SYMBOL(get_devices_count);
511
512 void class_obd_list(void)
513 {
514         char *status;
515         int i;
516
517         read_lock(&obd_dev_lock);
518         for (i = 0; i < class_devno_max(); i++) {
519                 struct obd_device *obd = class_num2obd(i);
520
521                 if (obd == NULL)
522                         continue;
523                 if (obd->obd_stopping)
524                         status = "ST";
525                 else if (obd->obd_set_up)
526                         status = "UP";
527                 else if (obd->obd_attached)
528                         status = "AT";
529                 else
530                         status = "--";
531                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
532                          i, status, obd->obd_type->typ_name,
533                          obd->obd_name, obd->obd_uuid.uuid,
534                          atomic_read(&obd->obd_refcount));
535         }
536         read_unlock(&obd_dev_lock);
537         return;
538 }
539
540 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
541    specified, then only the client with that uuid is returned,
542    otherwise any client connected to the tgt is returned. */
543 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
544                                           const char * typ_name,
545                                           struct obd_uuid *grp_uuid)
546 {
547         int i;
548
549         read_lock(&obd_dev_lock);
550         for (i = 0; i < class_devno_max(); i++) {
551                 struct obd_device *obd = class_num2obd(i);
552
553                 if (obd == NULL)
554                         continue;
555                 if ((strncmp(obd->obd_type->typ_name, typ_name,
556                              strlen(typ_name)) == 0)) {
557                         if (obd_uuid_equals(tgt_uuid,
558                                             &obd->u.cli.cl_target_uuid) &&
559                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
560                                                          &obd->obd_uuid) : 1)) {
561                                 read_unlock(&obd_dev_lock);
562                                 return obd;
563                         }
564                 }
565         }
566         read_unlock(&obd_dev_lock);
567
568         return NULL;
569 }
570 EXPORT_SYMBOL(class_find_client_obd);
571
572 /* Iterate the obd_device list looking devices have grp_uuid. Start
573    searching at *next, and if a device is found, the next index to look
574    at is saved in *next. If next is NULL, then the first matching device
575    will always be returned. */
576 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
577 {
578         int i;
579
580         if (next == NULL)
581                 i = 0;
582         else if (*next >= 0 && *next < class_devno_max())
583                 i = *next;
584         else
585                 return NULL;
586
587         read_lock(&obd_dev_lock);
588         for (; i < class_devno_max(); i++) {
589                 struct obd_device *obd = class_num2obd(i);
590
591                 if (obd == NULL)
592                         continue;
593                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
594                         if (next != NULL)
595                                 *next = i+1;
596                         read_unlock(&obd_dev_lock);
597                         return obd;
598                 }
599         }
600         read_unlock(&obd_dev_lock);
601
602         return NULL;
603 }
604 EXPORT_SYMBOL(class_devices_in_group);
605
606 /**
607  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
608  * adjust sptlrpc settings accordingly.
609  */
610 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
611 {
612         struct obd_device  *obd;
613         const char         *type;
614         int                 i, rc = 0, rc2;
615
616         LASSERT(namelen > 0);
617
618         read_lock(&obd_dev_lock);
619         for (i = 0; i < class_devno_max(); i++) {
620                 obd = class_num2obd(i);
621
622                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
623                         continue;
624
625                 /* only notify mdc, osc, osp, lwp, mdt, ost
626                  * because only these have a -sptlrpc llog */
627                 type = obd->obd_type->typ_name;
628                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
629                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
630                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
631                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
632                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
633                     strcmp(type, LUSTRE_OST_NAME) != 0)
634                         continue;
635
636                 if (strncmp(obd->obd_name, fsname, namelen))
637                         continue;
638
639                 class_incref(obd, __FUNCTION__, obd);
640                 read_unlock(&obd_dev_lock);
641                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
642                                          sizeof(KEY_SPTLRPC_CONF),
643                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
644                 rc = rc ? rc : rc2;
645                 class_decref(obd, __FUNCTION__, obd);
646                 read_lock(&obd_dev_lock);
647         }
648         read_unlock(&obd_dev_lock);
649         return rc;
650 }
651 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
652
653 void obd_cleanup_caches(void)
654 {
655         ENTRY;
656         if (obd_device_cachep) {
657                 kmem_cache_destroy(obd_device_cachep);
658                 obd_device_cachep = NULL;
659         }
660         if (obdo_cachep) {
661                 kmem_cache_destroy(obdo_cachep);
662                 obdo_cachep = NULL;
663         }
664         if (import_cachep) {
665                 kmem_cache_destroy(import_cachep);
666                 import_cachep = NULL;
667         }
668
669         EXIT;
670 }
671
672 int obd_init_caches(void)
673 {
674         int rc;
675         ENTRY;
676
677         LASSERT(obd_device_cachep == NULL);
678         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
679                                               sizeof(struct obd_device),
680                                               0, 0, NULL);
681         if (!obd_device_cachep)
682                 GOTO(out, rc = -ENOMEM);
683
684         LASSERT(obdo_cachep == NULL);
685         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
686                                         0, 0, NULL);
687         if (!obdo_cachep)
688                 GOTO(out, rc = -ENOMEM);
689
690         LASSERT(import_cachep == NULL);
691         import_cachep = kmem_cache_create("ll_import_cache",
692                                           sizeof(struct obd_import),
693                                           0, 0, NULL);
694         if (!import_cachep)
695                 GOTO(out, rc = -ENOMEM);
696
697         RETURN(0);
698 out:
699         obd_cleanup_caches();
700         RETURN(rc);
701 }
702
703 /* map connection to client */
704 struct obd_export *class_conn2export(struct lustre_handle *conn)
705 {
706         struct obd_export *export;
707         ENTRY;
708
709         if (!conn) {
710                 CDEBUG(D_CACHE, "looking for null handle\n");
711                 RETURN(NULL);
712         }
713
714         if (conn->cookie == -1) {  /* this means assign a new connection */
715                 CDEBUG(D_CACHE, "want a new connection\n");
716                 RETURN(NULL);
717         }
718
719         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
720         export = class_handle2object(conn->cookie, NULL);
721         RETURN(export);
722 }
723 EXPORT_SYMBOL(class_conn2export);
724
725 struct obd_device *class_exp2obd(struct obd_export *exp)
726 {
727         if (exp)
728                 return exp->exp_obd;
729         return NULL;
730 }
731 EXPORT_SYMBOL(class_exp2obd);
732
733 struct obd_device *class_conn2obd(struct lustre_handle *conn)
734 {
735         struct obd_export *export;
736         export = class_conn2export(conn);
737         if (export) {
738                 struct obd_device *obd = export->exp_obd;
739                 class_export_put(export);
740                 return obd;
741         }
742         return NULL;
743 }
744
745 struct obd_import *class_exp2cliimp(struct obd_export *exp)
746 {
747         struct obd_device *obd = exp->exp_obd;
748         if (obd == NULL)
749                 return NULL;
750         return obd->u.cli.cl_import;
751 }
752 EXPORT_SYMBOL(class_exp2cliimp);
753
754 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
755 {
756         struct obd_device *obd = class_conn2obd(conn);
757         if (obd == NULL)
758                 return NULL;
759         return obd->u.cli.cl_import;
760 }
761
762 /* Export management functions */
763 static void class_export_destroy(struct obd_export *exp)
764 {
765         struct obd_device *obd = exp->exp_obd;
766         ENTRY;
767
768         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
769         LASSERT(obd != NULL);
770
771         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
772                exp->exp_client_uuid.uuid, obd->obd_name);
773
774         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
775         if (exp->exp_connection)
776                 ptlrpc_put_connection_superhack(exp->exp_connection);
777
778         LASSERT(list_empty(&exp->exp_outstanding_replies));
779         LASSERT(list_empty(&exp->exp_uncommitted_replies));
780         LASSERT(list_empty(&exp->exp_req_replay_queue));
781         LASSERT(list_empty(&exp->exp_hp_rpcs));
782         obd_destroy_export(exp);
783         class_decref(obd, "export", exp);
784
785         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
786         EXIT;
787 }
788
789 static void export_handle_addref(void *export)
790 {
791         class_export_get(export);
792 }
793
794 static struct portals_handle_ops export_handle_ops = {
795         .hop_addref = export_handle_addref,
796         .hop_free   = NULL,
797 };
798
799 struct obd_export *class_export_get(struct obd_export *exp)
800 {
801         atomic_inc(&exp->exp_refcount);
802         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
803                atomic_read(&exp->exp_refcount));
804         return exp;
805 }
806 EXPORT_SYMBOL(class_export_get);
807
808 void class_export_put(struct obd_export *exp)
809 {
810         LASSERT(exp != NULL);
811         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
812         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
813                atomic_read(&exp->exp_refcount) - 1);
814
815         if (atomic_dec_and_test(&exp->exp_refcount)) {
816                 LASSERT(!list_empty(&exp->exp_obd_chain));
817                 LASSERT(list_empty(&exp->exp_stale_list));
818                 CDEBUG(D_IOCTL, "final put %p/%s\n",
819                        exp, exp->exp_client_uuid.uuid);
820
821                 /* release nid stat refererence */
822                 lprocfs_exp_cleanup(exp);
823
824                 obd_zombie_export_add(exp);
825         }
826 }
827 EXPORT_SYMBOL(class_export_put);
828
829 /* Creates a new export, adds it to the hash table, and returns a
830  * pointer to it. The refcount is 2: one for the hash reference, and
831  * one for the pointer returned by this function. */
832 struct obd_export *class_new_export(struct obd_device *obd,
833                                     struct obd_uuid *cluuid)
834 {
835         struct obd_export *export;
836         struct cfs_hash *hash = NULL;
837         int rc = 0;
838         ENTRY;
839
840         OBD_ALLOC_PTR(export);
841         if (!export)
842                 return ERR_PTR(-ENOMEM);
843
844         export->exp_conn_cnt = 0;
845         export->exp_lock_hash = NULL;
846         export->exp_flock_hash = NULL;
847         atomic_set(&export->exp_refcount, 2);
848         atomic_set(&export->exp_rpc_count, 0);
849         atomic_set(&export->exp_cb_count, 0);
850         atomic_set(&export->exp_locks_count, 0);
851 #if LUSTRE_TRACKS_LOCK_EXP_REFS
852         INIT_LIST_HEAD(&export->exp_locks_list);
853         spin_lock_init(&export->exp_locks_list_guard);
854 #endif
855         atomic_set(&export->exp_replay_count, 0);
856         export->exp_obd = obd;
857         INIT_LIST_HEAD(&export->exp_outstanding_replies);
858         spin_lock_init(&export->exp_uncommitted_replies_lock);
859         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
860         INIT_LIST_HEAD(&export->exp_req_replay_queue);
861         INIT_LIST_HEAD(&export->exp_handle.h_link);
862         INIT_LIST_HEAD(&export->exp_hp_rpcs);
863         INIT_LIST_HEAD(&export->exp_reg_rpcs);
864         class_handle_hash(&export->exp_handle, &export_handle_ops);
865         export->exp_last_request_time = cfs_time_current_sec();
866         spin_lock_init(&export->exp_lock);
867         spin_lock_init(&export->exp_rpc_lock);
868         INIT_HLIST_NODE(&export->exp_uuid_hash);
869         INIT_HLIST_NODE(&export->exp_nid_hash);
870         INIT_HLIST_NODE(&export->exp_gen_hash);
871         spin_lock_init(&export->exp_bl_list_lock);
872         INIT_LIST_HEAD(&export->exp_bl_list);
873         INIT_LIST_HEAD(&export->exp_stale_list);
874
875         export->exp_sp_peer = LUSTRE_SP_ANY;
876         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
877         export->exp_client_uuid = *cluuid;
878         obd_init_export(export);
879
880         spin_lock(&obd->obd_dev_lock);
881         /* shouldn't happen, but might race */
882         if (obd->obd_stopping)
883                 GOTO(exit_unlock, rc = -ENODEV);
884
885         hash = cfs_hash_getref(obd->obd_uuid_hash);
886         if (hash == NULL)
887                 GOTO(exit_unlock, rc = -ENODEV);
888         spin_unlock(&obd->obd_dev_lock);
889
890         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
891                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
892                 if (rc != 0) {
893                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
894                                       obd->obd_name, cluuid->uuid, rc);
895                         GOTO(exit_err, rc = -EALREADY);
896                 }
897         }
898
899         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
900         spin_lock(&obd->obd_dev_lock);
901         if (obd->obd_stopping) {
902                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
903                 GOTO(exit_unlock, rc = -ENODEV);
904         }
905
906         class_incref(obd, "export", export);
907         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
908         list_add_tail(&export->exp_obd_chain_timed,
909                       &export->exp_obd->obd_exports_timed);
910         export->exp_obd->obd_num_exports++;
911         spin_unlock(&obd->obd_dev_lock);
912         cfs_hash_putref(hash);
913         RETURN(export);
914
915 exit_unlock:
916         spin_unlock(&obd->obd_dev_lock);
917 exit_err:
918         if (hash)
919                 cfs_hash_putref(hash);
920         class_handle_unhash(&export->exp_handle);
921         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
922         obd_destroy_export(export);
923         OBD_FREE_PTR(export);
924         return ERR_PTR(rc);
925 }
926 EXPORT_SYMBOL(class_new_export);
927
928 void class_unlink_export(struct obd_export *exp)
929 {
930         class_handle_unhash(&exp->exp_handle);
931
932         spin_lock(&exp->exp_obd->obd_dev_lock);
933         /* delete an uuid-export hashitem from hashtables */
934         if (!hlist_unhashed(&exp->exp_uuid_hash))
935                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
936                              &exp->exp_client_uuid,
937                              &exp->exp_uuid_hash);
938
939         if (!hlist_unhashed(&exp->exp_gen_hash)) {
940                 struct tg_export_data   *ted = &exp->exp_target_data;
941                 struct cfs_hash         *hash;
942
943                 /* Because obd_gen_hash will not be released until
944                  * class_cleanup(), so hash should never be NULL here */
945                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
946                 LASSERT(hash != NULL);
947                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
948                              &exp->exp_gen_hash);
949                 cfs_hash_putref(hash);
950         }
951
952         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
953         list_del_init(&exp->exp_obd_chain_timed);
954         exp->exp_obd->obd_num_exports--;
955         spin_unlock(&exp->exp_obd->obd_dev_lock);
956         atomic_inc(&obd_stale_export_num);
957
958         /* A reference is kept by obd_stale_exports list */
959         obd_stale_export_put(exp);
960 }
961 EXPORT_SYMBOL(class_unlink_export);
962
963 /* Import management functions */
964 static void class_import_destroy(struct obd_import *imp)
965 {
966         ENTRY;
967
968         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
969                 imp->imp_obd->obd_name);
970
971         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
972
973         ptlrpc_put_connection_superhack(imp->imp_connection);
974
975         while (!list_empty(&imp->imp_conn_list)) {
976                 struct obd_import_conn *imp_conn;
977
978                 imp_conn = list_entry(imp->imp_conn_list.next,
979                                       struct obd_import_conn, oic_item);
980                 list_del_init(&imp_conn->oic_item);
981                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
982                 OBD_FREE(imp_conn, sizeof(*imp_conn));
983         }
984
985         LASSERT(imp->imp_sec == NULL);
986         class_decref(imp->imp_obd, "import", imp);
987         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
988         EXIT;
989 }
990
/* hop_addref callback for import handles: takes one reference on the
 * import passed in as an untyped pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
995
996 static struct portals_handle_ops import_handle_ops = {
997         .hop_addref = import_handle_addref,
998         .hop_free   = NULL,
999 };
1000
1001 struct obd_import *class_import_get(struct obd_import *import)
1002 {
1003         atomic_inc(&import->imp_refcount);
1004         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1005                atomic_read(&import->imp_refcount),
1006                import->imp_obd->obd_name);
1007         return import;
1008 }
1009 EXPORT_SYMBOL(class_import_get);
1010
1011 void class_import_put(struct obd_import *imp)
1012 {
1013         ENTRY;
1014
1015         LASSERT(list_empty(&imp->imp_zombie_chain));
1016         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1017
1018         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1019                atomic_read(&imp->imp_refcount) - 1,
1020                imp->imp_obd->obd_name);
1021
1022         if (atomic_dec_and_test(&imp->imp_refcount)) {
1023                 CDEBUG(D_INFO, "final put import %p\n", imp);
1024                 obd_zombie_import_add(imp);
1025         }
1026
1027         /* catch possible import put race */
1028         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1029         EXIT;
1030 }
1031 EXPORT_SYMBOL(class_import_put);
1032
1033 static void init_imp_at(struct imp_at *at) {
1034         int i;
1035         at_init(&at->iat_net_latency, 0, 0);
1036         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1037                 /* max service estimates are tracked on the server side, so
1038                    don't use the AT history here, just use the last reported
1039                    val. (But keep hist for proc histogram, worst_ever) */
1040                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1041                         AT_FLG_NOHIST);
1042         }
1043 }
1044
1045 struct obd_import *class_new_import(struct obd_device *obd)
1046 {
1047         struct obd_import *imp;
1048
1049         OBD_ALLOC(imp, sizeof(*imp));
1050         if (imp == NULL)
1051                 return NULL;
1052
1053         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1054         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1055         INIT_LIST_HEAD(&imp->imp_replay_list);
1056         INIT_LIST_HEAD(&imp->imp_sending_list);
1057         INIT_LIST_HEAD(&imp->imp_delayed_list);
1058         INIT_LIST_HEAD(&imp->imp_committed_list);
1059         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1060         imp->imp_known_replied_xid = 0;
1061         imp->imp_replay_cursor = &imp->imp_committed_list;
1062         spin_lock_init(&imp->imp_lock);
1063         imp->imp_last_success_conn = 0;
1064         imp->imp_state = LUSTRE_IMP_NEW;
1065         imp->imp_obd = class_incref(obd, "import", imp);
1066         mutex_init(&imp->imp_sec_mutex);
1067         init_waitqueue_head(&imp->imp_recovery_waitq);
1068
1069         atomic_set(&imp->imp_refcount, 2);
1070         atomic_set(&imp->imp_unregistering, 0);
1071         atomic_set(&imp->imp_inflight, 0);
1072         atomic_set(&imp->imp_replay_inflight, 0);
1073         atomic_set(&imp->imp_inval_count, 0);
1074         INIT_LIST_HEAD(&imp->imp_conn_list);
1075         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1076         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1077         init_imp_at(&imp->imp_at);
1078
1079         /* the default magic is V2, will be used in connect RPC, and
1080          * then adjusted according to the flags in request/reply. */
1081         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1082
1083         return imp;
1084 }
1085 EXPORT_SYMBOL(class_new_import);
1086
1087 void class_destroy_import(struct obd_import *import)
1088 {
1089         LASSERT(import != NULL);
1090         LASSERT(import != LP_POISON);
1091
1092         class_handle_unhash(&import->imp_handle);
1093
1094         spin_lock(&import->imp_lock);
1095         import->imp_generation++;
1096         spin_unlock(&import->imp_lock);
1097         class_import_put(import);
1098 }
1099 EXPORT_SYMBOL(class_destroy_import);
1100
1101 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1102
1103 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1104 {
1105         spin_lock(&exp->exp_locks_list_guard);
1106
1107         LASSERT(lock->l_exp_refs_nr >= 0);
1108
1109         if (lock->l_exp_refs_target != NULL &&
1110             lock->l_exp_refs_target != exp) {
1111                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1112                               exp, lock, lock->l_exp_refs_target);
1113         }
1114         if ((lock->l_exp_refs_nr ++) == 0) {
1115                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1116                 lock->l_exp_refs_target = exp;
1117         }
1118         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1119                lock, exp, lock->l_exp_refs_nr);
1120         spin_unlock(&exp->exp_locks_list_guard);
1121 }
1122 EXPORT_SYMBOL(__class_export_add_lock_ref);
1123
1124 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1125 {
1126         spin_lock(&exp->exp_locks_list_guard);
1127         LASSERT(lock->l_exp_refs_nr > 0);
1128         if (lock->l_exp_refs_target != exp) {
1129                 LCONSOLE_WARN("lock %p, "
1130                               "mismatching export pointers: %p, %p\n",
1131                               lock, lock->l_exp_refs_target, exp);
1132         }
1133         if (-- lock->l_exp_refs_nr == 0) {
1134                 list_del_init(&lock->l_exp_refs_link);
1135                 lock->l_exp_refs_target = NULL;
1136         }
1137         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1138                lock, exp, lock->l_exp_refs_nr);
1139         spin_unlock(&exp->exp_locks_list_guard);
1140 }
1141 EXPORT_SYMBOL(__class_export_del_lock_ref);
1142 #endif
1143
1144 /* A connection defines an export context in which preallocation can
1145    be managed. This releases the export pointer reference, and returns
1146    the export handle, so the export refcount is 1 when this function
1147    returns. */
1148 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1149                   struct obd_uuid *cluuid)
1150 {
1151         struct obd_export *export;
1152         LASSERT(conn != NULL);
1153         LASSERT(obd != NULL);
1154         LASSERT(cluuid != NULL);
1155         ENTRY;
1156
1157         export = class_new_export(obd, cluuid);
1158         if (IS_ERR(export))
1159                 RETURN(PTR_ERR(export));
1160
1161         conn->cookie = export->exp_handle.h_cookie;
1162         class_export_put(export);
1163
1164         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1165                cluuid->uuid, conn->cookie);
1166         RETURN(0);
1167 }
1168 EXPORT_SYMBOL(class_connect);
1169
1170 /* if export is involved in recovery then clean up related things */
1171 static void class_export_recovery_cleanup(struct obd_export *exp)
1172 {
1173         struct obd_device *obd = exp->exp_obd;
1174
1175         spin_lock(&obd->obd_recovery_task_lock);
1176         if (obd->obd_recovering) {
1177                 if (exp->exp_in_recovery) {
1178                         spin_lock(&exp->exp_lock);
1179                         exp->exp_in_recovery = 0;
1180                         spin_unlock(&exp->exp_lock);
1181                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1182                         atomic_dec(&obd->obd_connected_clients);
1183                 }
1184
1185                 /* if called during recovery then should update
1186                  * obd_stale_clients counter,
1187                  * lightweight exports are not counted */
1188                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1189                         exp->exp_obd->obd_stale_clients++;
1190         }
1191         spin_unlock(&obd->obd_recovery_task_lock);
1192
1193         spin_lock(&exp->exp_lock);
1194         /** Cleanup req replay fields */
1195         if (exp->exp_req_replay_needed) {
1196                 exp->exp_req_replay_needed = 0;
1197
1198                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1199                 atomic_dec(&obd->obd_req_replay_clients);
1200         }
1201
1202         /** Cleanup lock replay data */
1203         if (exp->exp_lock_replay_needed) {
1204                 exp->exp_lock_replay_needed = 0;
1205
1206                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1207                 atomic_dec(&obd->obd_lock_replay_clients);
1208         }
1209         spin_unlock(&exp->exp_lock);
1210 }
1211
1212 /* This function removes 1-3 references from the export:
1213  * 1 - for export pointer passed
1214  * and if disconnect really need
1215  * 2 - removing from hash
1216  * 3 - in client_unlink_export
1217  * The export pointer passed to this function can destroyed */
1218 int class_disconnect(struct obd_export *export)
1219 {
1220         int already_disconnected;
1221         ENTRY;
1222
1223         if (export == NULL) {
1224                 CWARN("attempting to free NULL export %p\n", export);
1225                 RETURN(-EINVAL);
1226         }
1227
1228         spin_lock(&export->exp_lock);
1229         already_disconnected = export->exp_disconnected;
1230         export->exp_disconnected = 1;
1231         spin_unlock(&export->exp_lock);
1232
1233         /* class_cleanup(), abort_recovery(), and class_fail_export()
1234          * all end up in here, and if any of them race we shouldn't
1235          * call extra class_export_puts(). */
1236         if (already_disconnected) {
1237                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1238                 GOTO(no_disconn, already_disconnected);
1239         }
1240
1241         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1242                export->exp_handle.h_cookie);
1243
1244         if (!hlist_unhashed(&export->exp_nid_hash))
1245                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1246                              &export->exp_connection->c_peer.nid,
1247                              &export->exp_nid_hash);
1248
1249         class_export_recovery_cleanup(export);
1250         class_unlink_export(export);
1251 no_disconn:
1252         class_export_put(export);
1253         RETURN(0);
1254 }
1255 EXPORT_SYMBOL(class_disconnect);
1256
1257 /* Return non-zero for a fully connected export */
1258 int class_connected_export(struct obd_export *exp)
1259 {
1260         int connected = 0;
1261
1262         if (exp) {
1263                 spin_lock(&exp->exp_lock);
1264                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1265                 spin_unlock(&exp->exp_lock);
1266         }
1267         return connected;
1268 }
1269 EXPORT_SYMBOL(class_connected_export);
1270
1271 static void class_disconnect_export_list(struct list_head *list,
1272                                          enum obd_option flags)
1273 {
1274         int rc;
1275         struct obd_export *exp;
1276         ENTRY;
1277
1278         /* It's possible that an export may disconnect itself, but
1279          * nothing else will be added to this list. */
1280         while (!list_empty(list)) {
1281                 exp = list_entry(list->next, struct obd_export,
1282                                  exp_obd_chain);
1283                 /* need for safe call CDEBUG after obd_disconnect */
1284                 class_export_get(exp);
1285
1286                 spin_lock(&exp->exp_lock);
1287                 exp->exp_flags = flags;
1288                 spin_unlock(&exp->exp_lock);
1289
1290                 if (obd_uuid_equals(&exp->exp_client_uuid,
1291                                     &exp->exp_obd->obd_uuid)) {
1292                         CDEBUG(D_HA,
1293                                "exp %p export uuid == obd uuid, don't discon\n",
1294                                exp);
1295                         /* Need to delete this now so we don't end up pointing
1296                          * to work_list later when this export is cleaned up. */
1297                         list_del_init(&exp->exp_obd_chain);
1298                         class_export_put(exp);
1299                         continue;
1300                 }
1301
1302                 class_export_get(exp);
1303                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1304                        "last request at "CFS_TIME_T"\n",
1305                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1306                        exp, exp->exp_last_request_time);
1307                 /* release one export reference anyway */
1308                 rc = obd_disconnect(exp);
1309
1310                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1311                        obd_export_nid2str(exp), exp, rc);
1312                 class_export_put(exp);
1313         }
1314         EXIT;
1315 }
1316
1317 void class_disconnect_exports(struct obd_device *obd)
1318 {
1319         struct list_head work_list;
1320         ENTRY;
1321
1322         /* Move all of the exports from obd_exports to a work list, en masse. */
1323         INIT_LIST_HEAD(&work_list);
1324         spin_lock(&obd->obd_dev_lock);
1325         list_splice_init(&obd->obd_exports, &work_list);
1326         list_splice_init(&obd->obd_delayed_exports, &work_list);
1327         spin_unlock(&obd->obd_dev_lock);
1328
1329         if (!list_empty(&work_list)) {
1330                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1331                        "disconnecting them\n", obd->obd_minor, obd);
1332                 class_disconnect_export_list(&work_list,
1333                                              exp_flags_from_obd(obd));
1334         } else
1335                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1336                        obd->obd_minor, obd);
1337         EXIT;
1338 }
1339 EXPORT_SYMBOL(class_disconnect_exports);
1340
1341 /* Remove exports that have not completed recovery.
1342  */
1343 void class_disconnect_stale_exports(struct obd_device *obd,
1344                                     int (*test_export)(struct obd_export *))
1345 {
1346         struct list_head work_list;
1347         struct obd_export *exp, *n;
1348         int evicted = 0;
1349         ENTRY;
1350
1351         INIT_LIST_HEAD(&work_list);
1352         spin_lock(&obd->obd_dev_lock);
1353         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1354                                  exp_obd_chain) {
1355                 /* don't count self-export as client */
1356                 if (obd_uuid_equals(&exp->exp_client_uuid,
1357                                     &exp->exp_obd->obd_uuid))
1358                         continue;
1359
1360                 /* don't evict clients which have no slot in last_rcvd
1361                  * (e.g. lightweight connection) */
1362                 if (exp->exp_target_data.ted_lr_idx == -1)
1363                         continue;
1364
1365                 spin_lock(&exp->exp_lock);
1366                 if (exp->exp_failed || test_export(exp)) {
1367                         spin_unlock(&exp->exp_lock);
1368                         continue;
1369                 }
1370                 exp->exp_failed = 1;
1371                 spin_unlock(&exp->exp_lock);
1372
1373                 list_move(&exp->exp_obd_chain, &work_list);
1374                 evicted++;
1375                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1376                        obd->obd_name, exp->exp_client_uuid.uuid,
1377                        exp->exp_connection == NULL ? "<unknown>" :
1378                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1379                 print_export_data(exp, "EVICTING", 0, D_HA);
1380         }
1381         spin_unlock(&obd->obd_dev_lock);
1382
1383         if (evicted)
1384                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1385                               obd->obd_name, evicted);
1386
1387         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1388                                                  OBD_OPT_ABORT_RECOV);
1389         EXIT;
1390 }
1391 EXPORT_SYMBOL(class_disconnect_stale_exports);
1392
1393 void class_fail_export(struct obd_export *exp)
1394 {
1395         int rc, already_failed;
1396
1397         spin_lock(&exp->exp_lock);
1398         already_failed = exp->exp_failed;
1399         exp->exp_failed = 1;
1400         spin_unlock(&exp->exp_lock);
1401
1402         if (already_failed) {
1403                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1404                        exp, exp->exp_client_uuid.uuid);
1405                 return;
1406         }
1407
1408         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1409                exp, exp->exp_client_uuid.uuid);
1410
1411         if (obd_dump_on_timeout)
1412                 libcfs_debug_dumplog();
1413
1414         /* need for safe call CDEBUG after obd_disconnect */
1415         class_export_get(exp);
1416
1417         /* Most callers into obd_disconnect are removing their own reference
1418          * (request, for example) in addition to the one from the hash table.
1419          * We don't have such a reference here, so make one. */
1420         class_export_get(exp);
1421         rc = obd_disconnect(exp);
1422         if (rc)
1423                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1424         else
1425                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1426                        exp, exp->exp_client_uuid.uuid);
1427         class_export_put(exp);
1428 }
1429 EXPORT_SYMBOL(class_fail_export);
1430
1431 char *obd_export_nid2str(struct obd_export *exp)
1432 {
1433         if (exp->exp_connection != NULL)
1434                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1435
1436         return "(no nid)";
1437 }
1438 EXPORT_SYMBOL(obd_export_nid2str);
1439
1440 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1441 {
1442         struct cfs_hash *nid_hash;
1443         struct obd_export *doomed_exp = NULL;
1444         int exports_evicted = 0;
1445
1446         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1447
1448         spin_lock(&obd->obd_dev_lock);
1449         /* umount has run already, so evict thread should leave
1450          * its task to umount thread now */
1451         if (obd->obd_stopping) {
1452                 spin_unlock(&obd->obd_dev_lock);
1453                 return exports_evicted;
1454         }
1455         nid_hash = obd->obd_nid_hash;
1456         cfs_hash_getref(nid_hash);
1457         spin_unlock(&obd->obd_dev_lock);
1458
1459         do {
1460                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1461                 if (doomed_exp == NULL)
1462                         break;
1463
1464                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1465                          "nid %s found, wanted nid %s, requested nid %s\n",
1466                          obd_export_nid2str(doomed_exp),
1467                          libcfs_nid2str(nid_key), nid);
1468                 LASSERTF(doomed_exp != obd->obd_self_export,
1469                          "self-export is hashed by NID?\n");
1470                 exports_evicted++;
1471                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1472                               "request\n", obd->obd_name,
1473                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1474                               obd_export_nid2str(doomed_exp));
1475                 class_fail_export(doomed_exp);
1476                 class_export_put(doomed_exp);
1477         } while (1);
1478
1479         cfs_hash_putref(nid_hash);
1480
1481         if (!exports_evicted)
1482                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1483                        obd->obd_name, nid);
1484         return exports_evicted;
1485 }
1486 EXPORT_SYMBOL(obd_export_evict_by_nid);
1487
1488 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1489 {
1490         struct cfs_hash *uuid_hash;
1491         struct obd_export *doomed_exp = NULL;
1492         struct obd_uuid doomed_uuid;
1493         int exports_evicted = 0;
1494
1495         spin_lock(&obd->obd_dev_lock);
1496         if (obd->obd_stopping) {
1497                 spin_unlock(&obd->obd_dev_lock);
1498                 return exports_evicted;
1499         }
1500         uuid_hash = obd->obd_uuid_hash;
1501         cfs_hash_getref(uuid_hash);
1502         spin_unlock(&obd->obd_dev_lock);
1503
1504         obd_str2uuid(&doomed_uuid, uuid);
1505         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1506                 CERROR("%s: can't evict myself\n", obd->obd_name);
1507                 cfs_hash_putref(uuid_hash);
1508                 return exports_evicted;
1509         }
1510
1511         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1512
1513         if (doomed_exp == NULL) {
1514                 CERROR("%s: can't disconnect %s: no exports found\n",
1515                        obd->obd_name, uuid);
1516         } else {
1517                 CWARN("%s: evicting %s at adminstrative request\n",
1518                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1519                 class_fail_export(doomed_exp);
1520                 class_export_put(doomed_exp);
1521                 exports_evicted++;
1522         }
1523         cfs_hash_putref(uuid_hash);
1524
1525         return exports_evicted;
1526 }
1527
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback, NULL by default; when set, print_export_data() calls
 * it for an export if lock dumping was requested. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1532
1533 static void print_export_data(struct obd_export *exp, const char *status,
1534                               int locks, int debug_level)
1535 {
1536         struct ptlrpc_reply_state *rs;
1537         struct ptlrpc_reply_state *first_reply = NULL;
1538         int nreplies = 0;
1539
1540         spin_lock(&exp->exp_lock);
1541         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1542                             rs_exp_list) {
1543                 if (nreplies == 0)
1544                         first_reply = rs;
1545                 nreplies++;
1546         }
1547         spin_unlock(&exp->exp_lock);
1548
1549         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1550                "%p %s %llu stale:%d\n",
1551                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1552                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1553                atomic_read(&exp->exp_rpc_count),
1554                atomic_read(&exp->exp_cb_count),
1555                atomic_read(&exp->exp_locks_count),
1556                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1557                nreplies, first_reply, nreplies > 3 ? "..." : "",
1558                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1559 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1560         if (locks && class_export_dump_hook != NULL)
1561                 class_export_dump_hook(exp);
1562 #endif
1563 }
1564
1565 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1566 {
1567         struct obd_export *exp;
1568
1569         spin_lock(&obd->obd_dev_lock);
1570         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1571                 print_export_data(exp, "ACTIVE", locks, debug_level);
1572         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1573                 print_export_data(exp, "UNLINKED", locks, debug_level);
1574         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1575                 print_export_data(exp, "DELAYED", locks, debug_level);
1576         spin_unlock(&obd->obd_dev_lock);
1577         spin_lock(&obd_zombie_impexp_lock);
1578         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1579                 print_export_data(exp, "ZOMBIE", locks, debug_level);
1580         spin_unlock(&obd_zombie_impexp_lock);
1581 }
1582
1583 void obd_exports_barrier(struct obd_device *obd)
1584 {
1585         int waited = 2;
1586         LASSERT(list_empty(&obd->obd_exports));
1587         spin_lock(&obd->obd_dev_lock);
1588         while (!list_empty(&obd->obd_unlinked_exports)) {
1589                 spin_unlock(&obd->obd_dev_lock);
1590                 set_current_state(TASK_UNINTERRUPTIBLE);
1591                 schedule_timeout(cfs_time_seconds(waited));
1592                 if (waited > 5 && IS_PO2(waited)) {
1593                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1594                                       "more than %d seconds. "
1595                                       "The obd refcount = %d. Is it stuck?\n",
1596                                       obd->obd_name, waited,
1597                                       atomic_read(&obd->obd_refcount));
1598                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1599                 }
1600                 waited *= 2;
1601                 spin_lock(&obd->obd_dev_lock);
1602         }
1603         spin_unlock(&obd->obd_dev_lock);
1604 }
1605 EXPORT_SYMBOL(obd_exports_barrier);
1606
/* Number of zombie imports and exports still waiting to be destroyed;
 * modified under obd_zombie_impexp_lock.  Starts at zero like any static. */
static int zombies_count;
1609
1610 /**
1611  * kill zombie imports and exports
1612  */
1613 void obd_zombie_impexp_cull(void)
1614 {
1615         struct obd_import *import;
1616         struct obd_export *export;
1617         ENTRY;
1618
1619         do {
1620                 spin_lock(&obd_zombie_impexp_lock);
1621
1622                 import = NULL;
1623                 if (!list_empty(&obd_zombie_imports)) {
1624                         import = list_entry(obd_zombie_imports.next,
1625                                             struct obd_import,
1626                                             imp_zombie_chain);
1627                         list_del_init(&import->imp_zombie_chain);
1628                 }
1629
1630                 export = NULL;
1631                 if (!list_empty(&obd_zombie_exports)) {
1632                         export = list_entry(obd_zombie_exports.next,
1633                                             struct obd_export,
1634                                             exp_obd_chain);
1635                         list_del_init(&export->exp_obd_chain);
1636                 }
1637
1638                 spin_unlock(&obd_zombie_impexp_lock);
1639
1640                 if (import != NULL) {
1641                         class_import_destroy(import);
1642                         spin_lock(&obd_zombie_impexp_lock);
1643                         zombies_count--;
1644                         spin_unlock(&obd_zombie_impexp_lock);
1645                 }
1646
1647                 if (export != NULL) {
1648                         class_export_destroy(export);
1649                         spin_lock(&obd_zombie_impexp_lock);
1650                         zombies_count--;
1651                         spin_unlock(&obd_zombie_impexp_lock);
1652                 }
1653
1654                 cond_resched();
1655         } while (import != NULL || export != NULL);
1656         EXIT;
1657 }
1658
1659 static struct completion        obd_zombie_start;
1660 static struct completion        obd_zombie_stop;
1661 static unsigned long            obd_zombie_flags;
1662 static wait_queue_head_t        obd_zombie_waitq;
1663 static pid_t                    obd_zombie_pid;
1664
1665 enum {
1666         OBD_ZOMBIE_STOP         = 0x0001,
1667 };
1668
1669 /**
1670  * check for work for kill zombie import/export thread.
1671  */
1672 static int obd_zombie_impexp_check(void *arg)
1673 {
1674         int rc;
1675
1676         spin_lock(&obd_zombie_impexp_lock);
1677         rc = (zombies_count == 0) &&
1678              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1679         spin_unlock(&obd_zombie_impexp_lock);
1680
1681         RETURN(rc);
1682 }
1683
1684 /**
1685  * Add export to the obd_zombe thread and notify it.
1686  */
1687 static void obd_zombie_export_add(struct obd_export *exp) {
1688         atomic_dec(&obd_stale_export_num);
1689         spin_lock(&exp->exp_obd->obd_dev_lock);
1690         LASSERT(!list_empty(&exp->exp_obd_chain));
1691         list_del_init(&exp->exp_obd_chain);
1692         spin_unlock(&exp->exp_obd->obd_dev_lock);
1693         spin_lock(&obd_zombie_impexp_lock);
1694         zombies_count++;
1695         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1696         spin_unlock(&obd_zombie_impexp_lock);
1697
1698         obd_zombie_impexp_notify();
1699 }
1700
1701 /**
1702  * Add import to the obd_zombe thread and notify it.
1703  */
1704 static void obd_zombie_import_add(struct obd_import *imp) {
1705         LASSERT(imp->imp_sec == NULL);
1706         spin_lock(&obd_zombie_impexp_lock);
1707         LASSERT(list_empty(&imp->imp_zombie_chain));
1708         zombies_count++;
1709         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1710         spin_unlock(&obd_zombie_impexp_lock);
1711
1712         obd_zombie_impexp_notify();
1713 }
1714
1715 /**
1716  * notify import/export destroy thread about new zombie.
1717  */
1718 static void obd_zombie_impexp_notify(void)
1719 {
1720         /*
1721          * Make sure obd_zomebie_impexp_thread get this notification.
1722          * It is possible this signal only get by obd_zombie_barrier, and
1723          * barrier gulps this notification and sleeps away and hangs ensues
1724          */
1725         wake_up_all(&obd_zombie_waitq);
1726 }
1727
1728 /**
1729  * check whether obd_zombie is idle
1730  */
1731 static int obd_zombie_is_idle(void)
1732 {
1733         int rc;
1734
1735         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1736         spin_lock(&obd_zombie_impexp_lock);
1737         rc = (zombies_count == 0);
1738         spin_unlock(&obd_zombie_impexp_lock);
1739         return rc;
1740 }
1741
1742 /**
1743  * wait when obd_zombie import/export queues become empty
1744  */
1745 void obd_zombie_barrier(void)
1746 {
1747         struct l_wait_info lwi = { 0 };
1748
1749         if (obd_zombie_pid == current_pid())
1750                 /* don't wait for myself */
1751                 return;
1752         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1753 }
1754 EXPORT_SYMBOL(obd_zombie_barrier);
1755
1756
1757 struct obd_export *obd_stale_export_get(void)
1758 {
1759         struct obd_export *exp = NULL;
1760         ENTRY;
1761
1762         spin_lock(&obd_stale_export_lock);
1763         if (!list_empty(&obd_stale_exports)) {
1764                 exp = list_entry(obd_stale_exports.next,
1765                                  struct obd_export, exp_stale_list);
1766                 list_del_init(&exp->exp_stale_list);
1767         }
1768         spin_unlock(&obd_stale_export_lock);
1769
1770         if (exp) {
1771                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1772                        atomic_read(&obd_stale_export_num));
1773         }
1774         RETURN(exp);
1775 }
1776 EXPORT_SYMBOL(obd_stale_export_get);
1777
1778 void obd_stale_export_put(struct obd_export *exp)
1779 {
1780         ENTRY;
1781
1782         LASSERT(list_empty(&exp->exp_stale_list));
1783         if (exp->exp_lock_hash &&
1784             atomic_read(&exp->exp_lock_hash->hs_count)) {
1785                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1786                        atomic_read(&obd_stale_export_num));
1787
1788                 spin_lock_bh(&exp->exp_bl_list_lock);
1789                 spin_lock(&obd_stale_export_lock);
1790                 /* Add to the tail if there is no blocked locks,
1791                  * to the head otherwise. */
1792                 if (list_empty(&exp->exp_bl_list))
1793                         list_add_tail(&exp->exp_stale_list,
1794                                       &obd_stale_exports);
1795                 else
1796                         list_add(&exp->exp_stale_list,
1797                                  &obd_stale_exports);
1798
1799                 spin_unlock(&obd_stale_export_lock);
1800                 spin_unlock_bh(&exp->exp_bl_list_lock);
1801         } else {
1802                 class_export_put(exp);
1803         }
1804         EXIT;
1805 }
1806 EXPORT_SYMBOL(obd_stale_export_put);
1807
1808 /**
1809  * Adjust the position of the export in the stale list,
1810  * i.e. move to the head of the list if is needed.
1811  **/
1812 void obd_stale_export_adjust(struct obd_export *exp)
1813 {
1814         LASSERT(exp != NULL);
1815         spin_lock_bh(&exp->exp_bl_list_lock);
1816         spin_lock(&obd_stale_export_lock);
1817
1818         if (!list_empty(&exp->exp_stale_list) &&
1819             !list_empty(&exp->exp_bl_list))
1820                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1821
1822         spin_unlock(&obd_stale_export_lock);
1823         spin_unlock_bh(&exp->exp_bl_list_lock);
1824 }
1825 EXPORT_SYMBOL(obd_stale_export_adjust);
1826
1827 /**
1828  * destroy zombie export/import thread.
1829  */
1830 static int obd_zombie_impexp_thread(void *unused)
1831 {
1832         unshare_fs_struct();
1833         complete(&obd_zombie_start);
1834
1835         obd_zombie_pid = current_pid();
1836
1837         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1838                 struct l_wait_info lwi = { 0 };
1839
1840                 l_wait_event(obd_zombie_waitq,
1841                              !obd_zombie_impexp_check(NULL), &lwi);
1842                 obd_zombie_impexp_cull();
1843
1844                 /*
1845                  * Notify obd_zombie_barrier callers that queues
1846                  * may be empty.
1847                  */
1848                 wake_up(&obd_zombie_waitq);
1849         }
1850
1851         complete(&obd_zombie_stop);
1852
1853         RETURN(0);
1854 }
1855
1856
1857 /**
1858  * start destroy zombie import/export thread
1859  */
1860 int obd_zombie_impexp_init(void)
1861 {
1862         struct task_struct *task;
1863
1864         INIT_LIST_HEAD(&obd_zombie_imports);
1865
1866         INIT_LIST_HEAD(&obd_zombie_exports);
1867         spin_lock_init(&obd_zombie_impexp_lock);
1868         init_completion(&obd_zombie_start);
1869         init_completion(&obd_zombie_stop);
1870         init_waitqueue_head(&obd_zombie_waitq);
1871         obd_zombie_pid = 0;
1872
1873         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1874         if (IS_ERR(task))
1875                 RETURN(PTR_ERR(task));
1876
1877         wait_for_completion(&obd_zombie_start);
1878         RETURN(0);
1879 }
1880 /**
1881  * stop destroy zombie import/export thread
1882  */
1883 void obd_zombie_impexp_stop(void)
1884 {
1885         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1886         obd_zombie_impexp_notify();
1887         wait_for_completion(&obd_zombie_stop);
1888 }
1889
1890 /***** Kernel-userspace comm helpers *******/
1891
1892 /* Get length of entire message, including header */
1893 int kuc_len(int payload_len)
1894 {
1895         return sizeof(struct kuc_hdr) + payload_len;
1896 }
1897 EXPORT_SYMBOL(kuc_len);
1898
1899 /* Get a pointer to kuc header, given a ptr to the payload
1900  * @param p Pointer to payload area
1901  * @returns Pointer to kuc header
1902  */
1903 struct kuc_hdr * kuc_ptr(void *p)
1904 {
1905         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1906         LASSERT(lh->kuc_magic == KUC_MAGIC);
1907         return lh;
1908 }
1909 EXPORT_SYMBOL(kuc_ptr);
1910
1911 /* Alloc space for a message, and fill in header
1912  * @return Pointer to payload area
1913  */
1914 void *kuc_alloc(int payload_len, int transport, int type)
1915 {
1916         struct kuc_hdr *lh;
1917         int len = kuc_len(payload_len);
1918
1919         OBD_ALLOC(lh, len);
1920         if (lh == NULL)
1921                 return ERR_PTR(-ENOMEM);
1922
1923         lh->kuc_magic = KUC_MAGIC;
1924         lh->kuc_transport = transport;
1925         lh->kuc_msgtype = type;
1926         lh->kuc_msglen = len;
1927
1928         return (void *)(lh + 1);
1929 }
1930 EXPORT_SYMBOL(kuc_alloc);
1931
/* Free a message obtained from kuc_alloc().
 * @param p           Pointer to the payload area (not to the header)
 * @param payload_len Payload length, used to compute the size to free
 *
 * Deliberately not declared "inline": the function is exported, so an
 * external definition must be emitted in this translation unit.  With C99
 * inline semantics a plain "inline" definition does not provide one.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1939
1940 struct obd_request_slot_waiter {
1941         struct list_head        orsw_entry;
1942         wait_queue_head_t       orsw_waitq;
1943         bool                    orsw_signaled;
1944 };
1945
1946 static bool obd_request_slot_avail(struct client_obd *cli,
1947                                    struct obd_request_slot_waiter *orsw)
1948 {
1949         bool avail;
1950
1951         spin_lock(&cli->cl_loi_list_lock);
1952         avail = !!list_empty(&orsw->orsw_entry);
1953         spin_unlock(&cli->cl_loi_list_lock);
1954
1955         return avail;
1956 };
1957
1958 /*
1959  * For network flow control, the RPC sponsor needs to acquire a credit
1960  * before sending the RPC. The credits count for a connection is defined
1961  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1962  * the subsequent RPC sponsors need to wait until others released their
1963  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1964  */
1965 int obd_get_request_slot(struct client_obd *cli)
1966 {
1967         struct obd_request_slot_waiter   orsw;
1968         struct l_wait_info               lwi;
1969         int                              rc;
1970
1971         spin_lock(&cli->cl_loi_list_lock);
1972         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1973                 cli->cl_r_in_flight++;
1974                 spin_unlock(&cli->cl_loi_list_lock);
1975                 return 0;
1976         }
1977
1978         init_waitqueue_head(&orsw.orsw_waitq);
1979         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1980         orsw.orsw_signaled = false;
1981         spin_unlock(&cli->cl_loi_list_lock);
1982
1983         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1984         rc = l_wait_event(orsw.orsw_waitq,
1985                           obd_request_slot_avail(cli, &orsw) ||
1986                           orsw.orsw_signaled,
1987                           &lwi);
1988
1989         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1990          * freed but other (such as obd_put_request_slot) is using it. */
1991         spin_lock(&cli->cl_loi_list_lock);
1992         if (rc != 0) {
1993                 if (!orsw.orsw_signaled) {
1994                         if (list_empty(&orsw.orsw_entry))
1995                                 cli->cl_r_in_flight--;
1996                         else
1997                                 list_del(&orsw.orsw_entry);
1998                 }
1999         }
2000
2001         if (orsw.orsw_signaled) {
2002                 LASSERT(list_empty(&orsw.orsw_entry));
2003
2004                 rc = -EINTR;
2005         }
2006         spin_unlock(&cli->cl_loi_list_lock);
2007
2008         return rc;
2009 }
2010 EXPORT_SYMBOL(obd_get_request_slot);
2011
2012 void obd_put_request_slot(struct client_obd *cli)
2013 {
2014         struct obd_request_slot_waiter *orsw;
2015
2016         spin_lock(&cli->cl_loi_list_lock);
2017         cli->cl_r_in_flight--;
2018
2019         /* If there is free slot, wakeup the first waiter. */
2020         if (!list_empty(&cli->cl_loi_read_list) &&
2021             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2022                 orsw = list_entry(cli->cl_loi_read_list.next,
2023                                   struct obd_request_slot_waiter, orsw_entry);
2024                 list_del_init(&orsw->orsw_entry);
2025                 cli->cl_r_in_flight++;
2026                 wake_up(&orsw->orsw_waitq);
2027         }
2028         spin_unlock(&cli->cl_loi_list_lock);
2029 }
2030 EXPORT_SYMBOL(obd_put_request_slot);
2031
2032 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2033 {
2034         return cli->cl_max_rpcs_in_flight;
2035 }
2036 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2037
2038 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2039 {
2040         struct obd_request_slot_waiter *orsw;
2041         __u32                           old;
2042         int                             diff;
2043         int                             i;
2044         char                            *typ_name;
2045         int                             rc;
2046
2047         if (max > OBD_MAX_RIF_MAX || max < 1)
2048                 return -ERANGE;
2049
2050         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2051         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2052                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2053                  * strictly lower that max_rpcs_in_flight */
2054                 if (max < 2) {
2055                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2056                                "because it must be higher than "
2057                                "max_mod_rpcs_in_flight value",
2058                                cli->cl_import->imp_obd->obd_name);
2059                         return -ERANGE;
2060                 }
2061                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2062                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2063                         if (rc != 0)
2064                                 return rc;
2065                 }
2066         }
2067
2068         spin_lock(&cli->cl_loi_list_lock);
2069         old = cli->cl_max_rpcs_in_flight;
2070         cli->cl_max_rpcs_in_flight = max;
2071         diff = max - old;
2072
2073         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2074         for (i = 0; i < diff; i++) {
2075                 if (list_empty(&cli->cl_loi_read_list))
2076                         break;
2077
2078                 orsw = list_entry(cli->cl_loi_read_list.next,
2079                                   struct obd_request_slot_waiter, orsw_entry);
2080                 list_del_init(&orsw->orsw_entry);
2081                 cli->cl_r_in_flight++;
2082                 wake_up(&orsw->orsw_waitq);
2083         }
2084         spin_unlock(&cli->cl_loi_list_lock);
2085
2086         return 0;
2087 }
2088 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2089
2090 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2091 {
2092         return cli->cl_max_mod_rpcs_in_flight;
2093 }
2094 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2095
2096 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2097 {
2098         struct obd_connect_data *ocd;
2099         __u16 maxmodrpcs;
2100         __u16 prev;
2101
2102         if (max > OBD_MAX_RIF_MAX || max < 1)
2103                 return -ERANGE;
2104
2105         /* cannot exceed or equal max_rpcs_in_flight */
2106         if (max >= cli->cl_max_rpcs_in_flight) {
2107                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2108                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2109                        cli->cl_import->imp_obd->obd_name,
2110                        max, cli->cl_max_rpcs_in_flight);
2111                 return -ERANGE;
2112         }
2113
2114         /* cannot exceed max modify RPCs in flight supported by the server */
2115         ocd = &cli->cl_import->imp_connect_data;
2116         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2117                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2118         else
2119                 maxmodrpcs = 1;
2120         if (max > maxmodrpcs) {
2121                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2122                        "higher than max_mod_rpcs_per_client value (%hu) "
2123                        "returned by the server at connection\n",
2124                        cli->cl_import->imp_obd->obd_name,
2125                        max, maxmodrpcs);
2126                 return -ERANGE;
2127         }
2128
2129         spin_lock(&cli->cl_mod_rpcs_lock);
2130
2131         prev = cli->cl_max_mod_rpcs_in_flight;
2132         cli->cl_max_mod_rpcs_in_flight = max;
2133
2134         /* wakeup waiters if limit has been increased */
2135         if (cli->cl_max_mod_rpcs_in_flight > prev)
2136                 wake_up(&cli->cl_mod_rpcs_waitq);
2137
2138         spin_unlock(&cli->cl_mod_rpcs_lock);
2139
2140         return 0;
2141 }
2142 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2143
2144
2145 #define pct(a, b) (b ? a * 100 / b : 0)
2146 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2147                                struct seq_file *seq)
2148 {
2149         struct timeval now;
2150         unsigned long mod_tot = 0, mod_cum;
2151         int i;
2152
2153         do_gettimeofday(&now);
2154
2155         spin_lock(&cli->cl_mod_rpcs_lock);
2156
2157         seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
2158                    now.tv_sec, now.tv_usec);
2159         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2160                    cli->cl_mod_rpcs_in_flight);
2161
2162         seq_printf(seq, "\n\t\t\tmodify\n");
2163         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2164
2165         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2166
2167         mod_cum = 0;
2168         for (i = 0; i < OBD_HIST_MAX; i++) {
2169                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2170                 mod_cum += mod;
2171                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2172                            i, mod, pct(mod, mod_tot),
2173                            pct(mod_cum, mod_tot));
2174                 if (mod_cum == mod_tot)
2175                         break;
2176         }
2177
2178         spin_unlock(&cli->cl_mod_rpcs_lock);
2179
2180         return 0;
2181 }
2182 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2183 #undef pct
2184
2185
2186 /* The number of modify RPCs sent in parallel is limited
2187  * because the server has a finite number of slots per client to
2188  * store request result and ensure reply reconstruction when needed.
2189  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2190  * that takes into account server limit and cl_max_rpcs_in_flight
2191  * value.
2192  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2193  * one close request is allowed above the maximum.
2194  */
2195 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2196                                                  bool close_req)
2197 {
2198         bool avail;
2199
2200         /* A slot is available if
2201          * - number of modify RPCs in flight is less than the max
2202          * - it's a close RPC and no other close request is in flight
2203          */
2204         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2205                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2206
2207         return avail;
2208 }
2209
2210 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2211                                          bool close_req)
2212 {
2213         bool avail;
2214
2215         spin_lock(&cli->cl_mod_rpcs_lock);
2216         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2217         spin_unlock(&cli->cl_mod_rpcs_lock);
2218         return avail;
2219 }
2220
2221 /* Get a modify RPC slot from the obd client @cli according
2222  * to the kind of operation @opc that is going to be sent
2223  * and the intent @it of the operation if it applies.
2224  * If the maximum number of modify RPCs in flight is reached
2225  * the thread is put to sleep.
2226  * Returns the tag to be set in the request message. Tag 0
2227  * is reserved for non-modifying requests.
2228  */
2229 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2230                            struct lookup_intent *it)
2231 {
2232         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2233         bool                    close_req = false;
2234         __u16                   i, max;
2235
2236         /* read-only metadata RPCs don't consume a slot on MDT
2237          * for reply reconstruction
2238          */
2239         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2240                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2241                 return 0;
2242
2243         if (opc == MDS_CLOSE)
2244                 close_req = true;
2245
2246         do {
2247                 spin_lock(&cli->cl_mod_rpcs_lock);
2248                 max = cli->cl_max_mod_rpcs_in_flight;
2249                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2250                         /* there is a slot available */
2251                         cli->cl_mod_rpcs_in_flight++;
2252                         if (close_req)
2253                                 cli->cl_close_rpcs_in_flight++;
2254                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2255                                          cli->cl_mod_rpcs_in_flight);
2256                         /* find a free tag */
2257                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2258                                                 max + 1);
2259                         LASSERT(i < OBD_MAX_RIF_MAX);
2260                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2261                         spin_unlock(&cli->cl_mod_rpcs_lock);
2262                         /* tag 0 is reserved for non-modify RPCs */
2263                         return i + 1;
2264                 }
2265                 spin_unlock(&cli->cl_mod_rpcs_lock);
2266
2267                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2268                        "opc %u, max %hu\n",
2269                        cli->cl_import->imp_obd->obd_name, opc, max);
2270
2271                 l_wait_event(cli->cl_mod_rpcs_waitq,
2272                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2273         } while (true);
2274 }
2275 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2276
2277 /* Put a modify RPC slot from the obd client @cli according
2278  * to the kind of operation @opc that has been sent and the
2279  * intent @it of the operation if it applies.
2280  */
2281 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2282                           struct lookup_intent *it, __u16 tag)
2283 {
2284         bool                    close_req = false;
2285
2286         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2287                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2288                 return;
2289
2290         if (opc == MDS_CLOSE)
2291                 close_req = true;
2292
2293         spin_lock(&cli->cl_mod_rpcs_lock);
2294         cli->cl_mod_rpcs_in_flight--;
2295         if (close_req)
2296                 cli->cl_close_rpcs_in_flight--;
2297         /* release the tag in the bitmap */
2298         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2299         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2300         spin_unlock(&cli->cl_mod_rpcs_lock);
2301         wake_up(&cli->cl_mod_rpcs_waitq);
2302 }
2303 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2304