Whamcloud - gitweb
LU-7042 lnet: Handle OFED 3.18 packaging definitions
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_disk.h>
48 #include <lustre_kernelcomm.h>
49
/* Held by class_search_type() and the (un)register paths in this file
 * while walking or modifying the obd_types list. */
spinlock_t obd_types_lock;

/* Slab caches; created by obd_init_caches() and torn down by
 * obd_cleanup_caches(). */
static struct kmem_cache *obd_device_cachep;
struct kmem_cache *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
static struct kmem_cache *import_cachep;

/* NOTE(review): presumably the queues of imports/exports waiting for
 * deferred destruction and the lock guarding them -- their users are
 * outside this part of the file; confirm. */
static struct list_head obd_zombie_imports;
static struct list_head obd_zombie_exports;
static spinlock_t  obd_zombie_impexp_lock;

/* Forward declarations for helpers defined later in the file. */
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* NOTE(review): stale-export bookkeeping; class_export_put() asserts that
 * an export is off exp_stale_list before its final put.  The remaining
 * users are not visible here -- confirm semantics. */
struct list_head obd_stale_exports;
spinlock_t       obd_stale_export_lock;
atomic_t         obd_stale_export_num;

/* Hook called by class_export_destroy() to drop an export's connection;
 * it is only declared and exported here, so it has to be assigned by
 * another module before the first export with a connection is destroyed. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
73
74 /*
75  * support functions: we could use inter-module communication, but this
76  * is more portable to other OS's
77  */
78 static struct obd_device *obd_device_alloc(void)
79 {
80         struct obd_device *obd;
81
82         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
83         if (obd != NULL) {
84                 obd->obd_magic = OBD_DEVICE_MAGIC;
85         }
86         return obd;
87 }
88
89 static void obd_device_free(struct obd_device *obd)
90 {
91         LASSERT(obd != NULL);
92         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
93                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
94         if (obd->obd_namespace != NULL) {
95                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
96                        obd, obd->obd_namespace, obd->obd_force);
97                 LBUG();
98         }
99         lu_ref_fini(&obd->obd_reference);
100         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
101 }
102
103 struct obd_type *class_search_type(const char *name)
104 {
105         struct list_head *tmp;
106         struct obd_type *type;
107
108         spin_lock(&obd_types_lock);
109         list_for_each(tmp, &obd_types) {
110                 type = list_entry(tmp, struct obd_type, typ_chain);
111                 if (strcmp(type->typ_name, name) == 0) {
112                         spin_unlock(&obd_types_lock);
113                         return type;
114                 }
115         }
116         spin_unlock(&obd_types_lock);
117         return NULL;
118 }
119 EXPORT_SYMBOL(class_search_type);
120
121 struct obd_type *class_get_type(const char *name)
122 {
123         struct obd_type *type = class_search_type(name);
124
125 #ifdef HAVE_MODULE_LOADING_SUPPORT
126         if (!type) {
127                 const char *modname = name;
128
129                 if (strcmp(modname, "obdfilter") == 0)
130                         modname = "ofd";
131
132                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
133                         modname = LUSTRE_OSP_NAME;
134
135                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
136                         modname = LUSTRE_MDT_NAME;
137
138                 if (!request_module("%s", modname)) {
139                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
140                         type = class_search_type(name);
141                 } else {
142                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143                                            modname);
144                 }
145         }
146 #endif
147         if (type) {
148                 spin_lock(&type->obd_type_lock);
149                 type->typ_refcnt++;
150                 try_module_get(type->typ_dt_ops->o_owner);
151                 spin_unlock(&type->obd_type_lock);
152         }
153         return type;
154 }
155
156 void class_put_type(struct obd_type *type)
157 {
158         LASSERT(type);
159         spin_lock(&type->obd_type_lock);
160         type->typ_refcnt--;
161         module_put(type->typ_dt_ops->o_owner);
162         spin_unlock(&type->obd_type_lock);
163 }
164
165 #define CLASS_MAX_NAME 1024
166
167 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
168                         bool enable_proc, struct lprocfs_vars *vars,
169                         const char *name, struct lu_device_type *ldt)
170 {
171         struct obd_type *type;
172         int rc = 0;
173         ENTRY;
174
175         /* sanity check */
176         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
177
178         if (class_search_type(name)) {
179                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180                 RETURN(-EEXIST);
181         }
182
183         rc = -ENOMEM;
184         OBD_ALLOC(type, sizeof(*type));
185         if (type == NULL)
186                 RETURN(rc);
187
188         OBD_ALLOC_PTR(type->typ_dt_ops);
189         OBD_ALLOC_PTR(type->typ_md_ops);
190         OBD_ALLOC(type->typ_name, strlen(name) + 1);
191
192         if (type->typ_dt_ops == NULL ||
193             type->typ_md_ops == NULL ||
194             type->typ_name == NULL)
195                 GOTO (failed, rc);
196
197         *(type->typ_dt_ops) = *dt_ops;
198         /* md_ops is optional */
199         if (md_ops)
200                 *(type->typ_md_ops) = *md_ops;
201         strcpy(type->typ_name, name);
202         spin_lock_init(&type->obd_type_lock);
203
204 #ifdef CONFIG_PROC_FS
205         if (enable_proc) {
206                 type->typ_procroot = lprocfs_register(type->typ_name,
207                                                       proc_lustre_root,
208                                                       vars, type);
209                 if (IS_ERR(type->typ_procroot)) {
210                         rc = PTR_ERR(type->typ_procroot);
211                         type->typ_procroot = NULL;
212                         GOTO(failed, rc);
213                 }
214         }
215 #endif
216         if (ldt != NULL) {
217                 type->typ_lu = ldt;
218                 rc = lu_device_type_init(ldt);
219                 if (rc != 0)
220                         GOTO (failed, rc);
221         }
222
223         spin_lock(&obd_types_lock);
224         list_add(&type->typ_chain, &obd_types);
225         spin_unlock(&obd_types_lock);
226
227         RETURN (0);
228
229 failed:
230         if (type->typ_name != NULL) {
231 #ifdef CONFIG_PROC_FS
232                 if (type->typ_procroot != NULL)
233                         remove_proc_subtree(type->typ_name, proc_lustre_root);
234 #endif
235                 OBD_FREE(type->typ_name, strlen(name) + 1);
236         }
237         if (type->typ_md_ops != NULL)
238                 OBD_FREE_PTR(type->typ_md_ops);
239         if (type->typ_dt_ops != NULL)
240                 OBD_FREE_PTR(type->typ_dt_ops);
241         OBD_FREE(type, sizeof(*type));
242         RETURN(rc);
243 }
244 EXPORT_SYMBOL(class_register_type);
245
246 int class_unregister_type(const char *name)
247 {
248         struct obd_type *type = class_search_type(name);
249         ENTRY;
250
251         if (!type) {
252                 CERROR("unknown obd type\n");
253                 RETURN(-EINVAL);
254         }
255
256         if (type->typ_refcnt) {
257                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
258                 /* This is a bad situation, let's make the best of it */
259                 /* Remove ops, but leave the name for debugging */
260                 OBD_FREE_PTR(type->typ_dt_ops);
261                 OBD_FREE_PTR(type->typ_md_ops);
262                 RETURN(-EBUSY);
263         }
264
265         /* we do not use type->typ_procroot as for compatibility purposes
266          * other modules can share names (i.e. lod can use lov entry). so
267          * we can't reference pointer as it can get invalided when another
268          * module removes the entry */
269 #ifdef CONFIG_PROC_FS
270         if (type->typ_procroot != NULL)
271                 remove_proc_subtree(type->typ_name, proc_lustre_root);
272         if (type->typ_procsym != NULL)
273                 lprocfs_remove(&type->typ_procsym);
274 #endif
275         if (type->typ_lu)
276                 lu_device_type_fini(type->typ_lu);
277
278         spin_lock(&obd_types_lock);
279         list_del(&type->typ_chain);
280         spin_unlock(&obd_types_lock);
281         OBD_FREE(type->typ_name, strlen(name) + 1);
282         if (type->typ_dt_ops != NULL)
283                 OBD_FREE_PTR(type->typ_dt_ops);
284         if (type->typ_md_ops != NULL)
285                 OBD_FREE_PTR(type->typ_md_ops);
286         OBD_FREE(type, sizeof(*type));
287         RETURN(0);
288 } /* class_unregister_type */
289 EXPORT_SYMBOL(class_unregister_type);
290
291 /**
292  * Create a new obd device.
293  *
294  * Find an empty slot in ::obd_devs[], create a new obd device in it.
295  *
296  * \param[in] type_name obd device type string.
297  * \param[in] name      obd device name.
298  *
299  * \retval NULL if create fails, otherwise return the obd device
300  *         pointer created.
301  */
302 struct obd_device *class_newdev(const char *type_name, const char *name)
303 {
304         struct obd_device *result = NULL;
305         struct obd_device *newdev;
306         struct obd_type *type = NULL;
307         int i;
308         int new_obd_minor = 0;
309         ENTRY;
310
311         if (strlen(name) >= MAX_OBD_NAME) {
312                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
313                 RETURN(ERR_PTR(-EINVAL));
314         }
315
316         type = class_get_type(type_name);
317         if (type == NULL){
318                 CERROR("OBD: unknown type: %s\n", type_name);
319                 RETURN(ERR_PTR(-ENODEV));
320         }
321
322         newdev = obd_device_alloc();
323         if (newdev == NULL)
324                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
325
326         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
327
328         write_lock(&obd_dev_lock);
329         for (i = 0; i < class_devno_max(); i++) {
330                 struct obd_device *obd = class_num2obd(i);
331
332                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
333                         CERROR("Device %s already exists at %d, won't add\n",
334                                name, i);
335                         if (result) {
336                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
337                                          "%p obd_magic %08x != %08x\n", result,
338                                          result->obd_magic, OBD_DEVICE_MAGIC);
339                                 LASSERTF(result->obd_minor == new_obd_minor,
340                                          "%p obd_minor %d != %d\n", result,
341                                          result->obd_minor, new_obd_minor);
342
343                                 obd_devs[result->obd_minor] = NULL;
344                                 result->obd_name[0]='\0';
345                          }
346                         result = ERR_PTR(-EEXIST);
347                         break;
348                 }
349                 if (!result && !obd) {
350                         result = newdev;
351                         result->obd_minor = i;
352                         new_obd_minor = i;
353                         result->obd_type = type;
354                         strncpy(result->obd_name, name,
355                                 sizeof(result->obd_name) - 1);
356                         obd_devs[i] = result;
357                 }
358         }
359         write_unlock(&obd_dev_lock);
360
361         if (result == NULL && i >= class_devno_max()) {
362                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
363                        class_devno_max());
364                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
365         }
366
367         if (IS_ERR(result))
368                 GOTO(out, result);
369
370         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
371                result->obd_name, result);
372
373         RETURN(result);
374 out:
375         obd_device_free(newdev);
376 out_type:
377         class_put_type(type);
378         return result;
379 }
380
381 void class_release_dev(struct obd_device *obd)
382 {
383         struct obd_type *obd_type = obd->obd_type;
384
385         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
386                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
387         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
388                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
389         LASSERT(obd_type != NULL);
390
391         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
392                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
393
394         write_lock(&obd_dev_lock);
395         obd_devs[obd->obd_minor] = NULL;
396         write_unlock(&obd_dev_lock);
397         obd_device_free(obd);
398
399         class_put_type(obd_type);
400 }
401
402 int class_name2dev(const char *name)
403 {
404         int i;
405
406         if (!name)
407                 return -1;
408
409         read_lock(&obd_dev_lock);
410         for (i = 0; i < class_devno_max(); i++) {
411                 struct obd_device *obd = class_num2obd(i);
412
413                 if (obd && strcmp(name, obd->obd_name) == 0) {
414                         /* Make sure we finished attaching before we give
415                            out any references */
416                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
417                         if (obd->obd_attached) {
418                                 read_unlock(&obd_dev_lock);
419                                 return i;
420                         }
421                         break;
422                 }
423         }
424         read_unlock(&obd_dev_lock);
425
426         return -1;
427 }
428
429 struct obd_device *class_name2obd(const char *name)
430 {
431         int dev = class_name2dev(name);
432
433         if (dev < 0 || dev > class_devno_max())
434                 return NULL;
435         return class_num2obd(dev);
436 }
437 EXPORT_SYMBOL(class_name2obd);
438
439 int class_uuid2dev(struct obd_uuid *uuid)
440 {
441         int i;
442
443         read_lock(&obd_dev_lock);
444         for (i = 0; i < class_devno_max(); i++) {
445                 struct obd_device *obd = class_num2obd(i);
446
447                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
448                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
449                         read_unlock(&obd_dev_lock);
450                         return i;
451                 }
452         }
453         read_unlock(&obd_dev_lock);
454
455         return -1;
456 }
457
458 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
459 {
460         int dev = class_uuid2dev(uuid);
461         if (dev < 0)
462                 return NULL;
463         return class_num2obd(dev);
464 }
465 EXPORT_SYMBOL(class_uuid2obd);
466
467 /**
468  * Get obd device from ::obd_devs[]
469  *
470  * \param num [in] array index
471  *
472  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
473  *         otherwise return the obd device there.
474  */
475 struct obd_device *class_num2obd(int num)
476 {
477         struct obd_device *obd = NULL;
478
479         if (num < class_devno_max()) {
480                 obd = obd_devs[num];
481                 if (obd == NULL)
482                         return NULL;
483
484                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
485                          "%p obd_magic %08x != %08x\n",
486                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
487                 LASSERTF(obd->obd_minor == num,
488                          "%p obd_minor %0d != %0d\n",
489                          obd, obd->obd_minor, num);
490         }
491
492         return obd;
493 }
494
495 /**
496  * Get obd devices count. Device in any
497  *    state are counted
498  * \retval obd device count
499  */
500 int get_devices_count(void)
501 {
502         int index, max_index = class_devno_max(), dev_count = 0;
503
504         read_lock(&obd_dev_lock);
505         for (index = 0; index <= max_index; index++) {
506                 struct obd_device *obd = class_num2obd(index);
507                 if (obd != NULL)
508                         dev_count++;
509         }
510         read_unlock(&obd_dev_lock);
511
512         return dev_count;
513 }
514 EXPORT_SYMBOL(get_devices_count);
515
516 void class_obd_list(void)
517 {
518         char *status;
519         int i;
520
521         read_lock(&obd_dev_lock);
522         for (i = 0; i < class_devno_max(); i++) {
523                 struct obd_device *obd = class_num2obd(i);
524
525                 if (obd == NULL)
526                         continue;
527                 if (obd->obd_stopping)
528                         status = "ST";
529                 else if (obd->obd_set_up)
530                         status = "UP";
531                 else if (obd->obd_attached)
532                         status = "AT";
533                 else
534                         status = "--";
535                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
536                          i, status, obd->obd_type->typ_name,
537                          obd->obd_name, obd->obd_uuid.uuid,
538                          atomic_read(&obd->obd_refcount));
539         }
540         read_unlock(&obd_dev_lock);
541         return;
542 }
543
544 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
545    specified, then only the client with that uuid is returned,
546    otherwise any client connected to the tgt is returned. */
547 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
548                                           const char * typ_name,
549                                           struct obd_uuid *grp_uuid)
550 {
551         int i;
552
553         read_lock(&obd_dev_lock);
554         for (i = 0; i < class_devno_max(); i++) {
555                 struct obd_device *obd = class_num2obd(i);
556
557                 if (obd == NULL)
558                         continue;
559                 if ((strncmp(obd->obd_type->typ_name, typ_name,
560                              strlen(typ_name)) == 0)) {
561                         if (obd_uuid_equals(tgt_uuid,
562                                             &obd->u.cli.cl_target_uuid) &&
563                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
564                                                          &obd->obd_uuid) : 1)) {
565                                 read_unlock(&obd_dev_lock);
566                                 return obd;
567                         }
568                 }
569         }
570         read_unlock(&obd_dev_lock);
571
572         return NULL;
573 }
574 EXPORT_SYMBOL(class_find_client_obd);
575
576 /* Iterate the obd_device list looking devices have grp_uuid. Start
577    searching at *next, and if a device is found, the next index to look
578    at is saved in *next. If next is NULL, then the first matching device
579    will always be returned. */
580 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
581 {
582         int i;
583
584         if (next == NULL)
585                 i = 0;
586         else if (*next >= 0 && *next < class_devno_max())
587                 i = *next;
588         else
589                 return NULL;
590
591         read_lock(&obd_dev_lock);
592         for (; i < class_devno_max(); i++) {
593                 struct obd_device *obd = class_num2obd(i);
594
595                 if (obd == NULL)
596                         continue;
597                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
598                         if (next != NULL)
599                                 *next = i+1;
600                         read_unlock(&obd_dev_lock);
601                         return obd;
602                 }
603         }
604         read_unlock(&obd_dev_lock);
605
606         return NULL;
607 }
608 EXPORT_SYMBOL(class_devices_in_group);
609
610 /**
611  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
612  * adjust sptlrpc settings accordingly.
613  */
614 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
615 {
616         struct obd_device  *obd;
617         const char         *type;
618         int                 i, rc = 0, rc2;
619
620         LASSERT(namelen > 0);
621
622         read_lock(&obd_dev_lock);
623         for (i = 0; i < class_devno_max(); i++) {
624                 obd = class_num2obd(i);
625
626                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
627                         continue;
628
629                 /* only notify mdc, osc, osp, lwp, mdt, ost
630                  * because only these have a -sptlrpc llog */
631                 type = obd->obd_type->typ_name;
632                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
633                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
634                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
635                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
636                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
637                     strcmp(type, LUSTRE_OST_NAME) != 0)
638                         continue;
639
640                 if (strncmp(obd->obd_name, fsname, namelen))
641                         continue;
642
643                 class_incref(obd, __FUNCTION__, obd);
644                 read_unlock(&obd_dev_lock);
645                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
646                                          sizeof(KEY_SPTLRPC_CONF),
647                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
648                 rc = rc ? rc : rc2;
649                 class_decref(obd, __FUNCTION__, obd);
650                 read_lock(&obd_dev_lock);
651         }
652         read_unlock(&obd_dev_lock);
653         return rc;
654 }
655 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
656
657 void obd_cleanup_caches(void)
658 {
659         ENTRY;
660         if (obd_device_cachep) {
661                 kmem_cache_destroy(obd_device_cachep);
662                 obd_device_cachep = NULL;
663         }
664         if (obdo_cachep) {
665                 kmem_cache_destroy(obdo_cachep);
666                 obdo_cachep = NULL;
667         }
668         if (import_cachep) {
669                 kmem_cache_destroy(import_cachep);
670                 import_cachep = NULL;
671         }
672
673         EXIT;
674 }
675
676 int obd_init_caches(void)
677 {
678         int rc;
679         ENTRY;
680
681         LASSERT(obd_device_cachep == NULL);
682         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
683                                               sizeof(struct obd_device),
684                                               0, 0, NULL);
685         if (!obd_device_cachep)
686                 GOTO(out, rc = -ENOMEM);
687
688         LASSERT(obdo_cachep == NULL);
689         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
690                                         0, 0, NULL);
691         if (!obdo_cachep)
692                 GOTO(out, rc = -ENOMEM);
693
694         LASSERT(import_cachep == NULL);
695         import_cachep = kmem_cache_create("ll_import_cache",
696                                           sizeof(struct obd_import),
697                                           0, 0, NULL);
698         if (!import_cachep)
699                 GOTO(out, rc = -ENOMEM);
700
701         RETURN(0);
702 out:
703         obd_cleanup_caches();
704         RETURN(rc);
705 }
706
707 /* map connection to client */
708 struct obd_export *class_conn2export(struct lustre_handle *conn)
709 {
710         struct obd_export *export;
711         ENTRY;
712
713         if (!conn) {
714                 CDEBUG(D_CACHE, "looking for null handle\n");
715                 RETURN(NULL);
716         }
717
718         if (conn->cookie == -1) {  /* this means assign a new connection */
719                 CDEBUG(D_CACHE, "want a new connection\n");
720                 RETURN(NULL);
721         }
722
723         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
724         export = class_handle2object(conn->cookie, NULL);
725         RETURN(export);
726 }
727 EXPORT_SYMBOL(class_conn2export);
728
729 struct obd_device *class_exp2obd(struct obd_export *exp)
730 {
731         if (exp)
732                 return exp->exp_obd;
733         return NULL;
734 }
735 EXPORT_SYMBOL(class_exp2obd);
736
737 struct obd_device *class_conn2obd(struct lustre_handle *conn)
738 {
739         struct obd_export *export;
740         export = class_conn2export(conn);
741         if (export) {
742                 struct obd_device *obd = export->exp_obd;
743                 class_export_put(export);
744                 return obd;
745         }
746         return NULL;
747 }
748
749 struct obd_import *class_exp2cliimp(struct obd_export *exp)
750 {
751         struct obd_device *obd = exp->exp_obd;
752         if (obd == NULL)
753                 return NULL;
754         return obd->u.cli.cl_import;
755 }
756 EXPORT_SYMBOL(class_exp2cliimp);
757
758 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
759 {
760         struct obd_device *obd = class_conn2obd(conn);
761         if (obd == NULL)
762                 return NULL;
763         return obd->u.cli.cl_import;
764 }
765
766 /* Export management functions */
767 static void class_export_destroy(struct obd_export *exp)
768 {
769         struct obd_device *obd = exp->exp_obd;
770         ENTRY;
771
772         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
773         LASSERT(obd != NULL);
774
775         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
776                exp->exp_client_uuid.uuid, obd->obd_name);
777
778         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
779         if (exp->exp_connection)
780                 ptlrpc_put_connection_superhack(exp->exp_connection);
781
782         LASSERT(list_empty(&exp->exp_outstanding_replies));
783         LASSERT(list_empty(&exp->exp_uncommitted_replies));
784         LASSERT(list_empty(&exp->exp_req_replay_queue));
785         LASSERT(list_empty(&exp->exp_hp_rpcs));
786         obd_destroy_export(exp);
787         class_decref(obd, "export", exp);
788
789         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
790         EXIT;
791 }
792
793 static void export_handle_addref(void *export)
794 {
795         class_export_get(export);
796 }
797
798 static struct portals_handle_ops export_handle_ops = {
799         .hop_addref = export_handle_addref,
800         .hop_free   = NULL,
801 };
802
803 struct obd_export *class_export_get(struct obd_export *exp)
804 {
805         atomic_inc(&exp->exp_refcount);
806         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
807                atomic_read(&exp->exp_refcount));
808         return exp;
809 }
810 EXPORT_SYMBOL(class_export_get);
811
812 void class_export_put(struct obd_export *exp)
813 {
814         LASSERT(exp != NULL);
815         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
816         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
817                atomic_read(&exp->exp_refcount) - 1);
818
819         if (atomic_dec_and_test(&exp->exp_refcount)) {
820                 LASSERT(!list_empty(&exp->exp_obd_chain));
821                 LASSERT(list_empty(&exp->exp_stale_list));
822                 CDEBUG(D_IOCTL, "final put %p/%s\n",
823                        exp, exp->exp_client_uuid.uuid);
824
825                 /* release nid stat refererence */
826                 lprocfs_exp_cleanup(exp);
827
828                 obd_zombie_export_add(exp);
829         }
830 }
831 EXPORT_SYMBOL(class_export_put);
832
833 /* Creates a new export, adds it to the hash table, and returns a
834  * pointer to it. The refcount is 2: one for the hash reference, and
835  * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837                                     struct obd_uuid *cluuid)
838 {
839         struct obd_export *export;
840         struct cfs_hash *hash = NULL;
841         int rc = 0;
842         ENTRY;
843
844         OBD_ALLOC_PTR(export);
845         if (!export)
846                 return ERR_PTR(-ENOMEM);
847
848         export->exp_conn_cnt = 0;
849         export->exp_lock_hash = NULL;
850         export->exp_flock_hash = NULL;
851         atomic_set(&export->exp_refcount, 2);
852         atomic_set(&export->exp_rpc_count, 0);
853         atomic_set(&export->exp_cb_count, 0);
854         atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856         INIT_LIST_HEAD(&export->exp_locks_list);
857         spin_lock_init(&export->exp_locks_list_guard);
858 #endif
859         atomic_set(&export->exp_replay_count, 0);
860         export->exp_obd = obd;
861         INIT_LIST_HEAD(&export->exp_outstanding_replies);
862         spin_lock_init(&export->exp_uncommitted_replies_lock);
863         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864         INIT_LIST_HEAD(&export->exp_req_replay_queue);
865         INIT_LIST_HEAD(&export->exp_handle.h_link);
866         INIT_LIST_HEAD(&export->exp_hp_rpcs);
867         INIT_LIST_HEAD(&export->exp_reg_rpcs);
868         class_handle_hash(&export->exp_handle, &export_handle_ops);
869         export->exp_last_request_time = cfs_time_current_sec();
870         spin_lock_init(&export->exp_lock);
871         spin_lock_init(&export->exp_rpc_lock);
872         INIT_HLIST_NODE(&export->exp_uuid_hash);
873         INIT_HLIST_NODE(&export->exp_nid_hash);
874         INIT_HLIST_NODE(&export->exp_gen_hash);
875         spin_lock_init(&export->exp_bl_list_lock);
876         INIT_LIST_HEAD(&export->exp_bl_list);
877         INIT_LIST_HEAD(&export->exp_stale_list);
878
879         export->exp_sp_peer = LUSTRE_SP_ANY;
880         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881         export->exp_client_uuid = *cluuid;
882         obd_init_export(export);
883
884         spin_lock(&obd->obd_dev_lock);
885         /* shouldn't happen, but might race */
886         if (obd->obd_stopping)
887                 GOTO(exit_unlock, rc = -ENODEV);
888
889         hash = cfs_hash_getref(obd->obd_uuid_hash);
890         if (hash == NULL)
891                 GOTO(exit_unlock, rc = -ENODEV);
892         spin_unlock(&obd->obd_dev_lock);
893
894         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
896                 if (rc != 0) {
897                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898                                       obd->obd_name, cluuid->uuid, rc);
899                         GOTO(exit_err, rc = -EALREADY);
900                 }
901         }
902
903         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
904         spin_lock(&obd->obd_dev_lock);
905         if (obd->obd_stopping) {
906                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
907                 GOTO(exit_unlock, rc = -ENODEV);
908         }
909
910         class_incref(obd, "export", export);
911         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
912         list_add_tail(&export->exp_obd_chain_timed,
913                       &export->exp_obd->obd_exports_timed);
914         export->exp_obd->obd_num_exports++;
915         spin_unlock(&obd->obd_dev_lock);
916         cfs_hash_putref(hash);
917         RETURN(export);
918
919 exit_unlock:
920         spin_unlock(&obd->obd_dev_lock);
921 exit_err:
922         if (hash)
923                 cfs_hash_putref(hash);
924         class_handle_unhash(&export->exp_handle);
925         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
926         obd_destroy_export(export);
927         OBD_FREE_PTR(export);
928         return ERR_PTR(rc);
929 }
930 EXPORT_SYMBOL(class_new_export);
931
932 void class_unlink_export(struct obd_export *exp)
933 {
934         class_handle_unhash(&exp->exp_handle);
935
936         spin_lock(&exp->exp_obd->obd_dev_lock);
937         /* delete an uuid-export hashitem from hashtables */
938         if (!hlist_unhashed(&exp->exp_uuid_hash))
939                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
940                              &exp->exp_client_uuid,
941                              &exp->exp_uuid_hash);
942
943         if (!hlist_unhashed(&exp->exp_gen_hash)) {
944                 struct tg_export_data   *ted = &exp->exp_target_data;
945                 struct cfs_hash         *hash;
946
947                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
948                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
949                              &exp->exp_gen_hash);
950                 cfs_hash_putref(hash);
951         }
952
953         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
954         list_del_init(&exp->exp_obd_chain_timed);
955         exp->exp_obd->obd_num_exports--;
956         spin_unlock(&exp->exp_obd->obd_dev_lock);
957         atomic_inc(&obd_stale_export_num);
958
959         /* A reference is kept by obd_stale_exports list */
960         obd_stale_export_put(exp);
961 }
962
963 /* Import management functions */
964 static void class_import_destroy(struct obd_import *imp)
965 {
966         ENTRY;
967
968         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
969                 imp->imp_obd->obd_name);
970
971         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
972
973         ptlrpc_put_connection_superhack(imp->imp_connection);
974
975         while (!list_empty(&imp->imp_conn_list)) {
976                 struct obd_import_conn *imp_conn;
977
978                 imp_conn = list_entry(imp->imp_conn_list.next,
979                                       struct obd_import_conn, oic_item);
980                 list_del_init(&imp_conn->oic_item);
981                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
982                 OBD_FREE(imp_conn, sizeof(*imp_conn));
983         }
984
985         LASSERT(imp->imp_sec == NULL);
986         class_decref(imp->imp_obd, "import", imp);
987         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
988         EXIT;
989 }
990
991 static void import_handle_addref(void *import)
992 {
993         class_import_get(import);
994 }
995
996 static struct portals_handle_ops import_handle_ops = {
997         .hop_addref = import_handle_addref,
998         .hop_free   = NULL,
999 };
1000
1001 struct obd_import *class_import_get(struct obd_import *import)
1002 {
1003         atomic_inc(&import->imp_refcount);
1004         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1005                atomic_read(&import->imp_refcount),
1006                import->imp_obd->obd_name);
1007         return import;
1008 }
1009 EXPORT_SYMBOL(class_import_get);
1010
1011 void class_import_put(struct obd_import *imp)
1012 {
1013         ENTRY;
1014
1015         LASSERT(list_empty(&imp->imp_zombie_chain));
1016         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1017
1018         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1019                atomic_read(&imp->imp_refcount) - 1,
1020                imp->imp_obd->obd_name);
1021
1022         if (atomic_dec_and_test(&imp->imp_refcount)) {
1023                 CDEBUG(D_INFO, "final put import %p\n", imp);
1024                 obd_zombie_import_add(imp);
1025         }
1026
1027         /* catch possible import put race */
1028         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1029         EXIT;
1030 }
1031 EXPORT_SYMBOL(class_import_put);
1032
1033 static void init_imp_at(struct imp_at *at) {
1034         int i;
1035         at_init(&at->iat_net_latency, 0, 0);
1036         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1037                 /* max service estimates are tracked on the server side, so
1038                    don't use the AT history here, just use the last reported
1039                    val. (But keep hist for proc histogram, worst_ever) */
1040                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1041                         AT_FLG_NOHIST);
1042         }
1043 }
1044
1045 struct obd_import *class_new_import(struct obd_device *obd)
1046 {
1047         struct obd_import *imp;
1048
1049         OBD_ALLOC(imp, sizeof(*imp));
1050         if (imp == NULL)
1051                 return NULL;
1052
1053         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1054         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1055         INIT_LIST_HEAD(&imp->imp_replay_list);
1056         INIT_LIST_HEAD(&imp->imp_sending_list);
1057         INIT_LIST_HEAD(&imp->imp_delayed_list);
1058         INIT_LIST_HEAD(&imp->imp_committed_list);
1059         imp->imp_replay_cursor = &imp->imp_committed_list;
1060         spin_lock_init(&imp->imp_lock);
1061         imp->imp_last_success_conn = 0;
1062         imp->imp_state = LUSTRE_IMP_NEW;
1063         imp->imp_obd = class_incref(obd, "import", imp);
1064         mutex_init(&imp->imp_sec_mutex);
1065         init_waitqueue_head(&imp->imp_recovery_waitq);
1066
1067         atomic_set(&imp->imp_refcount, 2);
1068         atomic_set(&imp->imp_unregistering, 0);
1069         atomic_set(&imp->imp_inflight, 0);
1070         atomic_set(&imp->imp_replay_inflight, 0);
1071         atomic_set(&imp->imp_inval_count, 0);
1072         INIT_LIST_HEAD(&imp->imp_conn_list);
1073         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1074         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1075         init_imp_at(&imp->imp_at);
1076
1077         /* the default magic is V2, will be used in connect RPC, and
1078          * then adjusted according to the flags in request/reply. */
1079         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1080
1081         return imp;
1082 }
1083 EXPORT_SYMBOL(class_new_import);
1084
1085 void class_destroy_import(struct obd_import *import)
1086 {
1087         LASSERT(import != NULL);
1088         LASSERT(import != LP_POISON);
1089
1090         class_handle_unhash(&import->imp_handle);
1091
1092         spin_lock(&import->imp_lock);
1093         import->imp_generation++;
1094         spin_unlock(&import->imp_lock);
1095         class_import_put(import);
1096 }
1097 EXPORT_SYMBOL(class_destroy_import);
1098
1099 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1100
1101 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1102 {
1103         spin_lock(&exp->exp_locks_list_guard);
1104
1105         LASSERT(lock->l_exp_refs_nr >= 0);
1106
1107         if (lock->l_exp_refs_target != NULL &&
1108             lock->l_exp_refs_target != exp) {
1109                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1110                               exp, lock, lock->l_exp_refs_target);
1111         }
1112         if ((lock->l_exp_refs_nr ++) == 0) {
1113                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1114                 lock->l_exp_refs_target = exp;
1115         }
1116         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1117                lock, exp, lock->l_exp_refs_nr);
1118         spin_unlock(&exp->exp_locks_list_guard);
1119 }
1120
1121 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1122 {
1123         spin_lock(&exp->exp_locks_list_guard);
1124         LASSERT(lock->l_exp_refs_nr > 0);
1125         if (lock->l_exp_refs_target != exp) {
1126                 LCONSOLE_WARN("lock %p, "
1127                               "mismatching export pointers: %p, %p\n",
1128                               lock, lock->l_exp_refs_target, exp);
1129         }
1130         if (-- lock->l_exp_refs_nr == 0) {
1131                 list_del_init(&lock->l_exp_refs_link);
1132                 lock->l_exp_refs_target = NULL;
1133         }
1134         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1135                lock, exp, lock->l_exp_refs_nr);
1136         spin_unlock(&exp->exp_locks_list_guard);
1137 }
1138 #endif
1139
1140 /* A connection defines an export context in which preallocation can
1141    be managed. This releases the export pointer reference, and returns
1142    the export handle, so the export refcount is 1 when this function
1143    returns. */
1144 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1145                   struct obd_uuid *cluuid)
1146 {
1147         struct obd_export *export;
1148         LASSERT(conn != NULL);
1149         LASSERT(obd != NULL);
1150         LASSERT(cluuid != NULL);
1151         ENTRY;
1152
1153         export = class_new_export(obd, cluuid);
1154         if (IS_ERR(export))
1155                 RETURN(PTR_ERR(export));
1156
1157         conn->cookie = export->exp_handle.h_cookie;
1158         class_export_put(export);
1159
1160         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1161                cluuid->uuid, conn->cookie);
1162         RETURN(0);
1163 }
1164 EXPORT_SYMBOL(class_connect);
1165
1166 /* if export is involved in recovery then clean up related things */
1167 static void class_export_recovery_cleanup(struct obd_export *exp)
1168 {
1169         struct obd_device *obd = exp->exp_obd;
1170
1171         spin_lock(&obd->obd_recovery_task_lock);
1172         if (obd->obd_recovering) {
1173                 if (exp->exp_in_recovery) {
1174                         spin_lock(&exp->exp_lock);
1175                         exp->exp_in_recovery = 0;
1176                         spin_unlock(&exp->exp_lock);
1177                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1178                         atomic_dec(&obd->obd_connected_clients);
1179                 }
1180
1181                 /* if called during recovery then should update
1182                  * obd_stale_clients counter,
1183                  * lightweight exports are not counted */
1184                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1185                         exp->exp_obd->obd_stale_clients++;
1186         }
1187         spin_unlock(&obd->obd_recovery_task_lock);
1188
1189         spin_lock(&exp->exp_lock);
1190         /** Cleanup req replay fields */
1191         if (exp->exp_req_replay_needed) {
1192                 exp->exp_req_replay_needed = 0;
1193
1194                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1195                 atomic_dec(&obd->obd_req_replay_clients);
1196         }
1197
1198         /** Cleanup lock replay data */
1199         if (exp->exp_lock_replay_needed) {
1200                 exp->exp_lock_replay_needed = 0;
1201
1202                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1203                 atomic_dec(&obd->obd_lock_replay_clients);
1204         }
1205         spin_unlock(&exp->exp_lock);
1206 }
1207
1208 /* This function removes 1-3 references from the export:
1209  * 1 - for export pointer passed
1210  * and if disconnect really need
1211  * 2 - removing from hash
1212  * 3 - in client_unlink_export
1213  * The export pointer passed to this function can destroyed */
1214 int class_disconnect(struct obd_export *export)
1215 {
1216         int already_disconnected;
1217         ENTRY;
1218
1219         if (export == NULL) {
1220                 CWARN("attempting to free NULL export %p\n", export);
1221                 RETURN(-EINVAL);
1222         }
1223
1224         spin_lock(&export->exp_lock);
1225         already_disconnected = export->exp_disconnected;
1226         export->exp_disconnected = 1;
1227         spin_unlock(&export->exp_lock);
1228
1229         /* class_cleanup(), abort_recovery(), and class_fail_export()
1230          * all end up in here, and if any of them race we shouldn't
1231          * call extra class_export_puts(). */
1232         if (already_disconnected) {
1233                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1234                 GOTO(no_disconn, already_disconnected);
1235         }
1236
1237         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1238                export->exp_handle.h_cookie);
1239
1240         if (!hlist_unhashed(&export->exp_nid_hash))
1241                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1242                              &export->exp_connection->c_peer.nid,
1243                              &export->exp_nid_hash);
1244
1245         class_export_recovery_cleanup(export);
1246         class_unlink_export(export);
1247 no_disconn:
1248         class_export_put(export);
1249         RETURN(0);
1250 }
1251 EXPORT_SYMBOL(class_disconnect);
1252
1253 /* Return non-zero for a fully connected export */
1254 int class_connected_export(struct obd_export *exp)
1255 {
1256         int connected = 0;
1257
1258         if (exp) {
1259                 spin_lock(&exp->exp_lock);
1260                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1261                 spin_unlock(&exp->exp_lock);
1262         }
1263         return connected;
1264 }
1265 EXPORT_SYMBOL(class_connected_export);
1266
1267 static void class_disconnect_export_list(struct list_head *list,
1268                                          enum obd_option flags)
1269 {
1270         int rc;
1271         struct obd_export *exp;
1272         ENTRY;
1273
1274         /* It's possible that an export may disconnect itself, but
1275          * nothing else will be added to this list. */
1276         while (!list_empty(list)) {
1277                 exp = list_entry(list->next, struct obd_export,
1278                                  exp_obd_chain);
1279                 /* need for safe call CDEBUG after obd_disconnect */
1280                 class_export_get(exp);
1281
1282                 spin_lock(&exp->exp_lock);
1283                 exp->exp_flags = flags;
1284                 spin_unlock(&exp->exp_lock);
1285
1286                 if (obd_uuid_equals(&exp->exp_client_uuid,
1287                                     &exp->exp_obd->obd_uuid)) {
1288                         CDEBUG(D_HA,
1289                                "exp %p export uuid == obd uuid, don't discon\n",
1290                                exp);
1291                         /* Need to delete this now so we don't end up pointing
1292                          * to work_list later when this export is cleaned up. */
1293                         list_del_init(&exp->exp_obd_chain);
1294                         class_export_put(exp);
1295                         continue;
1296                 }
1297
1298                 class_export_get(exp);
1299                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1300                        "last request at "CFS_TIME_T"\n",
1301                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1302                        exp, exp->exp_last_request_time);
1303                 /* release one export reference anyway */
1304                 rc = obd_disconnect(exp);
1305
1306                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1307                        obd_export_nid2str(exp), exp, rc);
1308                 class_export_put(exp);
1309         }
1310         EXIT;
1311 }
1312
1313 void class_disconnect_exports(struct obd_device *obd)
1314 {
1315         struct list_head work_list;
1316         ENTRY;
1317
1318         /* Move all of the exports from obd_exports to a work list, en masse. */
1319         INIT_LIST_HEAD(&work_list);
1320         spin_lock(&obd->obd_dev_lock);
1321         list_splice_init(&obd->obd_exports, &work_list);
1322         list_splice_init(&obd->obd_delayed_exports, &work_list);
1323         spin_unlock(&obd->obd_dev_lock);
1324
1325         if (!list_empty(&work_list)) {
1326                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1327                        "disconnecting them\n", obd->obd_minor, obd);
1328                 class_disconnect_export_list(&work_list,
1329                                              exp_flags_from_obd(obd));
1330         } else
1331                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1332                        obd->obd_minor, obd);
1333         EXIT;
1334 }
1335 EXPORT_SYMBOL(class_disconnect_exports);
1336
1337 /* Remove exports that have not completed recovery.
1338  */
1339 void class_disconnect_stale_exports(struct obd_device *obd,
1340                                     int (*test_export)(struct obd_export *))
1341 {
1342         struct list_head work_list;
1343         struct obd_export *exp, *n;
1344         int evicted = 0;
1345         ENTRY;
1346
1347         INIT_LIST_HEAD(&work_list);
1348         spin_lock(&obd->obd_dev_lock);
1349         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1350                                  exp_obd_chain) {
1351                 /* don't count self-export as client */
1352                 if (obd_uuid_equals(&exp->exp_client_uuid,
1353                                     &exp->exp_obd->obd_uuid))
1354                         continue;
1355
1356                 /* don't evict clients which have no slot in last_rcvd
1357                  * (e.g. lightweight connection) */
1358                 if (exp->exp_target_data.ted_lr_idx == -1)
1359                         continue;
1360
1361                 spin_lock(&exp->exp_lock);
1362                 if (exp->exp_failed || test_export(exp)) {
1363                         spin_unlock(&exp->exp_lock);
1364                         continue;
1365                 }
1366                 exp->exp_failed = 1;
1367                 spin_unlock(&exp->exp_lock);
1368
1369                 list_move(&exp->exp_obd_chain, &work_list);
1370                 evicted++;
1371                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1372                        obd->obd_name, exp->exp_client_uuid.uuid,
1373                        exp->exp_connection == NULL ? "<unknown>" :
1374                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1375                 print_export_data(exp, "EVICTING", 0);
1376         }
1377         spin_unlock(&obd->obd_dev_lock);
1378
1379         if (evicted)
1380                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1381                               obd->obd_name, evicted);
1382
1383         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1384                                                  OBD_OPT_ABORT_RECOV);
1385         EXIT;
1386 }
1387 EXPORT_SYMBOL(class_disconnect_stale_exports);
1388
1389 void class_fail_export(struct obd_export *exp)
1390 {
1391         int rc, already_failed;
1392
1393         spin_lock(&exp->exp_lock);
1394         already_failed = exp->exp_failed;
1395         exp->exp_failed = 1;
1396         spin_unlock(&exp->exp_lock);
1397
1398         if (already_failed) {
1399                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1400                        exp, exp->exp_client_uuid.uuid);
1401                 return;
1402         }
1403
1404         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1405                exp, exp->exp_client_uuid.uuid);
1406
1407         if (obd_dump_on_timeout)
1408                 libcfs_debug_dumplog();
1409
1410         /* need for safe call CDEBUG after obd_disconnect */
1411         class_export_get(exp);
1412
1413         /* Most callers into obd_disconnect are removing their own reference
1414          * (request, for example) in addition to the one from the hash table.
1415          * We don't have such a reference here, so make one. */
1416         class_export_get(exp);
1417         rc = obd_disconnect(exp);
1418         if (rc)
1419                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1420         else
1421                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1422                        exp, exp->exp_client_uuid.uuid);
1423         class_export_put(exp);
1424 }
1425 EXPORT_SYMBOL(class_fail_export);
1426
1427 char *obd_export_nid2str(struct obd_export *exp)
1428 {
1429         if (exp->exp_connection != NULL)
1430                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1431
1432         return "(no nid)";
1433 }
1434 EXPORT_SYMBOL(obd_export_nid2str);
1435
1436 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1437 {
1438         struct cfs_hash *nid_hash;
1439         struct obd_export *doomed_exp = NULL;
1440         int exports_evicted = 0;
1441
1442         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1443
1444         spin_lock(&obd->obd_dev_lock);
1445         /* umount has run already, so evict thread should leave
1446          * its task to umount thread now */
1447         if (obd->obd_stopping) {
1448                 spin_unlock(&obd->obd_dev_lock);
1449                 return exports_evicted;
1450         }
1451         nid_hash = obd->obd_nid_hash;
1452         cfs_hash_getref(nid_hash);
1453         spin_unlock(&obd->obd_dev_lock);
1454
1455         do {
1456                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1457                 if (doomed_exp == NULL)
1458                         break;
1459
1460                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1461                          "nid %s found, wanted nid %s, requested nid %s\n",
1462                          obd_export_nid2str(doomed_exp),
1463                          libcfs_nid2str(nid_key), nid);
1464                 LASSERTF(doomed_exp != obd->obd_self_export,
1465                          "self-export is hashed by NID?\n");
1466                 exports_evicted++;
1467                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1468                               "request\n", obd->obd_name,
1469                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1470                               obd_export_nid2str(doomed_exp));
1471                 class_fail_export(doomed_exp);
1472                 class_export_put(doomed_exp);
1473         } while (1);
1474
1475         cfs_hash_putref(nid_hash);
1476
1477         if (!exports_evicted)
1478                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1479                        obd->obd_name, nid);
1480         return exports_evicted;
1481 }
1482 EXPORT_SYMBOL(obd_export_evict_by_nid);
1483
1484 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1485 {
1486         struct cfs_hash *uuid_hash;
1487         struct obd_export *doomed_exp = NULL;
1488         struct obd_uuid doomed_uuid;
1489         int exports_evicted = 0;
1490
1491         spin_lock(&obd->obd_dev_lock);
1492         if (obd->obd_stopping) {
1493                 spin_unlock(&obd->obd_dev_lock);
1494                 return exports_evicted;
1495         }
1496         uuid_hash = obd->obd_uuid_hash;
1497         cfs_hash_getref(uuid_hash);
1498         spin_unlock(&obd->obd_dev_lock);
1499
1500         obd_str2uuid(&doomed_uuid, uuid);
1501         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1502                 CERROR("%s: can't evict myself\n", obd->obd_name);
1503                 cfs_hash_putref(uuid_hash);
1504                 return exports_evicted;
1505         }
1506
1507         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1508
1509         if (doomed_exp == NULL) {
1510                 CERROR("%s: can't disconnect %s: no exports found\n",
1511                        obd->obd_name, uuid);
1512         } else {
1513                 CWARN("%s: evicting %s at adminstrative request\n",
1514                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1515                 class_fail_export(doomed_exp);
1516                 class_export_put(doomed_exp);
1517                 exports_evicted++;
1518         }
1519         cfs_hash_putref(uuid_hash);
1520
1521         return exports_evicted;
1522 }
1523
1524 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1525 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1526 #endif
1527
1528 static void print_export_data(struct obd_export *exp, const char *status,
1529                               int locks)
1530 {
1531         struct ptlrpc_reply_state *rs;
1532         struct ptlrpc_reply_state *first_reply = NULL;
1533         int nreplies = 0;
1534
1535         spin_lock(&exp->exp_lock);
1536         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1537                             rs_exp_list) {
1538                 if (nreplies == 0)
1539                         first_reply = rs;
1540                 nreplies++;
1541         }
1542         spin_unlock(&exp->exp_lock);
1543
1544         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1545                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1546                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1547                atomic_read(&exp->exp_rpc_count),
1548                atomic_read(&exp->exp_cb_count),
1549                atomic_read(&exp->exp_locks_count),
1550                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1551                nreplies, first_reply, nreplies > 3 ? "..." : "",
1552                exp->exp_last_committed);
1553 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1554         if (locks && class_export_dump_hook != NULL)
1555                 class_export_dump_hook(exp);
1556 #endif
1557 }
1558
1559 void dump_exports(struct obd_device *obd, int locks)
1560 {
1561         struct obd_export *exp;
1562
1563         spin_lock(&obd->obd_dev_lock);
1564         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1565                 print_export_data(exp, "ACTIVE", locks);
1566         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1567                 print_export_data(exp, "UNLINKED", locks);
1568         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1569                 print_export_data(exp, "DELAYED", locks);
1570         spin_unlock(&obd->obd_dev_lock);
1571         spin_lock(&obd_zombie_impexp_lock);
1572         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1573                 print_export_data(exp, "ZOMBIE", locks);
1574         spin_unlock(&obd_zombie_impexp_lock);
1575 }
1576
1577 void obd_exports_barrier(struct obd_device *obd)
1578 {
1579         int waited = 2;
1580         LASSERT(list_empty(&obd->obd_exports));
1581         spin_lock(&obd->obd_dev_lock);
1582         while (!list_empty(&obd->obd_unlinked_exports)) {
1583                 spin_unlock(&obd->obd_dev_lock);
1584                 set_current_state(TASK_UNINTERRUPTIBLE);
1585                 schedule_timeout(cfs_time_seconds(waited));
1586                 if (waited > 5 && IS_PO2(waited)) {
1587                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1588                                       "more than %d seconds. "
1589                                       "The obd refcount = %d. Is it stuck?\n",
1590                                       obd->obd_name, waited,
1591                                       atomic_read(&obd->obd_refcount));
1592                         dump_exports(obd, 1);
1593                 }
1594                 waited *= 2;
1595                 spin_lock(&obd->obd_dev_lock);
1596         }
1597         spin_unlock(&obd->obd_dev_lock);
1598 }
1599 EXPORT_SYMBOL(obd_exports_barrier);
1600
1601 /* Total amount of zombies to be destroyed */
1602 static int zombies_count = 0;
1603
1604 /**
1605  * kill zombie imports and exports
1606  */
1607 void obd_zombie_impexp_cull(void)
1608 {
1609         struct obd_import *import;
1610         struct obd_export *export;
1611         ENTRY;
1612
1613         do {
1614                 spin_lock(&obd_zombie_impexp_lock);
1615
1616                 import = NULL;
1617                 if (!list_empty(&obd_zombie_imports)) {
1618                         import = list_entry(obd_zombie_imports.next,
1619                                             struct obd_import,
1620                                             imp_zombie_chain);
1621                         list_del_init(&import->imp_zombie_chain);
1622                 }
1623
1624                 export = NULL;
1625                 if (!list_empty(&obd_zombie_exports)) {
1626                         export = list_entry(obd_zombie_exports.next,
1627                                             struct obd_export,
1628                                             exp_obd_chain);
1629                         list_del_init(&export->exp_obd_chain);
1630                 }
1631
1632                 spin_unlock(&obd_zombie_impexp_lock);
1633
1634                 if (import != NULL) {
1635                         class_import_destroy(import);
1636                         spin_lock(&obd_zombie_impexp_lock);
1637                         zombies_count--;
1638                         spin_unlock(&obd_zombie_impexp_lock);
1639                 }
1640
1641                 if (export != NULL) {
1642                         class_export_destroy(export);
1643                         spin_lock(&obd_zombie_impexp_lock);
1644                         zombies_count--;
1645                         spin_unlock(&obd_zombie_impexp_lock);
1646                 }
1647
1648                 cond_resched();
1649         } while (import != NULL || export != NULL);
1650         EXIT;
1651 }
1652
1653 static struct completion        obd_zombie_start;
1654 static struct completion        obd_zombie_stop;
1655 static unsigned long            obd_zombie_flags;
1656 static wait_queue_head_t        obd_zombie_waitq;
1657 static pid_t                    obd_zombie_pid;
1658
1659 enum {
1660         OBD_ZOMBIE_STOP         = 0x0001,
1661 };
1662
1663 /**
1664  * check for work for kill zombie import/export thread.
1665  */
1666 static int obd_zombie_impexp_check(void *arg)
1667 {
1668         int rc;
1669
1670         spin_lock(&obd_zombie_impexp_lock);
1671         rc = (zombies_count == 0) &&
1672              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1673         spin_unlock(&obd_zombie_impexp_lock);
1674
1675         RETURN(rc);
1676 }
1677
1678 /**
1679  * Add export to the obd_zombe thread and notify it.
1680  */
1681 static void obd_zombie_export_add(struct obd_export *exp) {
1682         atomic_dec(&obd_stale_export_num);
1683         spin_lock(&exp->exp_obd->obd_dev_lock);
1684         LASSERT(!list_empty(&exp->exp_obd_chain));
1685         list_del_init(&exp->exp_obd_chain);
1686         spin_unlock(&exp->exp_obd->obd_dev_lock);
1687         spin_lock(&obd_zombie_impexp_lock);
1688         zombies_count++;
1689         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1690         spin_unlock(&obd_zombie_impexp_lock);
1691
1692         obd_zombie_impexp_notify();
1693 }
1694
1695 /**
1696  * Add import to the obd_zombe thread and notify it.
1697  */
1698 static void obd_zombie_import_add(struct obd_import *imp) {
1699         LASSERT(imp->imp_sec == NULL);
1700         spin_lock(&obd_zombie_impexp_lock);
1701         LASSERT(list_empty(&imp->imp_zombie_chain));
1702         zombies_count++;
1703         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1704         spin_unlock(&obd_zombie_impexp_lock);
1705
1706         obd_zombie_impexp_notify();
1707 }
1708
1709 /**
1710  * notify import/export destroy thread about new zombie.
1711  */
1712 static void obd_zombie_impexp_notify(void)
1713 {
1714         /*
1715          * Make sure obd_zomebie_impexp_thread get this notification.
1716          * It is possible this signal only get by obd_zombie_barrier, and
1717          * barrier gulps this notification and sleeps away and hangs ensues
1718          */
1719         wake_up_all(&obd_zombie_waitq);
1720 }
1721
1722 /**
1723  * check whether obd_zombie is idle
1724  */
1725 static int obd_zombie_is_idle(void)
1726 {
1727         int rc;
1728
1729         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1730         spin_lock(&obd_zombie_impexp_lock);
1731         rc = (zombies_count == 0);
1732         spin_unlock(&obd_zombie_impexp_lock);
1733         return rc;
1734 }
1735
1736 /**
1737  * wait when obd_zombie import/export queues become empty
1738  */
1739 void obd_zombie_barrier(void)
1740 {
1741         struct l_wait_info lwi = { 0 };
1742
1743         if (obd_zombie_pid == current_pid())
1744                 /* don't wait for myself */
1745                 return;
1746         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1747 }
1748 EXPORT_SYMBOL(obd_zombie_barrier);
1749
1750
1751 struct obd_export *obd_stale_export_get(void)
1752 {
1753         struct obd_export *exp = NULL;
1754         ENTRY;
1755
1756         spin_lock(&obd_stale_export_lock);
1757         if (!list_empty(&obd_stale_exports)) {
1758                 exp = list_entry(obd_stale_exports.next,
1759                                  struct obd_export, exp_stale_list);
1760                 list_del_init(&exp->exp_stale_list);
1761         }
1762         spin_unlock(&obd_stale_export_lock);
1763
1764         if (exp) {
1765                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1766                        atomic_read(&obd_stale_export_num));
1767         }
1768         RETURN(exp);
1769 }
1770 EXPORT_SYMBOL(obd_stale_export_get);
1771
1772 void obd_stale_export_put(struct obd_export *exp)
1773 {
1774         ENTRY;
1775
1776         LASSERT(list_empty(&exp->exp_stale_list));
1777         if (exp->exp_lock_hash &&
1778             atomic_read(&exp->exp_lock_hash->hs_count)) {
1779                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1780                        atomic_read(&obd_stale_export_num));
1781
1782                 spin_lock_bh(&exp->exp_bl_list_lock);
1783                 spin_lock(&obd_stale_export_lock);
1784                 /* Add to the tail if there is no blocked locks,
1785                  * to the head otherwise. */
1786                 if (list_empty(&exp->exp_bl_list))
1787                         list_add_tail(&exp->exp_stale_list,
1788                                       &obd_stale_exports);
1789                 else
1790                         list_add(&exp->exp_stale_list,
1791                                  &obd_stale_exports);
1792
1793                 spin_unlock(&obd_stale_export_lock);
1794                 spin_unlock_bh(&exp->exp_bl_list_lock);
1795         } else {
1796                 class_export_put(exp);
1797         }
1798         EXIT;
1799 }
1800 EXPORT_SYMBOL(obd_stale_export_put);
1801
1802 /**
1803  * Adjust the position of the export in the stale list,
1804  * i.e. move to the head of the list if is needed.
1805  **/
1806 void obd_stale_export_adjust(struct obd_export *exp)
1807 {
1808         LASSERT(exp != NULL);
1809         spin_lock_bh(&exp->exp_bl_list_lock);
1810         spin_lock(&obd_stale_export_lock);
1811
1812         if (!list_empty(&exp->exp_stale_list) &&
1813             !list_empty(&exp->exp_bl_list))
1814                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1815
1816         spin_unlock(&obd_stale_export_lock);
1817         spin_unlock_bh(&exp->exp_bl_list_lock);
1818 }
1819 EXPORT_SYMBOL(obd_stale_export_adjust);
1820
1821 /**
1822  * destroy zombie export/import thread.
1823  */
1824 static int obd_zombie_impexp_thread(void *unused)
1825 {
1826         unshare_fs_struct();
1827         complete(&obd_zombie_start);
1828
1829         obd_zombie_pid = current_pid();
1830
1831         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1832                 struct l_wait_info lwi = { 0 };
1833
1834                 l_wait_event(obd_zombie_waitq,
1835                              !obd_zombie_impexp_check(NULL), &lwi);
1836                 obd_zombie_impexp_cull();
1837
1838                 /*
1839                  * Notify obd_zombie_barrier callers that queues
1840                  * may be empty.
1841                  */
1842                 wake_up(&obd_zombie_waitq);
1843         }
1844
1845         complete(&obd_zombie_stop);
1846
1847         RETURN(0);
1848 }
1849
1850
1851 /**
1852  * start destroy zombie import/export thread
1853  */
1854 int obd_zombie_impexp_init(void)
1855 {
1856         struct task_struct *task;
1857
1858         INIT_LIST_HEAD(&obd_zombie_imports);
1859
1860         INIT_LIST_HEAD(&obd_zombie_exports);
1861         spin_lock_init(&obd_zombie_impexp_lock);
1862         init_completion(&obd_zombie_start);
1863         init_completion(&obd_zombie_stop);
1864         init_waitqueue_head(&obd_zombie_waitq);
1865         obd_zombie_pid = 0;
1866
1867         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1868         if (IS_ERR(task))
1869                 RETURN(PTR_ERR(task));
1870
1871         wait_for_completion(&obd_zombie_start);
1872         RETURN(0);
1873 }
1874 /**
1875  * stop destroy zombie import/export thread
1876  */
1877 void obd_zombie_impexp_stop(void)
1878 {
1879         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1880         obd_zombie_impexp_notify();
1881         wait_for_completion(&obd_zombie_stop);
1882 }
1883
1884 /***** Kernel-userspace comm helpers *******/
1885
1886 /* Get length of entire message, including header */
1887 int kuc_len(int payload_len)
1888 {
1889         return sizeof(struct kuc_hdr) + payload_len;
1890 }
1891 EXPORT_SYMBOL(kuc_len);
1892
1893 /* Get a pointer to kuc header, given a ptr to the payload
1894  * @param p Pointer to payload area
1895  * @returns Pointer to kuc header
1896  */
1897 struct kuc_hdr * kuc_ptr(void *p)
1898 {
1899         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1900         LASSERT(lh->kuc_magic == KUC_MAGIC);
1901         return lh;
1902 }
1903 EXPORT_SYMBOL(kuc_ptr);
1904
1905 /* Test if payload is part of kuc message
1906  * @param p Pointer to payload area
1907  * @returns boolean
1908  */
1909 int kuc_ispayload(void *p)
1910 {
1911         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1912
1913         if (kh->kuc_magic == KUC_MAGIC)
1914                 return 1;
1915         else
1916                 return 0;
1917 }
1918 EXPORT_SYMBOL(kuc_ispayload);
1919
1920 /* Alloc space for a message, and fill in header
1921  * @return Pointer to payload area
1922  */
1923 void *kuc_alloc(int payload_len, int transport, int type)
1924 {
1925         struct kuc_hdr *lh;
1926         int len = kuc_len(payload_len);
1927
1928         OBD_ALLOC(lh, len);
1929         if (lh == NULL)
1930                 return ERR_PTR(-ENOMEM);
1931
1932         lh->kuc_magic = KUC_MAGIC;
1933         lh->kuc_transport = transport;
1934         lh->kuc_msgtype = type;
1935         lh->kuc_msglen = len;
1936
1937         return (void *)(lh + 1);
1938 }
1939 EXPORT_SYMBOL(kuc_alloc);
1940
/* Free a message allocated by kuc_alloc().
 *
 * Not marked "inline": this is an exported external definition, and a
 * bare "inline" here would emit no out-of-line symbol under C99/gnu11
 * inline semantics.
 *
 * @param p Pointer to payload area (NOT to the header)
 * @param payload_len Payload length; presumably must be the value that was
 *                    passed to kuc_alloc(), since it sizes the free
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1948
1949 struct obd_request_slot_waiter {
1950         struct list_head        orsw_entry;
1951         wait_queue_head_t       orsw_waitq;
1952         bool                    orsw_signaled;
1953 };
1954
1955 static bool obd_request_slot_avail(struct client_obd *cli,
1956                                    struct obd_request_slot_waiter *orsw)
1957 {
1958         bool avail;
1959
1960         spin_lock(&cli->cl_loi_list_lock);
1961         avail = !!list_empty(&orsw->orsw_entry);
1962         spin_unlock(&cli->cl_loi_list_lock);
1963
1964         return avail;
1965 };
1966
1967 /*
1968  * For network flow control, the RPC sponsor needs to acquire a credit
1969  * before sending the RPC. The credits count for a connection is defined
1970  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1971  * the subsequent RPC sponsors need to wait until others released their
1972  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1973  */
1974 int obd_get_request_slot(struct client_obd *cli)
1975 {
1976         struct obd_request_slot_waiter   orsw;
1977         struct l_wait_info               lwi;
1978         int                              rc;
1979
1980         spin_lock(&cli->cl_loi_list_lock);
1981         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1982                 cli->cl_r_in_flight++;
1983                 spin_unlock(&cli->cl_loi_list_lock);
1984                 return 0;
1985         }
1986
1987         init_waitqueue_head(&orsw.orsw_waitq);
1988         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1989         orsw.orsw_signaled = false;
1990         spin_unlock(&cli->cl_loi_list_lock);
1991
1992         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1993         rc = l_wait_event(orsw.orsw_waitq,
1994                           obd_request_slot_avail(cli, &orsw) ||
1995                           orsw.orsw_signaled,
1996                           &lwi);
1997
1998         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1999          * freed but other (such as obd_put_request_slot) is using it. */
2000         spin_lock(&cli->cl_loi_list_lock);
2001         if (rc != 0) {
2002                 if (!orsw.orsw_signaled) {
2003                         if (list_empty(&orsw.orsw_entry))
2004                                 cli->cl_r_in_flight--;
2005                         else
2006                                 list_del(&orsw.orsw_entry);
2007                 }
2008         }
2009
2010         if (orsw.orsw_signaled) {
2011                 LASSERT(list_empty(&orsw.orsw_entry));
2012
2013                 rc = -EINTR;
2014         }
2015         spin_unlock(&cli->cl_loi_list_lock);
2016
2017         return rc;
2018 }
2019 EXPORT_SYMBOL(obd_get_request_slot);
2020
2021 void obd_put_request_slot(struct client_obd *cli)
2022 {
2023         struct obd_request_slot_waiter *orsw;
2024
2025         spin_lock(&cli->cl_loi_list_lock);
2026         cli->cl_r_in_flight--;
2027
2028         /* If there is free slot, wakeup the first waiter. */
2029         if (!list_empty(&cli->cl_loi_read_list) &&
2030             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2031                 orsw = list_entry(cli->cl_loi_read_list.next,
2032                                   struct obd_request_slot_waiter, orsw_entry);
2033                 list_del_init(&orsw->orsw_entry);
2034                 cli->cl_r_in_flight++;
2035                 wake_up(&orsw->orsw_waitq);
2036         }
2037         spin_unlock(&cli->cl_loi_list_lock);
2038 }
2039 EXPORT_SYMBOL(obd_put_request_slot);
2040
2041 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2042 {
2043         return cli->cl_max_rpcs_in_flight;
2044 }
2045 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2046
2047 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2048 {
2049         struct obd_request_slot_waiter *orsw;
2050         __u32                           old;
2051         int                             diff;
2052         int                             i;
2053         char                            *typ_name;
2054         int                             rc;
2055
2056         if (max > OBD_MAX_RIF_MAX || max < 1)
2057                 return -ERANGE;
2058
2059         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2060         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2061                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2062                  * strictly lower that max_rpcs_in_flight */
2063                 if (max < 2) {
2064                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2065                                "because it must be higher than "
2066                                "max_mod_rpcs_in_flight value",
2067                                cli->cl_import->imp_obd->obd_name);
2068                         return -ERANGE;
2069                 }
2070                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2071                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2072                         if (rc != 0)
2073                                 return rc;
2074                 }
2075         }
2076
2077         spin_lock(&cli->cl_loi_list_lock);
2078         old = cli->cl_max_rpcs_in_flight;
2079         cli->cl_max_rpcs_in_flight = max;
2080         diff = max - old;
2081
2082         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2083         for (i = 0; i < diff; i++) {
2084                 if (list_empty(&cli->cl_loi_read_list))
2085                         break;
2086
2087                 orsw = list_entry(cli->cl_loi_read_list.next,
2088                                   struct obd_request_slot_waiter, orsw_entry);
2089                 list_del_init(&orsw->orsw_entry);
2090                 cli->cl_r_in_flight++;
2091                 wake_up(&orsw->orsw_waitq);
2092         }
2093         spin_unlock(&cli->cl_loi_list_lock);
2094
2095         return 0;
2096 }
2097 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2098
2099 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2100 {
2101         return cli->cl_max_mod_rpcs_in_flight;
2102 }
2103 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2104
2105 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2106 {
2107         struct obd_connect_data *ocd;
2108         __u16 maxmodrpcs;
2109         __u16 prev;
2110
2111         if (max > OBD_MAX_RIF_MAX || max < 1)
2112                 return -ERANGE;
2113
2114         /* cannot exceed or equal max_rpcs_in_flight */
2115         if (max >= cli->cl_max_rpcs_in_flight) {
2116                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2117                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2118                        cli->cl_import->imp_obd->obd_name,
2119                        max, cli->cl_max_rpcs_in_flight);
2120                 return -ERANGE;
2121         }
2122
2123         /* cannot exceed max modify RPCs in flight supported by the server */
2124         ocd = &cli->cl_import->imp_connect_data;
2125         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2126                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2127         else
2128                 maxmodrpcs = 1;
2129         if (max > maxmodrpcs) {
2130                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2131                        "higher than max_mod_rpcs_per_client value (%hu) "
2132                        "returned by the server at connection\n",
2133                        cli->cl_import->imp_obd->obd_name,
2134                        max, maxmodrpcs);
2135                 return -ERANGE;
2136         }
2137
2138         spin_lock(&cli->cl_mod_rpcs_lock);
2139
2140         prev = cli->cl_max_mod_rpcs_in_flight;
2141         cli->cl_max_mod_rpcs_in_flight = max;
2142
2143         /* wakeup waiters if limit has been increased */
2144         if (cli->cl_max_mod_rpcs_in_flight > prev)
2145                 wake_up(&cli->cl_mod_rpcs_waitq);
2146
2147         spin_unlock(&cli->cl_mod_rpcs_lock);
2148
2149         return 0;
2150 }
2151 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2152
2153
2154 #define pct(a, b) (b ? a * 100 / b : 0)
2155 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2156                                struct seq_file *seq)
2157 {
2158         struct timeval now;
2159         unsigned long mod_tot = 0, mod_cum;
2160         int i;
2161
2162         do_gettimeofday(&now);
2163
2164         spin_lock(&cli->cl_mod_rpcs_lock);
2165
2166         seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
2167                    now.tv_sec, now.tv_usec);
2168         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2169                    cli->cl_mod_rpcs_in_flight);
2170
2171         seq_printf(seq, "\n\t\t\tmodify\n");
2172         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2173
2174         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2175
2176         mod_cum = 0;
2177         for (i = 0; i < OBD_HIST_MAX; i++) {
2178                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2179                 mod_cum += mod;
2180                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2181                                  i, mod, pct(mod, mod_tot),
2182                                  pct(mod_cum, mod_tot));
2183                 if (mod_cum == mod_tot)
2184                         break;
2185         }
2186
2187         spin_unlock(&cli->cl_mod_rpcs_lock);
2188
2189         return 0;
2190 }
2191 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2192 #undef pct
2193
2194
2195 /* The number of modify RPCs sent in parallel is limited
2196  * because the server has a finite number of slots per client to
2197  * store request result and ensure reply reconstruction when needed.
2198  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2199  * that takes into account server limit and cl_max_rpcs_in_flight
2200  * value.
2201  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2202  * one close request is allowed above the maximum.
2203  */
2204 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2205                                                  bool close_req)
2206 {
2207         bool avail;
2208
2209         /* A slot is available if
2210          * - number of modify RPCs in flight is less than the max
2211          * - it's a close RPC and no other close request is in flight
2212          */
2213         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2214                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2215
2216         return avail;
2217 }
2218
2219 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2220                                          bool close_req)
2221 {
2222         bool avail;
2223
2224         spin_lock(&cli->cl_mod_rpcs_lock);
2225         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2226         spin_unlock(&cli->cl_mod_rpcs_lock);
2227         return avail;
2228 }
2229
2230 /* Get a modify RPC slot from the obd client @cli according
2231  * to the kind of operation @opc that is going to be sent
2232  * and the intent @it of the operation if it applies.
2233  * If the maximum number of modify RPCs in flight is reached
2234  * the thread is put to sleep.
2235  * Returns the tag to be set in the request message. Tag 0
2236  * is reserved for non-modifying requests.
2237  */
2238 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2239                            struct lookup_intent *it)
2240 {
2241         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2242         bool                    close_req = false;
2243         __u16                   i, max;
2244
2245         /* read-only metadata RPCs don't consume a slot on MDT
2246          * for reply reconstruction
2247          */
2248         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2249                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2250                 return 0;
2251
2252         if (opc == MDS_CLOSE)
2253                 close_req = true;
2254
2255         do {
2256                 spin_lock(&cli->cl_mod_rpcs_lock);
2257                 max = cli->cl_max_mod_rpcs_in_flight;
2258                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2259                         /* there is a slot available */
2260                         cli->cl_mod_rpcs_in_flight++;
2261                         if (close_req)
2262                                 cli->cl_close_rpcs_in_flight++;
2263                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2264                                          cli->cl_mod_rpcs_in_flight);
2265                         /* find a free tag */
2266                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2267                                                 max + 1);
2268                         LASSERT(i < OBD_MAX_RIF_MAX);
2269                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2270                         spin_unlock(&cli->cl_mod_rpcs_lock);
2271                         /* tag 0 is reserved for non-modify RPCs */
2272                         return i + 1;
2273                 }
2274                 spin_unlock(&cli->cl_mod_rpcs_lock);
2275
2276                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2277                        "opc %u, max %hu\n",
2278                        cli->cl_import->imp_obd->obd_name, opc, max);
2279
2280                 l_wait_event(cli->cl_mod_rpcs_waitq,
2281                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2282         } while (true);
2283 }
2284 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2285
2286 /* Put a modify RPC slot from the obd client @cli according
2287  * to the kind of operation @opc that has been sent and the
2288  * intent @it of the operation if it applies.
2289  */
2290 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2291                           struct lookup_intent *it, __u16 tag)
2292 {
2293         bool                    close_req = false;
2294
2295         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2296                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2297                 return;
2298
2299         if (opc == MDS_CLOSE)
2300                 close_req = true;
2301
2302         spin_lock(&cli->cl_mod_rpcs_lock);
2303         cli->cl_mod_rpcs_in_flight--;
2304         if (close_req)
2305                 cli->cl_close_rpcs_in_flight--;
2306         /* release the tag in the bitmap */
2307         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2308         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2309         spin_unlock(&cli->cl_mod_rpcs_lock);
2310         wake_up(&cli->cl_mod_rpcs_waitq);
2311 }
2312 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2313