Whamcloud - gitweb
LU-7030 security: put imp_sec after all requests drained off
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_disk.h>
48 #include <lustre_kernelcomm.h>
49
50 spinlock_t obd_types_lock;
51
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
56
57 static struct list_head obd_zombie_imports;
58 static struct list_head obd_zombie_exports;
59 static spinlock_t  obd_zombie_impexp_lock;
60
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 struct list_head obd_stale_exports;
68 spinlock_t       obd_stale_export_lock;
69 atomic_t         obd_stale_export_num;
70
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
73 void (*sptlrpc_sec_put_superhack)(struct obd_import *imp);
74 EXPORT_SYMBOL(sptlrpc_sec_put_superhack);
75
76 /*
77  * support functions: we could use inter-module communication, but this
78  * is more portable to other OS's
79  */
80 static struct obd_device *obd_device_alloc(void)
81 {
82         struct obd_device *obd;
83
84         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
85         if (obd != NULL) {
86                 obd->obd_magic = OBD_DEVICE_MAGIC;
87         }
88         return obd;
89 }
90
91 static void obd_device_free(struct obd_device *obd)
92 {
93         LASSERT(obd != NULL);
94         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
95                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
96         if (obd->obd_namespace != NULL) {
97                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
98                        obd, obd->obd_namespace, obd->obd_force);
99                 LBUG();
100         }
101         lu_ref_fini(&obd->obd_reference);
102         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
103 }
104
105 struct obd_type *class_search_type(const char *name)
106 {
107         struct list_head *tmp;
108         struct obd_type *type;
109
110         spin_lock(&obd_types_lock);
111         list_for_each(tmp, &obd_types) {
112                 type = list_entry(tmp, struct obd_type, typ_chain);
113                 if (strcmp(type->typ_name, name) == 0) {
114                         spin_unlock(&obd_types_lock);
115                         return type;
116                 }
117         }
118         spin_unlock(&obd_types_lock);
119         return NULL;
120 }
121 EXPORT_SYMBOL(class_search_type);
122
123 struct obd_type *class_get_type(const char *name)
124 {
125         struct obd_type *type = class_search_type(name);
126
127 #ifdef HAVE_MODULE_LOADING_SUPPORT
128         if (!type) {
129                 const char *modname = name;
130
131                 if (strcmp(modname, "obdfilter") == 0)
132                         modname = "ofd";
133
134                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
135                         modname = LUSTRE_OSP_NAME;
136
137                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
138                         modname = LUSTRE_MDT_NAME;
139
140                 if (!request_module("%s", modname)) {
141                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
142                         type = class_search_type(name);
143                 } else {
144                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
145                                            modname);
146                 }
147         }
148 #endif
149         if (type) {
150                 spin_lock(&type->obd_type_lock);
151                 type->typ_refcnt++;
152                 try_module_get(type->typ_dt_ops->o_owner);
153                 spin_unlock(&type->obd_type_lock);
154         }
155         return type;
156 }
157
158 void class_put_type(struct obd_type *type)
159 {
160         LASSERT(type);
161         spin_lock(&type->obd_type_lock);
162         type->typ_refcnt--;
163         module_put(type->typ_dt_ops->o_owner);
164         spin_unlock(&type->obd_type_lock);
165 }
166
167 #define CLASS_MAX_NAME 1024
168
169 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
170                         bool enable_proc, struct lprocfs_vars *vars,
171                         const char *name, struct lu_device_type *ldt)
172 {
173         struct obd_type *type;
174         int rc = 0;
175         ENTRY;
176
177         /* sanity check */
178         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
179
180         if (class_search_type(name)) {
181                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
182                 RETURN(-EEXIST);
183         }
184
185         rc = -ENOMEM;
186         OBD_ALLOC(type, sizeof(*type));
187         if (type == NULL)
188                 RETURN(rc);
189
190         OBD_ALLOC_PTR(type->typ_dt_ops);
191         OBD_ALLOC_PTR(type->typ_md_ops);
192         OBD_ALLOC(type->typ_name, strlen(name) + 1);
193
194         if (type->typ_dt_ops == NULL ||
195             type->typ_md_ops == NULL ||
196             type->typ_name == NULL)
197                 GOTO (failed, rc);
198
199         *(type->typ_dt_ops) = *dt_ops;
200         /* md_ops is optional */
201         if (md_ops)
202                 *(type->typ_md_ops) = *md_ops;
203         strcpy(type->typ_name, name);
204         spin_lock_init(&type->obd_type_lock);
205
206 #ifdef CONFIG_PROC_FS
207         if (enable_proc) {
208                 type->typ_procroot = lprocfs_register(type->typ_name,
209                                                       proc_lustre_root,
210                                                       vars, type);
211                 if (IS_ERR(type->typ_procroot)) {
212                         rc = PTR_ERR(type->typ_procroot);
213                         type->typ_procroot = NULL;
214                         GOTO(failed, rc);
215                 }
216         }
217 #endif
218         if (ldt != NULL) {
219                 type->typ_lu = ldt;
220                 rc = lu_device_type_init(ldt);
221                 if (rc != 0)
222                         GOTO (failed, rc);
223         }
224
225         spin_lock(&obd_types_lock);
226         list_add(&type->typ_chain, &obd_types);
227         spin_unlock(&obd_types_lock);
228
229         RETURN (0);
230
231 failed:
232         if (type->typ_name != NULL) {
233 #ifdef CONFIG_PROC_FS
234                 if (type->typ_procroot != NULL)
235                         remove_proc_subtree(type->typ_name, proc_lustre_root);
236 #endif
237                 OBD_FREE(type->typ_name, strlen(name) + 1);
238         }
239         if (type->typ_md_ops != NULL)
240                 OBD_FREE_PTR(type->typ_md_ops);
241         if (type->typ_dt_ops != NULL)
242                 OBD_FREE_PTR(type->typ_dt_ops);
243         OBD_FREE(type, sizeof(*type));
244         RETURN(rc);
245 }
246 EXPORT_SYMBOL(class_register_type);
247
248 int class_unregister_type(const char *name)
249 {
250         struct obd_type *type = class_search_type(name);
251         ENTRY;
252
253         if (!type) {
254                 CERROR("unknown obd type\n");
255                 RETURN(-EINVAL);
256         }
257
258         if (type->typ_refcnt) {
259                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
260                 /* This is a bad situation, let's make the best of it */
261                 /* Remove ops, but leave the name for debugging */
262                 OBD_FREE_PTR(type->typ_dt_ops);
263                 OBD_FREE_PTR(type->typ_md_ops);
264                 RETURN(-EBUSY);
265         }
266
267         /* we do not use type->typ_procroot as for compatibility purposes
268          * other modules can share names (i.e. lod can use lov entry). so
269          * we can't reference pointer as it can get invalided when another
270          * module removes the entry */
271 #ifdef CONFIG_PROC_FS
272         if (type->typ_procroot != NULL)
273                 remove_proc_subtree(type->typ_name, proc_lustre_root);
274         if (type->typ_procsym != NULL)
275                 lprocfs_remove(&type->typ_procsym);
276 #endif
277         if (type->typ_lu)
278                 lu_device_type_fini(type->typ_lu);
279
280         spin_lock(&obd_types_lock);
281         list_del(&type->typ_chain);
282         spin_unlock(&obd_types_lock);
283         OBD_FREE(type->typ_name, strlen(name) + 1);
284         if (type->typ_dt_ops != NULL)
285                 OBD_FREE_PTR(type->typ_dt_ops);
286         if (type->typ_md_ops != NULL)
287                 OBD_FREE_PTR(type->typ_md_ops);
288         OBD_FREE(type, sizeof(*type));
289         RETURN(0);
290 } /* class_unregister_type */
291 EXPORT_SYMBOL(class_unregister_type);
292
293 /**
294  * Create a new obd device.
295  *
296  * Find an empty slot in ::obd_devs[], create a new obd device in it.
297  *
298  * \param[in] type_name obd device type string.
299  * \param[in] name      obd device name.
300  *
301  * \retval NULL if create fails, otherwise return the obd device
302  *         pointer created.
303  */
304 struct obd_device *class_newdev(const char *type_name, const char *name)
305 {
306         struct obd_device *result = NULL;
307         struct obd_device *newdev;
308         struct obd_type *type = NULL;
309         int i;
310         int new_obd_minor = 0;
311         ENTRY;
312
313         if (strlen(name) >= MAX_OBD_NAME) {
314                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
315                 RETURN(ERR_PTR(-EINVAL));
316         }
317
318         type = class_get_type(type_name);
319         if (type == NULL){
320                 CERROR("OBD: unknown type: %s\n", type_name);
321                 RETURN(ERR_PTR(-ENODEV));
322         }
323
324         newdev = obd_device_alloc();
325         if (newdev == NULL)
326                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
327
328         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
329
330         write_lock(&obd_dev_lock);
331         for (i = 0; i < class_devno_max(); i++) {
332                 struct obd_device *obd = class_num2obd(i);
333
334                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
335                         CERROR("Device %s already exists at %d, won't add\n",
336                                name, i);
337                         if (result) {
338                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
339                                          "%p obd_magic %08x != %08x\n", result,
340                                          result->obd_magic, OBD_DEVICE_MAGIC);
341                                 LASSERTF(result->obd_minor == new_obd_minor,
342                                          "%p obd_minor %d != %d\n", result,
343                                          result->obd_minor, new_obd_minor);
344
345                                 obd_devs[result->obd_minor] = NULL;
346                                 result->obd_name[0]='\0';
347                          }
348                         result = ERR_PTR(-EEXIST);
349                         break;
350                 }
351                 if (!result && !obd) {
352                         result = newdev;
353                         result->obd_minor = i;
354                         new_obd_minor = i;
355                         result->obd_type = type;
356                         strncpy(result->obd_name, name,
357                                 sizeof(result->obd_name) - 1);
358                         obd_devs[i] = result;
359                 }
360         }
361         write_unlock(&obd_dev_lock);
362
363         if (result == NULL && i >= class_devno_max()) {
364                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
365                        class_devno_max());
366                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
367         }
368
369         if (IS_ERR(result))
370                 GOTO(out, result);
371
372         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
373                result->obd_name, result);
374
375         RETURN(result);
376 out:
377         obd_device_free(newdev);
378 out_type:
379         class_put_type(type);
380         return result;
381 }
382
383 void class_release_dev(struct obd_device *obd)
384 {
385         struct obd_type *obd_type = obd->obd_type;
386
387         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
388                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
389         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
390                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
391         LASSERT(obd_type != NULL);
392
393         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
394                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
395
396         write_lock(&obd_dev_lock);
397         obd_devs[obd->obd_minor] = NULL;
398         write_unlock(&obd_dev_lock);
399         obd_device_free(obd);
400
401         class_put_type(obd_type);
402 }
403
404 int class_name2dev(const char *name)
405 {
406         int i;
407
408         if (!name)
409                 return -1;
410
411         read_lock(&obd_dev_lock);
412         for (i = 0; i < class_devno_max(); i++) {
413                 struct obd_device *obd = class_num2obd(i);
414
415                 if (obd && strcmp(name, obd->obd_name) == 0) {
416                         /* Make sure we finished attaching before we give
417                            out any references */
418                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
419                         if (obd->obd_attached) {
420                                 read_unlock(&obd_dev_lock);
421                                 return i;
422                         }
423                         break;
424                 }
425         }
426         read_unlock(&obd_dev_lock);
427
428         return -1;
429 }
430
431 struct obd_device *class_name2obd(const char *name)
432 {
433         int dev = class_name2dev(name);
434
435         if (dev < 0 || dev > class_devno_max())
436                 return NULL;
437         return class_num2obd(dev);
438 }
439 EXPORT_SYMBOL(class_name2obd);
440
441 int class_uuid2dev(struct obd_uuid *uuid)
442 {
443         int i;
444
445         read_lock(&obd_dev_lock);
446         for (i = 0; i < class_devno_max(); i++) {
447                 struct obd_device *obd = class_num2obd(i);
448
449                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
450                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
451                         read_unlock(&obd_dev_lock);
452                         return i;
453                 }
454         }
455         read_unlock(&obd_dev_lock);
456
457         return -1;
458 }
459
460 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
461 {
462         int dev = class_uuid2dev(uuid);
463         if (dev < 0)
464                 return NULL;
465         return class_num2obd(dev);
466 }
467 EXPORT_SYMBOL(class_uuid2obd);
468
469 /**
470  * Get obd device from ::obd_devs[]
471  *
472  * \param num [in] array index
473  *
474  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
475  *         otherwise return the obd device there.
476  */
477 struct obd_device *class_num2obd(int num)
478 {
479         struct obd_device *obd = NULL;
480
481         if (num < class_devno_max()) {
482                 obd = obd_devs[num];
483                 if (obd == NULL)
484                         return NULL;
485
486                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
487                          "%p obd_magic %08x != %08x\n",
488                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
489                 LASSERTF(obd->obd_minor == num,
490                          "%p obd_minor %0d != %0d\n",
491                          obd, obd->obd_minor, num);
492         }
493
494         return obd;
495 }
496
497 /**
498  * Get obd devices count. Device in any
499  *    state are counted
500  * \retval obd device count
501  */
502 int get_devices_count(void)
503 {
504         int index, max_index = class_devno_max(), dev_count = 0;
505
506         read_lock(&obd_dev_lock);
507         for (index = 0; index <= max_index; index++) {
508                 struct obd_device *obd = class_num2obd(index);
509                 if (obd != NULL)
510                         dev_count++;
511         }
512         read_unlock(&obd_dev_lock);
513
514         return dev_count;
515 }
516 EXPORT_SYMBOL(get_devices_count);
517
518 void class_obd_list(void)
519 {
520         char *status;
521         int i;
522
523         read_lock(&obd_dev_lock);
524         for (i = 0; i < class_devno_max(); i++) {
525                 struct obd_device *obd = class_num2obd(i);
526
527                 if (obd == NULL)
528                         continue;
529                 if (obd->obd_stopping)
530                         status = "ST";
531                 else if (obd->obd_set_up)
532                         status = "UP";
533                 else if (obd->obd_attached)
534                         status = "AT";
535                 else
536                         status = "--";
537                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
538                          i, status, obd->obd_type->typ_name,
539                          obd->obd_name, obd->obd_uuid.uuid,
540                          atomic_read(&obd->obd_refcount));
541         }
542         read_unlock(&obd_dev_lock);
543         return;
544 }
545
546 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
547    specified, then only the client with that uuid is returned,
548    otherwise any client connected to the tgt is returned. */
549 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
550                                           const char * typ_name,
551                                           struct obd_uuid *grp_uuid)
552 {
553         int i;
554
555         read_lock(&obd_dev_lock);
556         for (i = 0; i < class_devno_max(); i++) {
557                 struct obd_device *obd = class_num2obd(i);
558
559                 if (obd == NULL)
560                         continue;
561                 if ((strncmp(obd->obd_type->typ_name, typ_name,
562                              strlen(typ_name)) == 0)) {
563                         if (obd_uuid_equals(tgt_uuid,
564                                             &obd->u.cli.cl_target_uuid) &&
565                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
566                                                          &obd->obd_uuid) : 1)) {
567                                 read_unlock(&obd_dev_lock);
568                                 return obd;
569                         }
570                 }
571         }
572         read_unlock(&obd_dev_lock);
573
574         return NULL;
575 }
576 EXPORT_SYMBOL(class_find_client_obd);
577
578 /* Iterate the obd_device list looking devices have grp_uuid. Start
579    searching at *next, and if a device is found, the next index to look
580    at is saved in *next. If next is NULL, then the first matching device
581    will always be returned. */
582 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
583 {
584         int i;
585
586         if (next == NULL)
587                 i = 0;
588         else if (*next >= 0 && *next < class_devno_max())
589                 i = *next;
590         else
591                 return NULL;
592
593         read_lock(&obd_dev_lock);
594         for (; i < class_devno_max(); i++) {
595                 struct obd_device *obd = class_num2obd(i);
596
597                 if (obd == NULL)
598                         continue;
599                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
600                         if (next != NULL)
601                                 *next = i+1;
602                         read_unlock(&obd_dev_lock);
603                         return obd;
604                 }
605         }
606         read_unlock(&obd_dev_lock);
607
608         return NULL;
609 }
610 EXPORT_SYMBOL(class_devices_in_group);
611
612 /**
613  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
614  * adjust sptlrpc settings accordingly.
615  */
616 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
617 {
618         struct obd_device  *obd;
619         const char         *type;
620         int                 i, rc = 0, rc2;
621
622         LASSERT(namelen > 0);
623
624         read_lock(&obd_dev_lock);
625         for (i = 0; i < class_devno_max(); i++) {
626                 obd = class_num2obd(i);
627
628                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
629                         continue;
630
631                 /* only notify mdc, osc, osp, lwp, mdt, ost
632                  * because only these have a -sptlrpc llog */
633                 type = obd->obd_type->typ_name;
634                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
635                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
636                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
637                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
638                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
639                     strcmp(type, LUSTRE_OST_NAME) != 0)
640                         continue;
641
642                 if (strncmp(obd->obd_name, fsname, namelen))
643                         continue;
644
645                 class_incref(obd, __FUNCTION__, obd);
646                 read_unlock(&obd_dev_lock);
647                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
648                                          sizeof(KEY_SPTLRPC_CONF),
649                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
650                 rc = rc ? rc : rc2;
651                 class_decref(obd, __FUNCTION__, obd);
652                 read_lock(&obd_dev_lock);
653         }
654         read_unlock(&obd_dev_lock);
655         return rc;
656 }
657 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
658
659 void obd_cleanup_caches(void)
660 {
661         ENTRY;
662         if (obd_device_cachep) {
663                 kmem_cache_destroy(obd_device_cachep);
664                 obd_device_cachep = NULL;
665         }
666         if (obdo_cachep) {
667                 kmem_cache_destroy(obdo_cachep);
668                 obdo_cachep = NULL;
669         }
670         if (import_cachep) {
671                 kmem_cache_destroy(import_cachep);
672                 import_cachep = NULL;
673         }
674
675         EXIT;
676 }
677
678 int obd_init_caches(void)
679 {
680         int rc;
681         ENTRY;
682
683         LASSERT(obd_device_cachep == NULL);
684         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
685                                               sizeof(struct obd_device),
686                                               0, 0, NULL);
687         if (!obd_device_cachep)
688                 GOTO(out, rc = -ENOMEM);
689
690         LASSERT(obdo_cachep == NULL);
691         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
692                                         0, 0, NULL);
693         if (!obdo_cachep)
694                 GOTO(out, rc = -ENOMEM);
695
696         LASSERT(import_cachep == NULL);
697         import_cachep = kmem_cache_create("ll_import_cache",
698                                           sizeof(struct obd_import),
699                                           0, 0, NULL);
700         if (!import_cachep)
701                 GOTO(out, rc = -ENOMEM);
702
703         RETURN(0);
704 out:
705         obd_cleanup_caches();
706         RETURN(rc);
707 }
708
709 /* map connection to client */
710 struct obd_export *class_conn2export(struct lustre_handle *conn)
711 {
712         struct obd_export *export;
713         ENTRY;
714
715         if (!conn) {
716                 CDEBUG(D_CACHE, "looking for null handle\n");
717                 RETURN(NULL);
718         }
719
720         if (conn->cookie == -1) {  /* this means assign a new connection */
721                 CDEBUG(D_CACHE, "want a new connection\n");
722                 RETURN(NULL);
723         }
724
725         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
726         export = class_handle2object(conn->cookie, NULL);
727         RETURN(export);
728 }
729 EXPORT_SYMBOL(class_conn2export);
730
731 struct obd_device *class_exp2obd(struct obd_export *exp)
732 {
733         if (exp)
734                 return exp->exp_obd;
735         return NULL;
736 }
737 EXPORT_SYMBOL(class_exp2obd);
738
739 struct obd_device *class_conn2obd(struct lustre_handle *conn)
740 {
741         struct obd_export *export;
742         export = class_conn2export(conn);
743         if (export) {
744                 struct obd_device *obd = export->exp_obd;
745                 class_export_put(export);
746                 return obd;
747         }
748         return NULL;
749 }
750
751 struct obd_import *class_exp2cliimp(struct obd_export *exp)
752 {
753         struct obd_device *obd = exp->exp_obd;
754         if (obd == NULL)
755                 return NULL;
756         return obd->u.cli.cl_import;
757 }
758 EXPORT_SYMBOL(class_exp2cliimp);
759
760 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
761 {
762         struct obd_device *obd = class_conn2obd(conn);
763         if (obd == NULL)
764                 return NULL;
765         return obd->u.cli.cl_import;
766 }
767
768 /* Export management functions */
769 static void class_export_destroy(struct obd_export *exp)
770 {
771         struct obd_device *obd = exp->exp_obd;
772         ENTRY;
773
774         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
775         LASSERT(obd != NULL);
776
777         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
778                exp->exp_client_uuid.uuid, obd->obd_name);
779
780         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
781         if (exp->exp_connection)
782                 ptlrpc_put_connection_superhack(exp->exp_connection);
783
784         LASSERT(list_empty(&exp->exp_outstanding_replies));
785         LASSERT(list_empty(&exp->exp_uncommitted_replies));
786         LASSERT(list_empty(&exp->exp_req_replay_queue));
787         LASSERT(list_empty(&exp->exp_hp_rpcs));
788         obd_destroy_export(exp);
789         class_decref(obd, "export", exp);
790
791         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
792         EXIT;
793 }
794
795 static void export_handle_addref(void *export)
796 {
797         class_export_get(export);
798 }
799
800 static struct portals_handle_ops export_handle_ops = {
801         .hop_addref = export_handle_addref,
802         .hop_free   = NULL,
803 };
804
805 struct obd_export *class_export_get(struct obd_export *exp)
806 {
807         atomic_inc(&exp->exp_refcount);
808         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
809                atomic_read(&exp->exp_refcount));
810         return exp;
811 }
812 EXPORT_SYMBOL(class_export_get);
813
814 void class_export_put(struct obd_export *exp)
815 {
816         LASSERT(exp != NULL);
817         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
818         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
819                atomic_read(&exp->exp_refcount) - 1);
820
821         if (atomic_dec_and_test(&exp->exp_refcount)) {
822                 LASSERT(!list_empty(&exp->exp_obd_chain));
823                 LASSERT(list_empty(&exp->exp_stale_list));
824                 CDEBUG(D_IOCTL, "final put %p/%s\n",
825                        exp, exp->exp_client_uuid.uuid);
826
827                 /* release nid stat refererence */
828                 lprocfs_exp_cleanup(exp);
829
830                 obd_zombie_export_add(exp);
831         }
832 }
833 EXPORT_SYMBOL(class_export_put);
834
835 /* Creates a new export, adds it to the hash table, and returns a
836  * pointer to it. The refcount is 2: one for the hash reference, and
837  * one for the pointer returned by this function. */
838 struct obd_export *class_new_export(struct obd_device *obd,
839                                     struct obd_uuid *cluuid)
840 {
841         struct obd_export *export;
842         struct cfs_hash *hash = NULL;
843         int rc = 0;
844         ENTRY;
845
846         OBD_ALLOC_PTR(export);
847         if (!export)
848                 return ERR_PTR(-ENOMEM);
849
850         export->exp_conn_cnt = 0;
851         export->exp_lock_hash = NULL;
852         export->exp_flock_hash = NULL;
853         atomic_set(&export->exp_refcount, 2);
854         atomic_set(&export->exp_rpc_count, 0);
855         atomic_set(&export->exp_cb_count, 0);
856         atomic_set(&export->exp_locks_count, 0);
857 #if LUSTRE_TRACKS_LOCK_EXP_REFS
858         INIT_LIST_HEAD(&export->exp_locks_list);
859         spin_lock_init(&export->exp_locks_list_guard);
860 #endif
861         atomic_set(&export->exp_replay_count, 0);
862         export->exp_obd = obd;
863         INIT_LIST_HEAD(&export->exp_outstanding_replies);
864         spin_lock_init(&export->exp_uncommitted_replies_lock);
865         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
866         INIT_LIST_HEAD(&export->exp_req_replay_queue);
867         INIT_LIST_HEAD(&export->exp_handle.h_link);
868         INIT_LIST_HEAD(&export->exp_hp_rpcs);
869         INIT_LIST_HEAD(&export->exp_reg_rpcs);
870         class_handle_hash(&export->exp_handle, &export_handle_ops);
871         export->exp_last_request_time = cfs_time_current_sec();
872         spin_lock_init(&export->exp_lock);
873         spin_lock_init(&export->exp_rpc_lock);
874         INIT_HLIST_NODE(&export->exp_uuid_hash);
875         INIT_HLIST_NODE(&export->exp_nid_hash);
876         INIT_HLIST_NODE(&export->exp_gen_hash);
877         spin_lock_init(&export->exp_bl_list_lock);
878         INIT_LIST_HEAD(&export->exp_bl_list);
879         INIT_LIST_HEAD(&export->exp_stale_list);
880
881         export->exp_sp_peer = LUSTRE_SP_ANY;
882         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
883         export->exp_client_uuid = *cluuid;
884         obd_init_export(export);
885
886         spin_lock(&obd->obd_dev_lock);
887         /* shouldn't happen, but might race */
888         if (obd->obd_stopping)
889                 GOTO(exit_unlock, rc = -ENODEV);
890
891         hash = cfs_hash_getref(obd->obd_uuid_hash);
892         if (hash == NULL)
893                 GOTO(exit_unlock, rc = -ENODEV);
894         spin_unlock(&obd->obd_dev_lock);
895
896         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
897                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
898                 if (rc != 0) {
899                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
900                                       obd->obd_name, cluuid->uuid, rc);
901                         GOTO(exit_err, rc = -EALREADY);
902                 }
903         }
904
905         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
906         spin_lock(&obd->obd_dev_lock);
907         if (obd->obd_stopping) {
908                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
909                 GOTO(exit_unlock, rc = -ENODEV);
910         }
911
912         class_incref(obd, "export", export);
913         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
914         list_add_tail(&export->exp_obd_chain_timed,
915                       &export->exp_obd->obd_exports_timed);
916         export->exp_obd->obd_num_exports++;
917         spin_unlock(&obd->obd_dev_lock);
918         cfs_hash_putref(hash);
919         RETURN(export);
920
921 exit_unlock:
922         spin_unlock(&obd->obd_dev_lock);
923 exit_err:
924         if (hash)
925                 cfs_hash_putref(hash);
926         class_handle_unhash(&export->exp_handle);
927         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
928         obd_destroy_export(export);
929         OBD_FREE_PTR(export);
930         return ERR_PTR(rc);
931 }
932 EXPORT_SYMBOL(class_new_export);
933
934 void class_unlink_export(struct obd_export *exp)
935 {
936         class_handle_unhash(&exp->exp_handle);
937
938         spin_lock(&exp->exp_obd->obd_dev_lock);
939         /* delete an uuid-export hashitem from hashtables */
940         if (!hlist_unhashed(&exp->exp_uuid_hash))
941                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
942                              &exp->exp_client_uuid,
943                              &exp->exp_uuid_hash);
944
945         if (!hlist_unhashed(&exp->exp_gen_hash)) {
946                 struct tg_export_data   *ted = &exp->exp_target_data;
947                 struct cfs_hash         *hash;
948
949                 /* Because obd_gen_hash will not be released until
950                  * class_cleanup(), so hash should never be NULL here */
951                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
952                 LASSERT(hash != NULL);
953                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
954                              &exp->exp_gen_hash);
955                 cfs_hash_putref(hash);
956         }
957
958         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
959         list_del_init(&exp->exp_obd_chain_timed);
960         exp->exp_obd->obd_num_exports--;
961         spin_unlock(&exp->exp_obd->obd_dev_lock);
962         atomic_inc(&obd_stale_export_num);
963
964         /* A reference is kept by obd_stale_exports list */
965         obd_stale_export_put(exp);
966 }
967 EXPORT_SYMBOL(class_unlink_export);
968
969 /* Import management functions */
970 static void class_import_destroy(struct obd_import *imp)
971 {
972         ENTRY;
973
974         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
975                 imp->imp_obd->obd_name);
976
977         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
978
979         ptlrpc_put_connection_superhack(imp->imp_connection);
980
981         while (!list_empty(&imp->imp_conn_list)) {
982                 struct obd_import_conn *imp_conn;
983
984                 imp_conn = list_entry(imp->imp_conn_list.next,
985                                       struct obd_import_conn, oic_item);
986                 list_del_init(&imp_conn->oic_item);
987                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
988                 OBD_FREE(imp_conn, sizeof(*imp_conn));
989         }
990
991         LASSERT(imp->imp_sec == NULL);
992         class_decref(imp->imp_obd, "import", imp);
993         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
994         EXIT;
995 }
996
/* portals_handle_ops::hop_addref callback: take a reference on an import. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
1001
1002 static struct portals_handle_ops import_handle_ops = {
1003         .hop_addref = import_handle_addref,
1004         .hop_free   = NULL,
1005 };
1006
1007 struct obd_import *class_import_get(struct obd_import *import)
1008 {
1009         atomic_inc(&import->imp_refcount);
1010         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1011                atomic_read(&import->imp_refcount),
1012                import->imp_obd->obd_name);
1013         return import;
1014 }
1015 EXPORT_SYMBOL(class_import_get);
1016
1017 void class_import_put(struct obd_import *imp)
1018 {
1019         ENTRY;
1020
1021         LASSERT(list_empty(&imp->imp_zombie_chain));
1022         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1023
1024         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1025                atomic_read(&imp->imp_refcount) - 1,
1026                imp->imp_obd->obd_name);
1027
1028         if (atomic_dec_and_test(&imp->imp_refcount)) {
1029                 CDEBUG(D_INFO, "final put import %p\n", imp);
1030                 /* Drop security policy instance after all RPCs have
1031                  * finished/aborted to let all busy contexts be released. */
1032                 sptlrpc_sec_put_superhack(imp);
1033
1034                 obd_zombie_import_add(imp);
1035         }
1036
1037         /* catch possible import put race */
1038         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1039         EXIT;
1040 }
1041 EXPORT_SYMBOL(class_import_put);
1042
1043 static void init_imp_at(struct imp_at *at) {
1044         int i;
1045         at_init(&at->iat_net_latency, 0, 0);
1046         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1047                 /* max service estimates are tracked on the server side, so
1048                    don't use the AT history here, just use the last reported
1049                    val. (But keep hist for proc histogram, worst_ever) */
1050                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1051                         AT_FLG_NOHIST);
1052         }
1053 }
1054
1055 struct obd_import *class_new_import(struct obd_device *obd)
1056 {
1057         struct obd_import *imp;
1058
1059         OBD_ALLOC(imp, sizeof(*imp));
1060         if (imp == NULL)
1061                 return NULL;
1062
1063         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1064         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1065         INIT_LIST_HEAD(&imp->imp_replay_list);
1066         INIT_LIST_HEAD(&imp->imp_sending_list);
1067         INIT_LIST_HEAD(&imp->imp_delayed_list);
1068         INIT_LIST_HEAD(&imp->imp_committed_list);
1069         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1070         imp->imp_known_replied_xid = 0;
1071         imp->imp_replay_cursor = &imp->imp_committed_list;
1072         spin_lock_init(&imp->imp_lock);
1073         imp->imp_last_success_conn = 0;
1074         imp->imp_state = LUSTRE_IMP_NEW;
1075         imp->imp_obd = class_incref(obd, "import", imp);
1076         mutex_init(&imp->imp_sec_mutex);
1077         init_waitqueue_head(&imp->imp_recovery_waitq);
1078
1079         atomic_set(&imp->imp_refcount, 2);
1080         atomic_set(&imp->imp_unregistering, 0);
1081         atomic_set(&imp->imp_inflight, 0);
1082         atomic_set(&imp->imp_replay_inflight, 0);
1083         atomic_set(&imp->imp_inval_count, 0);
1084         INIT_LIST_HEAD(&imp->imp_conn_list);
1085         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1086         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1087         init_imp_at(&imp->imp_at);
1088
1089         /* the default magic is V2, will be used in connect RPC, and
1090          * then adjusted according to the flags in request/reply. */
1091         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1092
1093         return imp;
1094 }
1095 EXPORT_SYMBOL(class_new_import);
1096
1097 void class_destroy_import(struct obd_import *import)
1098 {
1099         LASSERT(import != NULL);
1100         LASSERT(import != LP_POISON);
1101
1102         class_handle_unhash(&import->imp_handle);
1103
1104         spin_lock(&import->imp_lock);
1105         import->imp_generation++;
1106         spin_unlock(&import->imp_lock);
1107         class_import_put(import);
1108 }
1109 EXPORT_SYMBOL(class_destroy_import);
1110
1111 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1112
1113 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1114 {
1115         spin_lock(&exp->exp_locks_list_guard);
1116
1117         LASSERT(lock->l_exp_refs_nr >= 0);
1118
1119         if (lock->l_exp_refs_target != NULL &&
1120             lock->l_exp_refs_target != exp) {
1121                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1122                               exp, lock, lock->l_exp_refs_target);
1123         }
1124         if ((lock->l_exp_refs_nr ++) == 0) {
1125                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1126                 lock->l_exp_refs_target = exp;
1127         }
1128         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1129                lock, exp, lock->l_exp_refs_nr);
1130         spin_unlock(&exp->exp_locks_list_guard);
1131 }
1132
1133 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1134 {
1135         spin_lock(&exp->exp_locks_list_guard);
1136         LASSERT(lock->l_exp_refs_nr > 0);
1137         if (lock->l_exp_refs_target != exp) {
1138                 LCONSOLE_WARN("lock %p, "
1139                               "mismatching export pointers: %p, %p\n",
1140                               lock, lock->l_exp_refs_target, exp);
1141         }
1142         if (-- lock->l_exp_refs_nr == 0) {
1143                 list_del_init(&lock->l_exp_refs_link);
1144                 lock->l_exp_refs_target = NULL;
1145         }
1146         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1147                lock, exp, lock->l_exp_refs_nr);
1148         spin_unlock(&exp->exp_locks_list_guard);
1149 }
1150 #endif
1151
1152 /* A connection defines an export context in which preallocation can
1153    be managed. This releases the export pointer reference, and returns
1154    the export handle, so the export refcount is 1 when this function
1155    returns. */
1156 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1157                   struct obd_uuid *cluuid)
1158 {
1159         struct obd_export *export;
1160         LASSERT(conn != NULL);
1161         LASSERT(obd != NULL);
1162         LASSERT(cluuid != NULL);
1163         ENTRY;
1164
1165         export = class_new_export(obd, cluuid);
1166         if (IS_ERR(export))
1167                 RETURN(PTR_ERR(export));
1168
1169         conn->cookie = export->exp_handle.h_cookie;
1170         class_export_put(export);
1171
1172         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1173                cluuid->uuid, conn->cookie);
1174         RETURN(0);
1175 }
1176 EXPORT_SYMBOL(class_connect);
1177
1178 /* if export is involved in recovery then clean up related things */
1179 static void class_export_recovery_cleanup(struct obd_export *exp)
1180 {
1181         struct obd_device *obd = exp->exp_obd;
1182
1183         spin_lock(&obd->obd_recovery_task_lock);
1184         if (obd->obd_recovering) {
1185                 if (exp->exp_in_recovery) {
1186                         spin_lock(&exp->exp_lock);
1187                         exp->exp_in_recovery = 0;
1188                         spin_unlock(&exp->exp_lock);
1189                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1190                         atomic_dec(&obd->obd_connected_clients);
1191                 }
1192
1193                 /* if called during recovery then should update
1194                  * obd_stale_clients counter,
1195                  * lightweight exports are not counted */
1196                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1197                         exp->exp_obd->obd_stale_clients++;
1198         }
1199         spin_unlock(&obd->obd_recovery_task_lock);
1200
1201         spin_lock(&exp->exp_lock);
1202         /** Cleanup req replay fields */
1203         if (exp->exp_req_replay_needed) {
1204                 exp->exp_req_replay_needed = 0;
1205
1206                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1207                 atomic_dec(&obd->obd_req_replay_clients);
1208         }
1209
1210         /** Cleanup lock replay data */
1211         if (exp->exp_lock_replay_needed) {
1212                 exp->exp_lock_replay_needed = 0;
1213
1214                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1215                 atomic_dec(&obd->obd_lock_replay_clients);
1216         }
1217         spin_unlock(&exp->exp_lock);
1218 }
1219
1220 /* This function removes 1-3 references from the export:
1221  * 1 - for export pointer passed
1222  * and if disconnect really need
1223  * 2 - removing from hash
1224  * 3 - in client_unlink_export
1225  * The export pointer passed to this function can destroyed */
1226 int class_disconnect(struct obd_export *export)
1227 {
1228         int already_disconnected;
1229         ENTRY;
1230
1231         if (export == NULL) {
1232                 CWARN("attempting to free NULL export %p\n", export);
1233                 RETURN(-EINVAL);
1234         }
1235
1236         spin_lock(&export->exp_lock);
1237         already_disconnected = export->exp_disconnected;
1238         export->exp_disconnected = 1;
1239         spin_unlock(&export->exp_lock);
1240
1241         /* class_cleanup(), abort_recovery(), and class_fail_export()
1242          * all end up in here, and if any of them race we shouldn't
1243          * call extra class_export_puts(). */
1244         if (already_disconnected) {
1245                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1246                 GOTO(no_disconn, already_disconnected);
1247         }
1248
1249         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1250                export->exp_handle.h_cookie);
1251
1252         if (!hlist_unhashed(&export->exp_nid_hash))
1253                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1254                              &export->exp_connection->c_peer.nid,
1255                              &export->exp_nid_hash);
1256
1257         class_export_recovery_cleanup(export);
1258         class_unlink_export(export);
1259 no_disconn:
1260         class_export_put(export);
1261         RETURN(0);
1262 }
1263 EXPORT_SYMBOL(class_disconnect);
1264
1265 /* Return non-zero for a fully connected export */
1266 int class_connected_export(struct obd_export *exp)
1267 {
1268         int connected = 0;
1269
1270         if (exp) {
1271                 spin_lock(&exp->exp_lock);
1272                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1273                 spin_unlock(&exp->exp_lock);
1274         }
1275         return connected;
1276 }
1277 EXPORT_SYMBOL(class_connected_export);
1278
1279 static void class_disconnect_export_list(struct list_head *list,
1280                                          enum obd_option flags)
1281 {
1282         int rc;
1283         struct obd_export *exp;
1284         ENTRY;
1285
1286         /* It's possible that an export may disconnect itself, but
1287          * nothing else will be added to this list. */
1288         while (!list_empty(list)) {
1289                 exp = list_entry(list->next, struct obd_export,
1290                                  exp_obd_chain);
1291                 /* need for safe call CDEBUG after obd_disconnect */
1292                 class_export_get(exp);
1293
1294                 spin_lock(&exp->exp_lock);
1295                 exp->exp_flags = flags;
1296                 spin_unlock(&exp->exp_lock);
1297
1298                 if (obd_uuid_equals(&exp->exp_client_uuid,
1299                                     &exp->exp_obd->obd_uuid)) {
1300                         CDEBUG(D_HA,
1301                                "exp %p export uuid == obd uuid, don't discon\n",
1302                                exp);
1303                         /* Need to delete this now so we don't end up pointing
1304                          * to work_list later when this export is cleaned up. */
1305                         list_del_init(&exp->exp_obd_chain);
1306                         class_export_put(exp);
1307                         continue;
1308                 }
1309
1310                 class_export_get(exp);
1311                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1312                        "last request at "CFS_TIME_T"\n",
1313                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1314                        exp, exp->exp_last_request_time);
1315                 /* release one export reference anyway */
1316                 rc = obd_disconnect(exp);
1317
1318                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1319                        obd_export_nid2str(exp), exp, rc);
1320                 class_export_put(exp);
1321         }
1322         EXIT;
1323 }
1324
1325 void class_disconnect_exports(struct obd_device *obd)
1326 {
1327         struct list_head work_list;
1328         ENTRY;
1329
1330         /* Move all of the exports from obd_exports to a work list, en masse. */
1331         INIT_LIST_HEAD(&work_list);
1332         spin_lock(&obd->obd_dev_lock);
1333         list_splice_init(&obd->obd_exports, &work_list);
1334         list_splice_init(&obd->obd_delayed_exports, &work_list);
1335         spin_unlock(&obd->obd_dev_lock);
1336
1337         if (!list_empty(&work_list)) {
1338                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1339                        "disconnecting them\n", obd->obd_minor, obd);
1340                 class_disconnect_export_list(&work_list,
1341                                              exp_flags_from_obd(obd));
1342         } else
1343                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1344                        obd->obd_minor, obd);
1345         EXIT;
1346 }
1347 EXPORT_SYMBOL(class_disconnect_exports);
1348
1349 /* Remove exports that have not completed recovery.
1350  */
1351 void class_disconnect_stale_exports(struct obd_device *obd,
1352                                     int (*test_export)(struct obd_export *))
1353 {
1354         struct list_head work_list;
1355         struct obd_export *exp, *n;
1356         int evicted = 0;
1357         ENTRY;
1358
1359         INIT_LIST_HEAD(&work_list);
1360         spin_lock(&obd->obd_dev_lock);
1361         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1362                                  exp_obd_chain) {
1363                 /* don't count self-export as client */
1364                 if (obd_uuid_equals(&exp->exp_client_uuid,
1365                                     &exp->exp_obd->obd_uuid))
1366                         continue;
1367
1368                 /* don't evict clients which have no slot in last_rcvd
1369                  * (e.g. lightweight connection) */
1370                 if (exp->exp_target_data.ted_lr_idx == -1)
1371                         continue;
1372
1373                 spin_lock(&exp->exp_lock);
1374                 if (exp->exp_failed || test_export(exp)) {
1375                         spin_unlock(&exp->exp_lock);
1376                         continue;
1377                 }
1378                 exp->exp_failed = 1;
1379                 spin_unlock(&exp->exp_lock);
1380
1381                 list_move(&exp->exp_obd_chain, &work_list);
1382                 evicted++;
1383                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1384                        obd->obd_name, exp->exp_client_uuid.uuid,
1385                        exp->exp_connection == NULL ? "<unknown>" :
1386                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1387                 print_export_data(exp, "EVICTING", 0);
1388         }
1389         spin_unlock(&obd->obd_dev_lock);
1390
1391         if (evicted)
1392                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1393                               obd->obd_name, evicted);
1394
1395         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1396                                                  OBD_OPT_ABORT_RECOV);
1397         EXIT;
1398 }
1399 EXPORT_SYMBOL(class_disconnect_stale_exports);
1400
1401 void class_fail_export(struct obd_export *exp)
1402 {
1403         int rc, already_failed;
1404
1405         spin_lock(&exp->exp_lock);
1406         already_failed = exp->exp_failed;
1407         exp->exp_failed = 1;
1408         spin_unlock(&exp->exp_lock);
1409
1410         if (already_failed) {
1411                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1412                        exp, exp->exp_client_uuid.uuid);
1413                 return;
1414         }
1415
1416         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1417                exp, exp->exp_client_uuid.uuid);
1418
1419         if (obd_dump_on_timeout)
1420                 libcfs_debug_dumplog();
1421
1422         /* need for safe call CDEBUG after obd_disconnect */
1423         class_export_get(exp);
1424
1425         /* Most callers into obd_disconnect are removing their own reference
1426          * (request, for example) in addition to the one from the hash table.
1427          * We don't have such a reference here, so make one. */
1428         class_export_get(exp);
1429         rc = obd_disconnect(exp);
1430         if (rc)
1431                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1432         else
1433                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1434                        exp, exp->exp_client_uuid.uuid);
1435         class_export_put(exp);
1436 }
1437 EXPORT_SYMBOL(class_fail_export);
1438
1439 char *obd_export_nid2str(struct obd_export *exp)
1440 {
1441         if (exp->exp_connection != NULL)
1442                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1443
1444         return "(no nid)";
1445 }
1446 EXPORT_SYMBOL(obd_export_nid2str);
1447
1448 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1449 {
1450         struct cfs_hash *nid_hash;
1451         struct obd_export *doomed_exp = NULL;
1452         int exports_evicted = 0;
1453
1454         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1455
1456         spin_lock(&obd->obd_dev_lock);
1457         /* umount has run already, so evict thread should leave
1458          * its task to umount thread now */
1459         if (obd->obd_stopping) {
1460                 spin_unlock(&obd->obd_dev_lock);
1461                 return exports_evicted;
1462         }
1463         nid_hash = obd->obd_nid_hash;
1464         cfs_hash_getref(nid_hash);
1465         spin_unlock(&obd->obd_dev_lock);
1466
1467         do {
1468                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1469                 if (doomed_exp == NULL)
1470                         break;
1471
1472                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1473                          "nid %s found, wanted nid %s, requested nid %s\n",
1474                          obd_export_nid2str(doomed_exp),
1475                          libcfs_nid2str(nid_key), nid);
1476                 LASSERTF(doomed_exp != obd->obd_self_export,
1477                          "self-export is hashed by NID?\n");
1478                 exports_evicted++;
1479                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1480                               "request\n", obd->obd_name,
1481                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1482                               obd_export_nid2str(doomed_exp));
1483                 class_fail_export(doomed_exp);
1484                 class_export_put(doomed_exp);
1485         } while (1);
1486
1487         cfs_hash_putref(nid_hash);
1488
1489         if (!exports_evicted)
1490                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1491                        obd->obd_name, nid);
1492         return exports_evicted;
1493 }
1494 EXPORT_SYMBOL(obd_export_evict_by_nid);
1495
1496 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1497 {
1498         struct cfs_hash *uuid_hash;
1499         struct obd_export *doomed_exp = NULL;
1500         struct obd_uuid doomed_uuid;
1501         int exports_evicted = 0;
1502
1503         spin_lock(&obd->obd_dev_lock);
1504         if (obd->obd_stopping) {
1505                 spin_unlock(&obd->obd_dev_lock);
1506                 return exports_evicted;
1507         }
1508         uuid_hash = obd->obd_uuid_hash;
1509         cfs_hash_getref(uuid_hash);
1510         spin_unlock(&obd->obd_dev_lock);
1511
1512         obd_str2uuid(&doomed_uuid, uuid);
1513         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1514                 CERROR("%s: can't evict myself\n", obd->obd_name);
1515                 cfs_hash_putref(uuid_hash);
1516                 return exports_evicted;
1517         }
1518
1519         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1520
1521         if (doomed_exp == NULL) {
1522                 CERROR("%s: can't disconnect %s: no exports found\n",
1523                        obd->obd_name, uuid);
1524         } else {
1525                 CWARN("%s: evicting %s at adminstrative request\n",
1526                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1527                 class_fail_export(doomed_exp);
1528                 class_export_put(doomed_exp);
1529                 exports_evicted++;
1530         }
1531         cfs_hash_putref(uuid_hash);
1532
1533         return exports_evicted;
1534 }
1535
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; print_export_data() calls it when it is
 * set and lock dumping was requested. */
void (*class_export_dump_hook)(struct obd_export *exp) = NULL;
#endif
1539
1540 static void print_export_data(struct obd_export *exp, const char *status,
1541                               int locks)
1542 {
1543         struct ptlrpc_reply_state *rs;
1544         struct ptlrpc_reply_state *first_reply = NULL;
1545         int nreplies = 0;
1546
1547         spin_lock(&exp->exp_lock);
1548         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1549                             rs_exp_list) {
1550                 if (nreplies == 0)
1551                         first_reply = rs;
1552                 nreplies++;
1553         }
1554         spin_unlock(&exp->exp_lock);
1555
1556         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1557                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1558                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1559                atomic_read(&exp->exp_rpc_count),
1560                atomic_read(&exp->exp_cb_count),
1561                atomic_read(&exp->exp_locks_count),
1562                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1563                nreplies, first_reply, nreplies > 3 ? "..." : "",
1564                exp->exp_last_committed);
1565 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1566         if (locks && class_export_dump_hook != NULL)
1567                 class_export_dump_hook(exp);
1568 #endif
1569 }
1570
1571 void dump_exports(struct obd_device *obd, int locks)
1572 {
1573         struct obd_export *exp;
1574
1575         spin_lock(&obd->obd_dev_lock);
1576         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1577                 print_export_data(exp, "ACTIVE", locks);
1578         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1579                 print_export_data(exp, "UNLINKED", locks);
1580         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1581                 print_export_data(exp, "DELAYED", locks);
1582         spin_unlock(&obd->obd_dev_lock);
1583         spin_lock(&obd_zombie_impexp_lock);
1584         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1585                 print_export_data(exp, "ZOMBIE", locks);
1586         spin_unlock(&obd_zombie_impexp_lock);
1587 }
1588
1589 void obd_exports_barrier(struct obd_device *obd)
1590 {
1591         int waited = 2;
1592         LASSERT(list_empty(&obd->obd_exports));
1593         spin_lock(&obd->obd_dev_lock);
1594         while (!list_empty(&obd->obd_unlinked_exports)) {
1595                 spin_unlock(&obd->obd_dev_lock);
1596                 set_current_state(TASK_UNINTERRUPTIBLE);
1597                 schedule_timeout(cfs_time_seconds(waited));
1598                 if (waited > 5 && IS_PO2(waited)) {
1599                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1600                                       "more than %d seconds. "
1601                                       "The obd refcount = %d. Is it stuck?\n",
1602                                       obd->obd_name, waited,
1603                                       atomic_read(&obd->obd_refcount));
1604                         dump_exports(obd, 1);
1605                 }
1606                 waited *= 2;
1607                 spin_lock(&obd->obd_dev_lock);
1608         }
1609         spin_unlock(&obd->obd_dev_lock);
1610 }
1611 EXPORT_SYMBOL(obd_exports_barrier);
1612
/* Total number of zombie imports and exports still waiting to be destroyed;
 * modified with obd_zombie_impexp_lock held. */
static int zombies_count;
1615
1616 /**
1617  * kill zombie imports and exports
1618  */
1619 void obd_zombie_impexp_cull(void)
1620 {
1621         struct obd_import *import;
1622         struct obd_export *export;
1623         ENTRY;
1624
1625         do {
1626                 spin_lock(&obd_zombie_impexp_lock);
1627
1628                 import = NULL;
1629                 if (!list_empty(&obd_zombie_imports)) {
1630                         import = list_entry(obd_zombie_imports.next,
1631                                             struct obd_import,
1632                                             imp_zombie_chain);
1633                         list_del_init(&import->imp_zombie_chain);
1634                 }
1635
1636                 export = NULL;
1637                 if (!list_empty(&obd_zombie_exports)) {
1638                         export = list_entry(obd_zombie_exports.next,
1639                                             struct obd_export,
1640                                             exp_obd_chain);
1641                         list_del_init(&export->exp_obd_chain);
1642                 }
1643
1644                 spin_unlock(&obd_zombie_impexp_lock);
1645
1646                 if (import != NULL) {
1647                         class_import_destroy(import);
1648                         spin_lock(&obd_zombie_impexp_lock);
1649                         zombies_count--;
1650                         spin_unlock(&obd_zombie_impexp_lock);
1651                 }
1652
1653                 if (export != NULL) {
1654                         class_export_destroy(export);
1655                         spin_lock(&obd_zombie_impexp_lock);
1656                         zombies_count--;
1657                         spin_unlock(&obd_zombie_impexp_lock);
1658                 }
1659
1660                 cond_resched();
1661         } while (import != NULL || export != NULL);
1662         EXIT;
1663 }
1664
1665 static struct completion        obd_zombie_start;
1666 static struct completion        obd_zombie_stop;
1667 static unsigned long            obd_zombie_flags;
1668 static wait_queue_head_t        obd_zombie_waitq;
1669 static pid_t                    obd_zombie_pid;
1670
1671 enum {
1672         OBD_ZOMBIE_STOP         = 0x0001,
1673 };
1674
1675 /**
1676  * check for work for kill zombie import/export thread.
1677  */
1678 static int obd_zombie_impexp_check(void *arg)
1679 {
1680         int rc;
1681
1682         spin_lock(&obd_zombie_impexp_lock);
1683         rc = (zombies_count == 0) &&
1684              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1685         spin_unlock(&obd_zombie_impexp_lock);
1686
1687         RETURN(rc);
1688 }
1689
1690 /**
1691  * Add export to the obd_zombe thread and notify it.
1692  */
1693 static void obd_zombie_export_add(struct obd_export *exp) {
1694         atomic_dec(&obd_stale_export_num);
1695         spin_lock(&exp->exp_obd->obd_dev_lock);
1696         LASSERT(!list_empty(&exp->exp_obd_chain));
1697         list_del_init(&exp->exp_obd_chain);
1698         spin_unlock(&exp->exp_obd->obd_dev_lock);
1699         spin_lock(&obd_zombie_impexp_lock);
1700         zombies_count++;
1701         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1702         spin_unlock(&obd_zombie_impexp_lock);
1703
1704         obd_zombie_impexp_notify();
1705 }
1706
1707 /**
1708  * Add import to the obd_zombe thread and notify it.
1709  */
1710 static void obd_zombie_import_add(struct obd_import *imp) {
1711         LASSERT(imp->imp_sec == NULL);
1712         spin_lock(&obd_zombie_impexp_lock);
1713         LASSERT(list_empty(&imp->imp_zombie_chain));
1714         zombies_count++;
1715         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1716         spin_unlock(&obd_zombie_impexp_lock);
1717
1718         obd_zombie_impexp_notify();
1719 }
1720
1721 /**
1722  * notify import/export destroy thread about new zombie.
1723  */
1724 static void obd_zombie_impexp_notify(void)
1725 {
1726         /*
1727          * Make sure obd_zomebie_impexp_thread get this notification.
1728          * It is possible this signal only get by obd_zombie_barrier, and
1729          * barrier gulps this notification and sleeps away and hangs ensues
1730          */
1731         wake_up_all(&obd_zombie_waitq);
1732 }
1733
1734 /**
1735  * check whether obd_zombie is idle
1736  */
1737 static int obd_zombie_is_idle(void)
1738 {
1739         int rc;
1740
1741         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1742         spin_lock(&obd_zombie_impexp_lock);
1743         rc = (zombies_count == 0);
1744         spin_unlock(&obd_zombie_impexp_lock);
1745         return rc;
1746 }
1747
1748 /**
1749  * wait when obd_zombie import/export queues become empty
1750  */
1751 void obd_zombie_barrier(void)
1752 {
1753         struct l_wait_info lwi = { 0 };
1754
1755         if (obd_zombie_pid == current_pid())
1756                 /* don't wait for myself */
1757                 return;
1758         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1759 }
1760 EXPORT_SYMBOL(obd_zombie_barrier);
1761
1762
1763 struct obd_export *obd_stale_export_get(void)
1764 {
1765         struct obd_export *exp = NULL;
1766         ENTRY;
1767
1768         spin_lock(&obd_stale_export_lock);
1769         if (!list_empty(&obd_stale_exports)) {
1770                 exp = list_entry(obd_stale_exports.next,
1771                                  struct obd_export, exp_stale_list);
1772                 list_del_init(&exp->exp_stale_list);
1773         }
1774         spin_unlock(&obd_stale_export_lock);
1775
1776         if (exp) {
1777                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1778                        atomic_read(&obd_stale_export_num));
1779         }
1780         RETURN(exp);
1781 }
1782 EXPORT_SYMBOL(obd_stale_export_get);
1783
1784 void obd_stale_export_put(struct obd_export *exp)
1785 {
1786         ENTRY;
1787
1788         LASSERT(list_empty(&exp->exp_stale_list));
1789         if (exp->exp_lock_hash &&
1790             atomic_read(&exp->exp_lock_hash->hs_count)) {
1791                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1792                        atomic_read(&obd_stale_export_num));
1793
1794                 spin_lock_bh(&exp->exp_bl_list_lock);
1795                 spin_lock(&obd_stale_export_lock);
1796                 /* Add to the tail if there is no blocked locks,
1797                  * to the head otherwise. */
1798                 if (list_empty(&exp->exp_bl_list))
1799                         list_add_tail(&exp->exp_stale_list,
1800                                       &obd_stale_exports);
1801                 else
1802                         list_add(&exp->exp_stale_list,
1803                                  &obd_stale_exports);
1804
1805                 spin_unlock(&obd_stale_export_lock);
1806                 spin_unlock_bh(&exp->exp_bl_list_lock);
1807         } else {
1808                 class_export_put(exp);
1809         }
1810         EXIT;
1811 }
1812 EXPORT_SYMBOL(obd_stale_export_put);
1813
1814 /**
1815  * Adjust the position of the export in the stale list,
1816  * i.e. move to the head of the list if is needed.
1817  **/
1818 void obd_stale_export_adjust(struct obd_export *exp)
1819 {
1820         LASSERT(exp != NULL);
1821         spin_lock_bh(&exp->exp_bl_list_lock);
1822         spin_lock(&obd_stale_export_lock);
1823
1824         if (!list_empty(&exp->exp_stale_list) &&
1825             !list_empty(&exp->exp_bl_list))
1826                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1827
1828         spin_unlock(&obd_stale_export_lock);
1829         spin_unlock_bh(&exp->exp_bl_list_lock);
1830 }
1831 EXPORT_SYMBOL(obd_stale_export_adjust);
1832
1833 /**
1834  * destroy zombie export/import thread.
1835  */
1836 static int obd_zombie_impexp_thread(void *unused)
1837 {
1838         unshare_fs_struct();
1839         complete(&obd_zombie_start);
1840
1841         obd_zombie_pid = current_pid();
1842
1843         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1844                 struct l_wait_info lwi = { 0 };
1845
1846                 l_wait_event(obd_zombie_waitq,
1847                              !obd_zombie_impexp_check(NULL), &lwi);
1848                 obd_zombie_impexp_cull();
1849
1850                 /*
1851                  * Notify obd_zombie_barrier callers that queues
1852                  * may be empty.
1853                  */
1854                 wake_up(&obd_zombie_waitq);
1855         }
1856
1857         complete(&obd_zombie_stop);
1858
1859         RETURN(0);
1860 }
1861
1862
1863 /**
1864  * start destroy zombie import/export thread
1865  */
1866 int obd_zombie_impexp_init(void)
1867 {
1868         struct task_struct *task;
1869
1870         INIT_LIST_HEAD(&obd_zombie_imports);
1871
1872         INIT_LIST_HEAD(&obd_zombie_exports);
1873         spin_lock_init(&obd_zombie_impexp_lock);
1874         init_completion(&obd_zombie_start);
1875         init_completion(&obd_zombie_stop);
1876         init_waitqueue_head(&obd_zombie_waitq);
1877         obd_zombie_pid = 0;
1878
1879         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1880         if (IS_ERR(task))
1881                 RETURN(PTR_ERR(task));
1882
1883         wait_for_completion(&obd_zombie_start);
1884         RETURN(0);
1885 }
1886 /**
1887  * stop destroy zombie import/export thread
1888  */
1889 void obd_zombie_impexp_stop(void)
1890 {
1891         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1892         obd_zombie_impexp_notify();
1893         wait_for_completion(&obd_zombie_stop);
1894 }
1895
1896 /***** Kernel-userspace comm helpers *******/
1897
1898 /* Get length of entire message, including header */
1899 int kuc_len(int payload_len)
1900 {
1901         return sizeof(struct kuc_hdr) + payload_len;
1902 }
1903 EXPORT_SYMBOL(kuc_len);
1904
1905 /* Get a pointer to kuc header, given a ptr to the payload
1906  * @param p Pointer to payload area
1907  * @returns Pointer to kuc header
1908  */
1909 struct kuc_hdr * kuc_ptr(void *p)
1910 {
1911         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1912         LASSERT(lh->kuc_magic == KUC_MAGIC);
1913         return lh;
1914 }
1915 EXPORT_SYMBOL(kuc_ptr);
1916
1917 /* Test if payload is part of kuc message
1918  * @param p Pointer to payload area
1919  * @returns boolean
1920  */
1921 int kuc_ispayload(void *p)
1922 {
1923         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1924
1925         if (kh->kuc_magic == KUC_MAGIC)
1926                 return 1;
1927         else
1928                 return 0;
1929 }
1930 EXPORT_SYMBOL(kuc_ispayload);
1931
1932 /* Alloc space for a message, and fill in header
1933  * @return Pointer to payload area
1934  */
1935 void *kuc_alloc(int payload_len, int transport, int type)
1936 {
1937         struct kuc_hdr *lh;
1938         int len = kuc_len(payload_len);
1939
1940         OBD_ALLOC(lh, len);
1941         if (lh == NULL)
1942                 return ERR_PTR(-ENOMEM);
1943
1944         lh->kuc_magic = KUC_MAGIC;
1945         lh->kuc_transport = transport;
1946         lh->kuc_msgtype = type;
1947         lh->kuc_msglen = len;
1948
1949         return (void *)(lh + 1);
1950 }
1951 EXPORT_SYMBOL(kuc_alloc);
1952
/* Free a message previously obtained from kuc_alloc().
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload length; header size is added to compute the
 *                    size handed to OBD_FREE
 *
 * Not declared "inline": this is an exported, externally visible function,
 * and a plain "inline" definition does not reliably emit an external
 * definition under C99/C11 inline semantics.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1960
1961 struct obd_request_slot_waiter {
1962         struct list_head        orsw_entry;
1963         wait_queue_head_t       orsw_waitq;
1964         bool                    orsw_signaled;
1965 };
1966
1967 static bool obd_request_slot_avail(struct client_obd *cli,
1968                                    struct obd_request_slot_waiter *orsw)
1969 {
1970         bool avail;
1971
1972         spin_lock(&cli->cl_loi_list_lock);
1973         avail = !!list_empty(&orsw->orsw_entry);
1974         spin_unlock(&cli->cl_loi_list_lock);
1975
1976         return avail;
1977 };
1978
1979 /*
1980  * For network flow control, the RPC sponsor needs to acquire a credit
1981  * before sending the RPC. The credits count for a connection is defined
1982  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1983  * the subsequent RPC sponsors need to wait until others released their
1984  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1985  */
1986 int obd_get_request_slot(struct client_obd *cli)
1987 {
1988         struct obd_request_slot_waiter   orsw;
1989         struct l_wait_info               lwi;
1990         int                              rc;
1991
1992         spin_lock(&cli->cl_loi_list_lock);
1993         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1994                 cli->cl_r_in_flight++;
1995                 spin_unlock(&cli->cl_loi_list_lock);
1996                 return 0;
1997         }
1998
1999         init_waitqueue_head(&orsw.orsw_waitq);
2000         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
2001         orsw.orsw_signaled = false;
2002         spin_unlock(&cli->cl_loi_list_lock);
2003
2004         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2005         rc = l_wait_event(orsw.orsw_waitq,
2006                           obd_request_slot_avail(cli, &orsw) ||
2007                           orsw.orsw_signaled,
2008                           &lwi);
2009
2010         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2011          * freed but other (such as obd_put_request_slot) is using it. */
2012         spin_lock(&cli->cl_loi_list_lock);
2013         if (rc != 0) {
2014                 if (!orsw.orsw_signaled) {
2015                         if (list_empty(&orsw.orsw_entry))
2016                                 cli->cl_r_in_flight--;
2017                         else
2018                                 list_del(&orsw.orsw_entry);
2019                 }
2020         }
2021
2022         if (orsw.orsw_signaled) {
2023                 LASSERT(list_empty(&orsw.orsw_entry));
2024
2025                 rc = -EINTR;
2026         }
2027         spin_unlock(&cli->cl_loi_list_lock);
2028
2029         return rc;
2030 }
2031 EXPORT_SYMBOL(obd_get_request_slot);
2032
2033 void obd_put_request_slot(struct client_obd *cli)
2034 {
2035         struct obd_request_slot_waiter *orsw;
2036
2037         spin_lock(&cli->cl_loi_list_lock);
2038         cli->cl_r_in_flight--;
2039
2040         /* If there is free slot, wakeup the first waiter. */
2041         if (!list_empty(&cli->cl_loi_read_list) &&
2042             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2043                 orsw = list_entry(cli->cl_loi_read_list.next,
2044                                   struct obd_request_slot_waiter, orsw_entry);
2045                 list_del_init(&orsw->orsw_entry);
2046                 cli->cl_r_in_flight++;
2047                 wake_up(&orsw->orsw_waitq);
2048         }
2049         spin_unlock(&cli->cl_loi_list_lock);
2050 }
2051 EXPORT_SYMBOL(obd_put_request_slot);
2052
2053 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2054 {
2055         return cli->cl_max_rpcs_in_flight;
2056 }
2057 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2058
2059 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2060 {
2061         struct obd_request_slot_waiter *orsw;
2062         __u32                           old;
2063         int                             diff;
2064         int                             i;
2065         char                            *typ_name;
2066         int                             rc;
2067
2068         if (max > OBD_MAX_RIF_MAX || max < 1)
2069                 return -ERANGE;
2070
2071         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2072         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2073                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2074                  * strictly lower that max_rpcs_in_flight */
2075                 if (max < 2) {
2076                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2077                                "because it must be higher than "
2078                                "max_mod_rpcs_in_flight value",
2079                                cli->cl_import->imp_obd->obd_name);
2080                         return -ERANGE;
2081                 }
2082                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2083                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2084                         if (rc != 0)
2085                                 return rc;
2086                 }
2087         }
2088
2089         spin_lock(&cli->cl_loi_list_lock);
2090         old = cli->cl_max_rpcs_in_flight;
2091         cli->cl_max_rpcs_in_flight = max;
2092         diff = max - old;
2093
2094         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2095         for (i = 0; i < diff; i++) {
2096                 if (list_empty(&cli->cl_loi_read_list))
2097                         break;
2098
2099                 orsw = list_entry(cli->cl_loi_read_list.next,
2100                                   struct obd_request_slot_waiter, orsw_entry);
2101                 list_del_init(&orsw->orsw_entry);
2102                 cli->cl_r_in_flight++;
2103                 wake_up(&orsw->orsw_waitq);
2104         }
2105         spin_unlock(&cli->cl_loi_list_lock);
2106
2107         return 0;
2108 }
2109 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2110
2111 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2112 {
2113         return cli->cl_max_mod_rpcs_in_flight;
2114 }
2115 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2116
2117 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2118 {
2119         struct obd_connect_data *ocd;
2120         __u16 maxmodrpcs;
2121         __u16 prev;
2122
2123         if (max > OBD_MAX_RIF_MAX || max < 1)
2124                 return -ERANGE;
2125
2126         /* cannot exceed or equal max_rpcs_in_flight */
2127         if (max >= cli->cl_max_rpcs_in_flight) {
2128                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2129                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2130                        cli->cl_import->imp_obd->obd_name,
2131                        max, cli->cl_max_rpcs_in_flight);
2132                 return -ERANGE;
2133         }
2134
2135         /* cannot exceed max modify RPCs in flight supported by the server */
2136         ocd = &cli->cl_import->imp_connect_data;
2137         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2138                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2139         else
2140                 maxmodrpcs = 1;
2141         if (max > maxmodrpcs) {
2142                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2143                        "higher than max_mod_rpcs_per_client value (%hu) "
2144                        "returned by the server at connection\n",
2145                        cli->cl_import->imp_obd->obd_name,
2146                        max, maxmodrpcs);
2147                 return -ERANGE;
2148         }
2149
2150         spin_lock(&cli->cl_mod_rpcs_lock);
2151
2152         prev = cli->cl_max_mod_rpcs_in_flight;
2153         cli->cl_max_mod_rpcs_in_flight = max;
2154
2155         /* wakeup waiters if limit has been increased */
2156         if (cli->cl_max_mod_rpcs_in_flight > prev)
2157                 wake_up(&cli->cl_mod_rpcs_waitq);
2158
2159         spin_unlock(&cli->cl_mod_rpcs_lock);
2160
2161         return 0;
2162 }
2163 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2164
2165
2166 #define pct(a, b) (b ? a * 100 / b : 0)
2167 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2168                                struct seq_file *seq)
2169 {
2170         struct timeval now;
2171         unsigned long mod_tot = 0, mod_cum;
2172         int i;
2173
2174         do_gettimeofday(&now);
2175
2176         spin_lock(&cli->cl_mod_rpcs_lock);
2177
2178         seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
2179                    now.tv_sec, now.tv_usec);
2180         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2181                    cli->cl_mod_rpcs_in_flight);
2182
2183         seq_printf(seq, "\n\t\t\tmodify\n");
2184         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2185
2186         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2187
2188         mod_cum = 0;
2189         for (i = 0; i < OBD_HIST_MAX; i++) {
2190                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2191                 mod_cum += mod;
2192                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2193                                  i, mod, pct(mod, mod_tot),
2194                                  pct(mod_cum, mod_tot));
2195                 if (mod_cum == mod_tot)
2196                         break;
2197         }
2198
2199         spin_unlock(&cli->cl_mod_rpcs_lock);
2200
2201         return 0;
2202 }
2203 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2204 #undef pct
2205
2206
2207 /* The number of modify RPCs sent in parallel is limited
2208  * because the server has a finite number of slots per client to
2209  * store request result and ensure reply reconstruction when needed.
2210  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2211  * that takes into account server limit and cl_max_rpcs_in_flight
2212  * value.
2213  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2214  * one close request is allowed above the maximum.
2215  */
2216 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2217                                                  bool close_req)
2218 {
2219         bool avail;
2220
2221         /* A slot is available if
2222          * - number of modify RPCs in flight is less than the max
2223          * - it's a close RPC and no other close request is in flight
2224          */
2225         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2226                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2227
2228         return avail;
2229 }
2230
2231 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2232                                          bool close_req)
2233 {
2234         bool avail;
2235
2236         spin_lock(&cli->cl_mod_rpcs_lock);
2237         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2238         spin_unlock(&cli->cl_mod_rpcs_lock);
2239         return avail;
2240 }
2241
2242 /* Get a modify RPC slot from the obd client @cli according
2243  * to the kind of operation @opc that is going to be sent
2244  * and the intent @it of the operation if it applies.
2245  * If the maximum number of modify RPCs in flight is reached
2246  * the thread is put to sleep.
2247  * Returns the tag to be set in the request message. Tag 0
2248  * is reserved for non-modifying requests.
2249  */
2250 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2251                            struct lookup_intent *it)
2252 {
2253         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2254         bool                    close_req = false;
2255         __u16                   i, max;
2256
2257         /* read-only metadata RPCs don't consume a slot on MDT
2258          * for reply reconstruction
2259          */
2260         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2261                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2262                 return 0;
2263
2264         if (opc == MDS_CLOSE)
2265                 close_req = true;
2266
2267         do {
2268                 spin_lock(&cli->cl_mod_rpcs_lock);
2269                 max = cli->cl_max_mod_rpcs_in_flight;
2270                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2271                         /* there is a slot available */
2272                         cli->cl_mod_rpcs_in_flight++;
2273                         if (close_req)
2274                                 cli->cl_close_rpcs_in_flight++;
2275                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2276                                          cli->cl_mod_rpcs_in_flight);
2277                         /* find a free tag */
2278                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2279                                                 max + 1);
2280                         LASSERT(i < OBD_MAX_RIF_MAX);
2281                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2282                         spin_unlock(&cli->cl_mod_rpcs_lock);
2283                         /* tag 0 is reserved for non-modify RPCs */
2284                         return i + 1;
2285                 }
2286                 spin_unlock(&cli->cl_mod_rpcs_lock);
2287
2288                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2289                        "opc %u, max %hu\n",
2290                        cli->cl_import->imp_obd->obd_name, opc, max);
2291
2292                 l_wait_event(cli->cl_mod_rpcs_waitq,
2293                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2294         } while (true);
2295 }
2296 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2297
2298 /* Put a modify RPC slot from the obd client @cli according
2299  * to the kind of operation @opc that has been sent and the
2300  * intent @it of the operation if it applies.
2301  */
2302 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2303                           struct lookup_intent *it, __u16 tag)
2304 {
2305         bool                    close_req = false;
2306
2307         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2308                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2309                 return;
2310
2311         if (opc == MDS_CLOSE)
2312                 close_req = true;
2313
2314         spin_lock(&cli->cl_mod_rpcs_lock);
2315         cli->cl_mod_rpcs_in_flight--;
2316         if (close_req)
2317                 cli->cl_close_rpcs_in_flight--;
2318         /* release the tag in the bitmap */
2319         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2320         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2321         spin_unlock(&cli->cl_mod_rpcs_lock);
2322         wake_up(&cli->cl_mod_rpcs_waitq);
2323 }
2324 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2325