Whamcloud - gitweb
Revert "LU-5951 ptlrpc: track unreplied requests"
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_disk.h>
48 #include <lustre_kernelcomm.h>
49
50 spinlock_t obd_types_lock;
51
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
56
57 static struct list_head obd_zombie_imports;
58 static struct list_head obd_zombie_exports;
59 static spinlock_t  obd_zombie_impexp_lock;
60
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 struct list_head obd_stale_exports;
68 spinlock_t       obd_stale_export_lock;
69 atomic_t         obd_stale_export_num;
70
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
73
74 /*
75  * support functions: we could use inter-module communication, but this
76  * is more portable to other OS's
77  */
78 static struct obd_device *obd_device_alloc(void)
79 {
80         struct obd_device *obd;
81
82         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
83         if (obd != NULL) {
84                 obd->obd_magic = OBD_DEVICE_MAGIC;
85         }
86         return obd;
87 }
88
89 static void obd_device_free(struct obd_device *obd)
90 {
91         LASSERT(obd != NULL);
92         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
93                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
94         if (obd->obd_namespace != NULL) {
95                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
96                        obd, obd->obd_namespace, obd->obd_force);
97                 LBUG();
98         }
99         lu_ref_fini(&obd->obd_reference);
100         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
101 }
102
103 struct obd_type *class_search_type(const char *name)
104 {
105         struct list_head *tmp;
106         struct obd_type *type;
107
108         spin_lock(&obd_types_lock);
109         list_for_each(tmp, &obd_types) {
110                 type = list_entry(tmp, struct obd_type, typ_chain);
111                 if (strcmp(type->typ_name, name) == 0) {
112                         spin_unlock(&obd_types_lock);
113                         return type;
114                 }
115         }
116         spin_unlock(&obd_types_lock);
117         return NULL;
118 }
119 EXPORT_SYMBOL(class_search_type);
120
121 struct obd_type *class_get_type(const char *name)
122 {
123         struct obd_type *type = class_search_type(name);
124
125 #ifdef HAVE_MODULE_LOADING_SUPPORT
126         if (!type) {
127                 const char *modname = name;
128
129                 if (strcmp(modname, "obdfilter") == 0)
130                         modname = "ofd";
131
132                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
133                         modname = LUSTRE_OSP_NAME;
134
135                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
136                         modname = LUSTRE_MDT_NAME;
137
138                 if (!request_module("%s", modname)) {
139                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
140                         type = class_search_type(name);
141                 } else {
142                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143                                            modname);
144                 }
145         }
146 #endif
147         if (type) {
148                 spin_lock(&type->obd_type_lock);
149                 type->typ_refcnt++;
150                 try_module_get(type->typ_dt_ops->o_owner);
151                 spin_unlock(&type->obd_type_lock);
152         }
153         return type;
154 }
155
156 void class_put_type(struct obd_type *type)
157 {
158         LASSERT(type);
159         spin_lock(&type->obd_type_lock);
160         type->typ_refcnt--;
161         module_put(type->typ_dt_ops->o_owner);
162         spin_unlock(&type->obd_type_lock);
163 }
164
165 #define CLASS_MAX_NAME 1024
166
167 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
168                         bool enable_proc, struct lprocfs_vars *vars,
169                         const char *name, struct lu_device_type *ldt)
170 {
171         struct obd_type *type;
172         int rc = 0;
173         ENTRY;
174
175         /* sanity check */
176         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
177
178         if (class_search_type(name)) {
179                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180                 RETURN(-EEXIST);
181         }
182
183         rc = -ENOMEM;
184         OBD_ALLOC(type, sizeof(*type));
185         if (type == NULL)
186                 RETURN(rc);
187
188         OBD_ALLOC_PTR(type->typ_dt_ops);
189         OBD_ALLOC_PTR(type->typ_md_ops);
190         OBD_ALLOC(type->typ_name, strlen(name) + 1);
191
192         if (type->typ_dt_ops == NULL ||
193             type->typ_md_ops == NULL ||
194             type->typ_name == NULL)
195                 GOTO (failed, rc);
196
197         *(type->typ_dt_ops) = *dt_ops;
198         /* md_ops is optional */
199         if (md_ops)
200                 *(type->typ_md_ops) = *md_ops;
201         strcpy(type->typ_name, name);
202         spin_lock_init(&type->obd_type_lock);
203
204 #ifdef CONFIG_PROC_FS
205         if (enable_proc) {
206                 type->typ_procroot = lprocfs_register(type->typ_name,
207                                                       proc_lustre_root,
208                                                       vars, type);
209                 if (IS_ERR(type->typ_procroot)) {
210                         rc = PTR_ERR(type->typ_procroot);
211                         type->typ_procroot = NULL;
212                         GOTO(failed, rc);
213                 }
214         }
215 #endif
216         if (ldt != NULL) {
217                 type->typ_lu = ldt;
218                 rc = lu_device_type_init(ldt);
219                 if (rc != 0)
220                         GOTO (failed, rc);
221         }
222
223         spin_lock(&obd_types_lock);
224         list_add(&type->typ_chain, &obd_types);
225         spin_unlock(&obd_types_lock);
226
227         RETURN (0);
228
229 failed:
230         if (type->typ_name != NULL) {
231 #ifdef CONFIG_PROC_FS
232                 if (type->typ_procroot != NULL)
233                         remove_proc_subtree(type->typ_name, proc_lustre_root);
234 #endif
235                 OBD_FREE(type->typ_name, strlen(name) + 1);
236         }
237         if (type->typ_md_ops != NULL)
238                 OBD_FREE_PTR(type->typ_md_ops);
239         if (type->typ_dt_ops != NULL)
240                 OBD_FREE_PTR(type->typ_dt_ops);
241         OBD_FREE(type, sizeof(*type));
242         RETURN(rc);
243 }
244 EXPORT_SYMBOL(class_register_type);
245
246 int class_unregister_type(const char *name)
247 {
248         struct obd_type *type = class_search_type(name);
249         ENTRY;
250
251         if (!type) {
252                 CERROR("unknown obd type\n");
253                 RETURN(-EINVAL);
254         }
255
256         if (type->typ_refcnt) {
257                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
258                 /* This is a bad situation, let's make the best of it */
259                 /* Remove ops, but leave the name for debugging */
260                 OBD_FREE_PTR(type->typ_dt_ops);
261                 OBD_FREE_PTR(type->typ_md_ops);
262                 RETURN(-EBUSY);
263         }
264
265         /* we do not use type->typ_procroot as for compatibility purposes
266          * other modules can share names (i.e. lod can use lov entry). so
267          * we can't reference pointer as it can get invalided when another
268          * module removes the entry */
269 #ifdef CONFIG_PROC_FS
270         if (type->typ_procroot != NULL)
271                 remove_proc_subtree(type->typ_name, proc_lustre_root);
272         if (type->typ_procsym != NULL)
273                 lprocfs_remove(&type->typ_procsym);
274 #endif
275         if (type->typ_lu)
276                 lu_device_type_fini(type->typ_lu);
277
278         spin_lock(&obd_types_lock);
279         list_del(&type->typ_chain);
280         spin_unlock(&obd_types_lock);
281         OBD_FREE(type->typ_name, strlen(name) + 1);
282         if (type->typ_dt_ops != NULL)
283                 OBD_FREE_PTR(type->typ_dt_ops);
284         if (type->typ_md_ops != NULL)
285                 OBD_FREE_PTR(type->typ_md_ops);
286         OBD_FREE(type, sizeof(*type));
287         RETURN(0);
288 } /* class_unregister_type */
289 EXPORT_SYMBOL(class_unregister_type);
290
291 /**
292  * Create a new obd device.
293  *
294  * Find an empty slot in ::obd_devs[], create a new obd device in it.
295  *
296  * \param[in] type_name obd device type string.
297  * \param[in] name      obd device name.
298  *
 * \retval ERR_PTR(-errno) if the creation fails, otherwise the pointer to
 *         the newly created obd device.
301  */
302 struct obd_device *class_newdev(const char *type_name, const char *name)
303 {
304         struct obd_device *result = NULL;
305         struct obd_device *newdev;
306         struct obd_type *type = NULL;
307         int i;
308         int new_obd_minor = 0;
309         ENTRY;
310
311         if (strlen(name) >= MAX_OBD_NAME) {
312                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
313                 RETURN(ERR_PTR(-EINVAL));
314         }
315
316         type = class_get_type(type_name);
317         if (type == NULL){
318                 CERROR("OBD: unknown type: %s\n", type_name);
319                 RETURN(ERR_PTR(-ENODEV));
320         }
321
322         newdev = obd_device_alloc();
323         if (newdev == NULL)
324                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
325
326         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
327
328         write_lock(&obd_dev_lock);
329         for (i = 0; i < class_devno_max(); i++) {
330                 struct obd_device *obd = class_num2obd(i);
331
332                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
333                         CERROR("Device %s already exists at %d, won't add\n",
334                                name, i);
335                         if (result) {
336                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
337                                          "%p obd_magic %08x != %08x\n", result,
338                                          result->obd_magic, OBD_DEVICE_MAGIC);
339                                 LASSERTF(result->obd_minor == new_obd_minor,
340                                          "%p obd_minor %d != %d\n", result,
341                                          result->obd_minor, new_obd_minor);
342
343                                 obd_devs[result->obd_minor] = NULL;
344                                 result->obd_name[0]='\0';
345                          }
346                         result = ERR_PTR(-EEXIST);
347                         break;
348                 }
349                 if (!result && !obd) {
350                         result = newdev;
351                         result->obd_minor = i;
352                         new_obd_minor = i;
353                         result->obd_type = type;
354                         strncpy(result->obd_name, name,
355                                 sizeof(result->obd_name) - 1);
356                         obd_devs[i] = result;
357                 }
358         }
359         write_unlock(&obd_dev_lock);
360
361         if (result == NULL && i >= class_devno_max()) {
362                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
363                        class_devno_max());
364                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
365         }
366
367         if (IS_ERR(result))
368                 GOTO(out, result);
369
370         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
371                result->obd_name, result);
372
373         RETURN(result);
374 out:
375         obd_device_free(newdev);
376 out_type:
377         class_put_type(type);
378         return result;
379 }
380
381 void class_release_dev(struct obd_device *obd)
382 {
383         struct obd_type *obd_type = obd->obd_type;
384
385         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
386                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
387         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
388                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
389         LASSERT(obd_type != NULL);
390
391         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
392                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
393
394         write_lock(&obd_dev_lock);
395         obd_devs[obd->obd_minor] = NULL;
396         write_unlock(&obd_dev_lock);
397         obd_device_free(obd);
398
399         class_put_type(obd_type);
400 }
401
402 int class_name2dev(const char *name)
403 {
404         int i;
405
406         if (!name)
407                 return -1;
408
409         read_lock(&obd_dev_lock);
410         for (i = 0; i < class_devno_max(); i++) {
411                 struct obd_device *obd = class_num2obd(i);
412
413                 if (obd && strcmp(name, obd->obd_name) == 0) {
414                         /* Make sure we finished attaching before we give
415                            out any references */
416                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
417                         if (obd->obd_attached) {
418                                 read_unlock(&obd_dev_lock);
419                                 return i;
420                         }
421                         break;
422                 }
423         }
424         read_unlock(&obd_dev_lock);
425
426         return -1;
427 }
428
429 struct obd_device *class_name2obd(const char *name)
430 {
431         int dev = class_name2dev(name);
432
433         if (dev < 0 || dev > class_devno_max())
434                 return NULL;
435         return class_num2obd(dev);
436 }
437 EXPORT_SYMBOL(class_name2obd);
438
439 int class_uuid2dev(struct obd_uuid *uuid)
440 {
441         int i;
442
443         read_lock(&obd_dev_lock);
444         for (i = 0; i < class_devno_max(); i++) {
445                 struct obd_device *obd = class_num2obd(i);
446
447                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
448                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
449                         read_unlock(&obd_dev_lock);
450                         return i;
451                 }
452         }
453         read_unlock(&obd_dev_lock);
454
455         return -1;
456 }
457
458 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
459 {
460         int dev = class_uuid2dev(uuid);
461         if (dev < 0)
462                 return NULL;
463         return class_num2obd(dev);
464 }
465 EXPORT_SYMBOL(class_uuid2obd);
466
467 /**
468  * Get obd device from ::obd_devs[]
469  *
470  * \param num [in] array index
471  *
472  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
473  *         otherwise return the obd device there.
474  */
475 struct obd_device *class_num2obd(int num)
476 {
477         struct obd_device *obd = NULL;
478
479         if (num < class_devno_max()) {
480                 obd = obd_devs[num];
481                 if (obd == NULL)
482                         return NULL;
483
484                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
485                          "%p obd_magic %08x != %08x\n",
486                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
487                 LASSERTF(obd->obd_minor == num,
488                          "%p obd_minor %0d != %0d\n",
489                          obd, obd->obd_minor, num);
490         }
491
492         return obd;
493 }
494
495 /**
496  * Get obd devices count. Device in any
497  *    state are counted
498  * \retval obd device count
499  */
500 int get_devices_count(void)
501 {
502         int index, max_index = class_devno_max(), dev_count = 0;
503
504         read_lock(&obd_dev_lock);
505         for (index = 0; index <= max_index; index++) {
506                 struct obd_device *obd = class_num2obd(index);
507                 if (obd != NULL)
508                         dev_count++;
509         }
510         read_unlock(&obd_dev_lock);
511
512         return dev_count;
513 }
514 EXPORT_SYMBOL(get_devices_count);
515
516 void class_obd_list(void)
517 {
518         char *status;
519         int i;
520
521         read_lock(&obd_dev_lock);
522         for (i = 0; i < class_devno_max(); i++) {
523                 struct obd_device *obd = class_num2obd(i);
524
525                 if (obd == NULL)
526                         continue;
527                 if (obd->obd_stopping)
528                         status = "ST";
529                 else if (obd->obd_set_up)
530                         status = "UP";
531                 else if (obd->obd_attached)
532                         status = "AT";
533                 else
534                         status = "--";
535                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
536                          i, status, obd->obd_type->typ_name,
537                          obd->obd_name, obd->obd_uuid.uuid,
538                          atomic_read(&obd->obd_refcount));
539         }
540         read_unlock(&obd_dev_lock);
541         return;
542 }
543
544 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
545    specified, then only the client with that uuid is returned,
546    otherwise any client connected to the tgt is returned. */
547 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
548                                           const char * typ_name,
549                                           struct obd_uuid *grp_uuid)
550 {
551         int i;
552
553         read_lock(&obd_dev_lock);
554         for (i = 0; i < class_devno_max(); i++) {
555                 struct obd_device *obd = class_num2obd(i);
556
557                 if (obd == NULL)
558                         continue;
559                 if ((strncmp(obd->obd_type->typ_name, typ_name,
560                              strlen(typ_name)) == 0)) {
561                         if (obd_uuid_equals(tgt_uuid,
562                                             &obd->u.cli.cl_target_uuid) &&
563                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
564                                                          &obd->obd_uuid) : 1)) {
565                                 read_unlock(&obd_dev_lock);
566                                 return obd;
567                         }
568                 }
569         }
570         read_unlock(&obd_dev_lock);
571
572         return NULL;
573 }
574 EXPORT_SYMBOL(class_find_client_obd);
575
576 /* Iterate the obd_device list looking devices have grp_uuid. Start
577    searching at *next, and if a device is found, the next index to look
578    at is saved in *next. If next is NULL, then the first matching device
579    will always be returned. */
580 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
581 {
582         int i;
583
584         if (next == NULL)
585                 i = 0;
586         else if (*next >= 0 && *next < class_devno_max())
587                 i = *next;
588         else
589                 return NULL;
590
591         read_lock(&obd_dev_lock);
592         for (; i < class_devno_max(); i++) {
593                 struct obd_device *obd = class_num2obd(i);
594
595                 if (obd == NULL)
596                         continue;
597                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
598                         if (next != NULL)
599                                 *next = i+1;
600                         read_unlock(&obd_dev_lock);
601                         return obd;
602                 }
603         }
604         read_unlock(&obd_dev_lock);
605
606         return NULL;
607 }
608 EXPORT_SYMBOL(class_devices_in_group);
609
610 /**
611  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
612  * adjust sptlrpc settings accordingly.
613  */
614 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
615 {
616         struct obd_device  *obd;
617         const char         *type;
618         int                 i, rc = 0, rc2;
619
620         LASSERT(namelen > 0);
621
622         read_lock(&obd_dev_lock);
623         for (i = 0; i < class_devno_max(); i++) {
624                 obd = class_num2obd(i);
625
626                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
627                         continue;
628
629                 /* only notify mdc, osc, osp, lwp, mdt, ost
630                  * because only these have a -sptlrpc llog */
631                 type = obd->obd_type->typ_name;
632                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
633                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
634                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
635                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
636                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
637                     strcmp(type, LUSTRE_OST_NAME) != 0)
638                         continue;
639
640                 if (strncmp(obd->obd_name, fsname, namelen))
641                         continue;
642
643                 class_incref(obd, __FUNCTION__, obd);
644                 read_unlock(&obd_dev_lock);
645                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
646                                          sizeof(KEY_SPTLRPC_CONF),
647                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
648                 rc = rc ? rc : rc2;
649                 class_decref(obd, __FUNCTION__, obd);
650                 read_lock(&obd_dev_lock);
651         }
652         read_unlock(&obd_dev_lock);
653         return rc;
654 }
655 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
656
657 void obd_cleanup_caches(void)
658 {
659         ENTRY;
660         if (obd_device_cachep) {
661                 kmem_cache_destroy(obd_device_cachep);
662                 obd_device_cachep = NULL;
663         }
664         if (obdo_cachep) {
665                 kmem_cache_destroy(obdo_cachep);
666                 obdo_cachep = NULL;
667         }
668         if (import_cachep) {
669                 kmem_cache_destroy(import_cachep);
670                 import_cachep = NULL;
671         }
672
673         EXIT;
674 }
675
676 int obd_init_caches(void)
677 {
678         int rc;
679         ENTRY;
680
681         LASSERT(obd_device_cachep == NULL);
682         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
683                                               sizeof(struct obd_device),
684                                               0, 0, NULL);
685         if (!obd_device_cachep)
686                 GOTO(out, rc = -ENOMEM);
687
688         LASSERT(obdo_cachep == NULL);
689         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
690                                         0, 0, NULL);
691         if (!obdo_cachep)
692                 GOTO(out, rc = -ENOMEM);
693
694         LASSERT(import_cachep == NULL);
695         import_cachep = kmem_cache_create("ll_import_cache",
696                                           sizeof(struct obd_import),
697                                           0, 0, NULL);
698         if (!import_cachep)
699                 GOTO(out, rc = -ENOMEM);
700
701         RETURN(0);
702 out:
703         obd_cleanup_caches();
704         RETURN(rc);
705 }
706
707 /* map connection to client */
708 struct obd_export *class_conn2export(struct lustre_handle *conn)
709 {
710         struct obd_export *export;
711         ENTRY;
712
713         if (!conn) {
714                 CDEBUG(D_CACHE, "looking for null handle\n");
715                 RETURN(NULL);
716         }
717
718         if (conn->cookie == -1) {  /* this means assign a new connection */
719                 CDEBUG(D_CACHE, "want a new connection\n");
720                 RETURN(NULL);
721         }
722
723         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
724         export = class_handle2object(conn->cookie, NULL);
725         RETURN(export);
726 }
727 EXPORT_SYMBOL(class_conn2export);
728
729 struct obd_device *class_exp2obd(struct obd_export *exp)
730 {
731         if (exp)
732                 return exp->exp_obd;
733         return NULL;
734 }
735 EXPORT_SYMBOL(class_exp2obd);
736
737 struct obd_device *class_conn2obd(struct lustre_handle *conn)
738 {
739         struct obd_export *export;
740         export = class_conn2export(conn);
741         if (export) {
742                 struct obd_device *obd = export->exp_obd;
743                 class_export_put(export);
744                 return obd;
745         }
746         return NULL;
747 }
748
749 struct obd_import *class_exp2cliimp(struct obd_export *exp)
750 {
751         struct obd_device *obd = exp->exp_obd;
752         if (obd == NULL)
753                 return NULL;
754         return obd->u.cli.cl_import;
755 }
756 EXPORT_SYMBOL(class_exp2cliimp);
757
758 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
759 {
760         struct obd_device *obd = class_conn2obd(conn);
761         if (obd == NULL)
762                 return NULL;
763         return obd->u.cli.cl_import;
764 }
765
766 /* Export management functions */
767 static void class_export_destroy(struct obd_export *exp)
768 {
769         struct obd_device *obd = exp->exp_obd;
770         ENTRY;
771
772         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
773         LASSERT(obd != NULL);
774
775         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
776                exp->exp_client_uuid.uuid, obd->obd_name);
777
778         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
779         if (exp->exp_connection)
780                 ptlrpc_put_connection_superhack(exp->exp_connection);
781
782         LASSERT(list_empty(&exp->exp_outstanding_replies));
783         LASSERT(list_empty(&exp->exp_uncommitted_replies));
784         LASSERT(list_empty(&exp->exp_req_replay_queue));
785         LASSERT(list_empty(&exp->exp_hp_rpcs));
786         obd_destroy_export(exp);
787         class_decref(obd, "export", exp);
788
789         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
790         EXIT;
791 }
792
/* hop_addref callback for export handles: takes one export reference. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
797
798 static struct portals_handle_ops export_handle_ops = {
799         .hop_addref = export_handle_addref,
800         .hop_free   = NULL,
801 };
802
803 struct obd_export *class_export_get(struct obd_export *exp)
804 {
805         atomic_inc(&exp->exp_refcount);
806         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
807                atomic_read(&exp->exp_refcount));
808         return exp;
809 }
810 EXPORT_SYMBOL(class_export_get);
811
812 void class_export_put(struct obd_export *exp)
813 {
814         LASSERT(exp != NULL);
815         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
816         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
817                atomic_read(&exp->exp_refcount) - 1);
818
819         if (atomic_dec_and_test(&exp->exp_refcount)) {
820                 LASSERT(!list_empty(&exp->exp_obd_chain));
821                 LASSERT(list_empty(&exp->exp_stale_list));
822                 CDEBUG(D_IOCTL, "final put %p/%s\n",
823                        exp, exp->exp_client_uuid.uuid);
824
825                 /* release nid stat refererence */
826                 lprocfs_exp_cleanup(exp);
827
828                 obd_zombie_export_add(exp);
829         }
830 }
831 EXPORT_SYMBOL(class_export_put);
832
833 /* Creates a new export, adds it to the hash table, and returns a
834  * pointer to it. The refcount is 2: one for the hash reference, and
835  * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837                                     struct obd_uuid *cluuid)
838 {
839         struct obd_export *export;
840         struct cfs_hash *hash = NULL;
841         int rc = 0;
842         ENTRY;
843
844         OBD_ALLOC_PTR(export);
845         if (!export)
846                 return ERR_PTR(-ENOMEM);
847
848         export->exp_conn_cnt = 0;
849         export->exp_lock_hash = NULL;
850         export->exp_flock_hash = NULL;
851         atomic_set(&export->exp_refcount, 2);
852         atomic_set(&export->exp_rpc_count, 0);
853         atomic_set(&export->exp_cb_count, 0);
854         atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856         INIT_LIST_HEAD(&export->exp_locks_list);
857         spin_lock_init(&export->exp_locks_list_guard);
858 #endif
859         atomic_set(&export->exp_replay_count, 0);
860         export->exp_obd = obd;
861         INIT_LIST_HEAD(&export->exp_outstanding_replies);
862         spin_lock_init(&export->exp_uncommitted_replies_lock);
863         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864         INIT_LIST_HEAD(&export->exp_req_replay_queue);
865         INIT_LIST_HEAD(&export->exp_handle.h_link);
866         INIT_LIST_HEAD(&export->exp_hp_rpcs);
867         INIT_LIST_HEAD(&export->exp_reg_rpcs);
868         class_handle_hash(&export->exp_handle, &export_handle_ops);
869         export->exp_last_request_time = cfs_time_current_sec();
870         spin_lock_init(&export->exp_lock);
871         spin_lock_init(&export->exp_rpc_lock);
872         INIT_HLIST_NODE(&export->exp_uuid_hash);
873         INIT_HLIST_NODE(&export->exp_nid_hash);
874         INIT_HLIST_NODE(&export->exp_gen_hash);
875         spin_lock_init(&export->exp_bl_list_lock);
876         INIT_LIST_HEAD(&export->exp_bl_list);
877         INIT_LIST_HEAD(&export->exp_stale_list);
878
879         export->exp_sp_peer = LUSTRE_SP_ANY;
880         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881         export->exp_client_uuid = *cluuid;
882         obd_init_export(export);
883
884         spin_lock(&obd->obd_dev_lock);
885         /* shouldn't happen, but might race */
886         if (obd->obd_stopping)
887                 GOTO(exit_unlock, rc = -ENODEV);
888
889         hash = cfs_hash_getref(obd->obd_uuid_hash);
890         if (hash == NULL)
891                 GOTO(exit_unlock, rc = -ENODEV);
892         spin_unlock(&obd->obd_dev_lock);
893
894         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
896                 if (rc != 0) {
897                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898                                       obd->obd_name, cluuid->uuid, rc);
899                         GOTO(exit_err, rc = -EALREADY);
900                 }
901         }
902
903         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
904         spin_lock(&obd->obd_dev_lock);
905         if (obd->obd_stopping) {
906                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
907                 GOTO(exit_unlock, rc = -ENODEV);
908         }
909
910         class_incref(obd, "export", export);
911         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
912         list_add_tail(&export->exp_obd_chain_timed,
913                       &export->exp_obd->obd_exports_timed);
914         export->exp_obd->obd_num_exports++;
915         spin_unlock(&obd->obd_dev_lock);
916         cfs_hash_putref(hash);
917         RETURN(export);
918
919 exit_unlock:
920         spin_unlock(&obd->obd_dev_lock);
921 exit_err:
922         if (hash)
923                 cfs_hash_putref(hash);
924         class_handle_unhash(&export->exp_handle);
925         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
926         obd_destroy_export(export);
927         OBD_FREE_PTR(export);
928         return ERR_PTR(rc);
929 }
930 EXPORT_SYMBOL(class_new_export);
931
932 void class_unlink_export(struct obd_export *exp)
933 {
934         class_handle_unhash(&exp->exp_handle);
935
936         spin_lock(&exp->exp_obd->obd_dev_lock);
937         /* delete an uuid-export hashitem from hashtables */
938         if (!hlist_unhashed(&exp->exp_uuid_hash))
939                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
940                              &exp->exp_client_uuid,
941                              &exp->exp_uuid_hash);
942
943         if (!hlist_unhashed(&exp->exp_gen_hash)) {
944                 struct tg_export_data   *ted = &exp->exp_target_data;
945                 struct cfs_hash         *hash;
946
947                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
948                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
949                              &exp->exp_gen_hash);
950                 cfs_hash_putref(hash);
951         }
952
953         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
954         list_del_init(&exp->exp_obd_chain_timed);
955         exp->exp_obd->obd_num_exports--;
956         spin_unlock(&exp->exp_obd->obd_dev_lock);
957         atomic_inc(&obd_stale_export_num);
958
959         /* A reference is kept by obd_stale_exports list */
960         obd_stale_export_put(exp);
961 }
962 EXPORT_SYMBOL(class_unlink_export);
963
964 /* Import management functions */
965 static void class_import_destroy(struct obd_import *imp)
966 {
967         ENTRY;
968
969         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
970                 imp->imp_obd->obd_name);
971
972         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
973
974         ptlrpc_put_connection_superhack(imp->imp_connection);
975
976         while (!list_empty(&imp->imp_conn_list)) {
977                 struct obd_import_conn *imp_conn;
978
979                 imp_conn = list_entry(imp->imp_conn_list.next,
980                                       struct obd_import_conn, oic_item);
981                 list_del_init(&imp_conn->oic_item);
982                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
983                 OBD_FREE(imp_conn, sizeof(*imp_conn));
984         }
985
986         LASSERT(imp->imp_sec == NULL);
987         class_decref(imp->imp_obd, "import", imp);
988         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
989         EXIT;
990 }
991
/* hop_addref callback for import handles: takes one import reference. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
996
997 static struct portals_handle_ops import_handle_ops = {
998         .hop_addref = import_handle_addref,
999         .hop_free   = NULL,
1000 };
1001
1002 struct obd_import *class_import_get(struct obd_import *import)
1003 {
1004         atomic_inc(&import->imp_refcount);
1005         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1006                atomic_read(&import->imp_refcount),
1007                import->imp_obd->obd_name);
1008         return import;
1009 }
1010 EXPORT_SYMBOL(class_import_get);
1011
1012 void class_import_put(struct obd_import *imp)
1013 {
1014         ENTRY;
1015
1016         LASSERT(list_empty(&imp->imp_zombie_chain));
1017         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1018
1019         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1020                atomic_read(&imp->imp_refcount) - 1,
1021                imp->imp_obd->obd_name);
1022
1023         if (atomic_dec_and_test(&imp->imp_refcount)) {
1024                 CDEBUG(D_INFO, "final put import %p\n", imp);
1025                 obd_zombie_import_add(imp);
1026         }
1027
1028         /* catch possible import put race */
1029         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1030         EXIT;
1031 }
1032 EXPORT_SYMBOL(class_import_put);
1033
1034 static void init_imp_at(struct imp_at *at) {
1035         int i;
1036         at_init(&at->iat_net_latency, 0, 0);
1037         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1038                 /* max service estimates are tracked on the server side, so
1039                    don't use the AT history here, just use the last reported
1040                    val. (But keep hist for proc histogram, worst_ever) */
1041                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1042                         AT_FLG_NOHIST);
1043         }
1044 }
1045
1046 struct obd_import *class_new_import(struct obd_device *obd)
1047 {
1048         struct obd_import *imp;
1049
1050         OBD_ALLOC(imp, sizeof(*imp));
1051         if (imp == NULL)
1052                 return NULL;
1053
1054         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1055         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1056         INIT_LIST_HEAD(&imp->imp_replay_list);
1057         INIT_LIST_HEAD(&imp->imp_sending_list);
1058         INIT_LIST_HEAD(&imp->imp_delayed_list);
1059         INIT_LIST_HEAD(&imp->imp_committed_list);
1060         imp->imp_replay_cursor = &imp->imp_committed_list;
1061         spin_lock_init(&imp->imp_lock);
1062         imp->imp_last_success_conn = 0;
1063         imp->imp_state = LUSTRE_IMP_NEW;
1064         imp->imp_obd = class_incref(obd, "import", imp);
1065         mutex_init(&imp->imp_sec_mutex);
1066         init_waitqueue_head(&imp->imp_recovery_waitq);
1067
1068         atomic_set(&imp->imp_refcount, 2);
1069         atomic_set(&imp->imp_unregistering, 0);
1070         atomic_set(&imp->imp_inflight, 0);
1071         atomic_set(&imp->imp_replay_inflight, 0);
1072         atomic_set(&imp->imp_inval_count, 0);
1073         INIT_LIST_HEAD(&imp->imp_conn_list);
1074         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1075         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1076         init_imp_at(&imp->imp_at);
1077
1078         /* the default magic is V2, will be used in connect RPC, and
1079          * then adjusted according to the flags in request/reply. */
1080         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1081
1082         return imp;
1083 }
1084 EXPORT_SYMBOL(class_new_import);
1085
1086 void class_destroy_import(struct obd_import *import)
1087 {
1088         LASSERT(import != NULL);
1089         LASSERT(import != LP_POISON);
1090
1091         class_handle_unhash(&import->imp_handle);
1092
1093         spin_lock(&import->imp_lock);
1094         import->imp_generation++;
1095         spin_unlock(&import->imp_lock);
1096         class_import_put(import);
1097 }
1098 EXPORT_SYMBOL(class_destroy_import);
1099
1100 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1101
1102 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1103 {
1104         spin_lock(&exp->exp_locks_list_guard);
1105
1106         LASSERT(lock->l_exp_refs_nr >= 0);
1107
1108         if (lock->l_exp_refs_target != NULL &&
1109             lock->l_exp_refs_target != exp) {
1110                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1111                               exp, lock, lock->l_exp_refs_target);
1112         }
1113         if ((lock->l_exp_refs_nr ++) == 0) {
1114                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1115                 lock->l_exp_refs_target = exp;
1116         }
1117         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1118                lock, exp, lock->l_exp_refs_nr);
1119         spin_unlock(&exp->exp_locks_list_guard);
1120 }
1121
1122 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1123 {
1124         spin_lock(&exp->exp_locks_list_guard);
1125         LASSERT(lock->l_exp_refs_nr > 0);
1126         if (lock->l_exp_refs_target != exp) {
1127                 LCONSOLE_WARN("lock %p, "
1128                               "mismatching export pointers: %p, %p\n",
1129                               lock, lock->l_exp_refs_target, exp);
1130         }
1131         if (-- lock->l_exp_refs_nr == 0) {
1132                 list_del_init(&lock->l_exp_refs_link);
1133                 lock->l_exp_refs_target = NULL;
1134         }
1135         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1136                lock, exp, lock->l_exp_refs_nr);
1137         spin_unlock(&exp->exp_locks_list_guard);
1138 }
1139 #endif
1140
1141 /* A connection defines an export context in which preallocation can
1142    be managed. This releases the export pointer reference, and returns
1143    the export handle, so the export refcount is 1 when this function
1144    returns. */
1145 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1146                   struct obd_uuid *cluuid)
1147 {
1148         struct obd_export *export;
1149         LASSERT(conn != NULL);
1150         LASSERT(obd != NULL);
1151         LASSERT(cluuid != NULL);
1152         ENTRY;
1153
1154         export = class_new_export(obd, cluuid);
1155         if (IS_ERR(export))
1156                 RETURN(PTR_ERR(export));
1157
1158         conn->cookie = export->exp_handle.h_cookie;
1159         class_export_put(export);
1160
1161         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1162                cluuid->uuid, conn->cookie);
1163         RETURN(0);
1164 }
1165 EXPORT_SYMBOL(class_connect);
1166
1167 /* if export is involved in recovery then clean up related things */
1168 static void class_export_recovery_cleanup(struct obd_export *exp)
1169 {
1170         struct obd_device *obd = exp->exp_obd;
1171
1172         spin_lock(&obd->obd_recovery_task_lock);
1173         if (obd->obd_recovering) {
1174                 if (exp->exp_in_recovery) {
1175                         spin_lock(&exp->exp_lock);
1176                         exp->exp_in_recovery = 0;
1177                         spin_unlock(&exp->exp_lock);
1178                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1179                         atomic_dec(&obd->obd_connected_clients);
1180                 }
1181
1182                 /* if called during recovery then should update
1183                  * obd_stale_clients counter,
1184                  * lightweight exports are not counted */
1185                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1186                         exp->exp_obd->obd_stale_clients++;
1187         }
1188         spin_unlock(&obd->obd_recovery_task_lock);
1189
1190         spin_lock(&exp->exp_lock);
1191         /** Cleanup req replay fields */
1192         if (exp->exp_req_replay_needed) {
1193                 exp->exp_req_replay_needed = 0;
1194
1195                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1196                 atomic_dec(&obd->obd_req_replay_clients);
1197         }
1198
1199         /** Cleanup lock replay data */
1200         if (exp->exp_lock_replay_needed) {
1201                 exp->exp_lock_replay_needed = 0;
1202
1203                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1204                 atomic_dec(&obd->obd_lock_replay_clients);
1205         }
1206         spin_unlock(&exp->exp_lock);
1207 }
1208
1209 /* This function removes 1-3 references from the export:
1210  * 1 - for export pointer passed
1211  * and if disconnect really need
1212  * 2 - removing from hash
1213  * 3 - in client_unlink_export
1214  * The export pointer passed to this function can destroyed */
1215 int class_disconnect(struct obd_export *export)
1216 {
1217         int already_disconnected;
1218         ENTRY;
1219
1220         if (export == NULL) {
1221                 CWARN("attempting to free NULL export %p\n", export);
1222                 RETURN(-EINVAL);
1223         }
1224
1225         spin_lock(&export->exp_lock);
1226         already_disconnected = export->exp_disconnected;
1227         export->exp_disconnected = 1;
1228         spin_unlock(&export->exp_lock);
1229
1230         /* class_cleanup(), abort_recovery(), and class_fail_export()
1231          * all end up in here, and if any of them race we shouldn't
1232          * call extra class_export_puts(). */
1233         if (already_disconnected) {
1234                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1235                 GOTO(no_disconn, already_disconnected);
1236         }
1237
1238         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1239                export->exp_handle.h_cookie);
1240
1241         if (!hlist_unhashed(&export->exp_nid_hash))
1242                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1243                              &export->exp_connection->c_peer.nid,
1244                              &export->exp_nid_hash);
1245
1246         class_export_recovery_cleanup(export);
1247         class_unlink_export(export);
1248 no_disconn:
1249         class_export_put(export);
1250         RETURN(0);
1251 }
1252 EXPORT_SYMBOL(class_disconnect);
1253
1254 /* Return non-zero for a fully connected export */
1255 int class_connected_export(struct obd_export *exp)
1256 {
1257         int connected = 0;
1258
1259         if (exp) {
1260                 spin_lock(&exp->exp_lock);
1261                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1262                 spin_unlock(&exp->exp_lock);
1263         }
1264         return connected;
1265 }
1266 EXPORT_SYMBOL(class_connected_export);
1267
1268 static void class_disconnect_export_list(struct list_head *list,
1269                                          enum obd_option flags)
1270 {
1271         int rc;
1272         struct obd_export *exp;
1273         ENTRY;
1274
1275         /* It's possible that an export may disconnect itself, but
1276          * nothing else will be added to this list. */
1277         while (!list_empty(list)) {
1278                 exp = list_entry(list->next, struct obd_export,
1279                                  exp_obd_chain);
1280                 /* need for safe call CDEBUG after obd_disconnect */
1281                 class_export_get(exp);
1282
1283                 spin_lock(&exp->exp_lock);
1284                 exp->exp_flags = flags;
1285                 spin_unlock(&exp->exp_lock);
1286
1287                 if (obd_uuid_equals(&exp->exp_client_uuid,
1288                                     &exp->exp_obd->obd_uuid)) {
1289                         CDEBUG(D_HA,
1290                                "exp %p export uuid == obd uuid, don't discon\n",
1291                                exp);
1292                         /* Need to delete this now so we don't end up pointing
1293                          * to work_list later when this export is cleaned up. */
1294                         list_del_init(&exp->exp_obd_chain);
1295                         class_export_put(exp);
1296                         continue;
1297                 }
1298
1299                 class_export_get(exp);
1300                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1301                        "last request at "CFS_TIME_T"\n",
1302                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1303                        exp, exp->exp_last_request_time);
1304                 /* release one export reference anyway */
1305                 rc = obd_disconnect(exp);
1306
1307                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1308                        obd_export_nid2str(exp), exp, rc);
1309                 class_export_put(exp);
1310         }
1311         EXIT;
1312 }
1313
1314 void class_disconnect_exports(struct obd_device *obd)
1315 {
1316         struct list_head work_list;
1317         ENTRY;
1318
1319         /* Move all of the exports from obd_exports to a work list, en masse. */
1320         INIT_LIST_HEAD(&work_list);
1321         spin_lock(&obd->obd_dev_lock);
1322         list_splice_init(&obd->obd_exports, &work_list);
1323         list_splice_init(&obd->obd_delayed_exports, &work_list);
1324         spin_unlock(&obd->obd_dev_lock);
1325
1326         if (!list_empty(&work_list)) {
1327                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1328                        "disconnecting them\n", obd->obd_minor, obd);
1329                 class_disconnect_export_list(&work_list,
1330                                              exp_flags_from_obd(obd));
1331         } else
1332                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1333                        obd->obd_minor, obd);
1334         EXIT;
1335 }
1336 EXPORT_SYMBOL(class_disconnect_exports);
1337
1338 /* Remove exports that have not completed recovery.
1339  */
1340 void class_disconnect_stale_exports(struct obd_device *obd,
1341                                     int (*test_export)(struct obd_export *))
1342 {
1343         struct list_head work_list;
1344         struct obd_export *exp, *n;
1345         int evicted = 0;
1346         ENTRY;
1347
1348         INIT_LIST_HEAD(&work_list);
1349         spin_lock(&obd->obd_dev_lock);
1350         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1351                                  exp_obd_chain) {
1352                 /* don't count self-export as client */
1353                 if (obd_uuid_equals(&exp->exp_client_uuid,
1354                                     &exp->exp_obd->obd_uuid))
1355                         continue;
1356
1357                 /* don't evict clients which have no slot in last_rcvd
1358                  * (e.g. lightweight connection) */
1359                 if (exp->exp_target_data.ted_lr_idx == -1)
1360                         continue;
1361
1362                 spin_lock(&exp->exp_lock);
1363                 if (exp->exp_failed || test_export(exp)) {
1364                         spin_unlock(&exp->exp_lock);
1365                         continue;
1366                 }
1367                 exp->exp_failed = 1;
1368                 spin_unlock(&exp->exp_lock);
1369
1370                 list_move(&exp->exp_obd_chain, &work_list);
1371                 evicted++;
1372                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1373                        obd->obd_name, exp->exp_client_uuid.uuid,
1374                        exp->exp_connection == NULL ? "<unknown>" :
1375                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1376                 print_export_data(exp, "EVICTING", 0);
1377         }
1378         spin_unlock(&obd->obd_dev_lock);
1379
1380         if (evicted)
1381                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1382                               obd->obd_name, evicted);
1383
1384         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1385                                                  OBD_OPT_ABORT_RECOV);
1386         EXIT;
1387 }
1388 EXPORT_SYMBOL(class_disconnect_stale_exports);
1389
1390 void class_fail_export(struct obd_export *exp)
1391 {
1392         int rc, already_failed;
1393
1394         spin_lock(&exp->exp_lock);
1395         already_failed = exp->exp_failed;
1396         exp->exp_failed = 1;
1397         spin_unlock(&exp->exp_lock);
1398
1399         if (already_failed) {
1400                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1401                        exp, exp->exp_client_uuid.uuid);
1402                 return;
1403         }
1404
1405         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1406                exp, exp->exp_client_uuid.uuid);
1407
1408         if (obd_dump_on_timeout)
1409                 libcfs_debug_dumplog();
1410
1411         /* need for safe call CDEBUG after obd_disconnect */
1412         class_export_get(exp);
1413
1414         /* Most callers into obd_disconnect are removing their own reference
1415          * (request, for example) in addition to the one from the hash table.
1416          * We don't have such a reference here, so make one. */
1417         class_export_get(exp);
1418         rc = obd_disconnect(exp);
1419         if (rc)
1420                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1421         else
1422                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1423                        exp, exp->exp_client_uuid.uuid);
1424         class_export_put(exp);
1425 }
1426 EXPORT_SYMBOL(class_fail_export);
1427
1428 char *obd_export_nid2str(struct obd_export *exp)
1429 {
1430         if (exp->exp_connection != NULL)
1431                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1432
1433         return "(no nid)";
1434 }
1435 EXPORT_SYMBOL(obd_export_nid2str);
1436
1437 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1438 {
1439         struct cfs_hash *nid_hash;
1440         struct obd_export *doomed_exp = NULL;
1441         int exports_evicted = 0;
1442
1443         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1444
1445         spin_lock(&obd->obd_dev_lock);
1446         /* umount has run already, so evict thread should leave
1447          * its task to umount thread now */
1448         if (obd->obd_stopping) {
1449                 spin_unlock(&obd->obd_dev_lock);
1450                 return exports_evicted;
1451         }
1452         nid_hash = obd->obd_nid_hash;
1453         cfs_hash_getref(nid_hash);
1454         spin_unlock(&obd->obd_dev_lock);
1455
1456         do {
1457                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1458                 if (doomed_exp == NULL)
1459                         break;
1460
1461                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1462                          "nid %s found, wanted nid %s, requested nid %s\n",
1463                          obd_export_nid2str(doomed_exp),
1464                          libcfs_nid2str(nid_key), nid);
1465                 LASSERTF(doomed_exp != obd->obd_self_export,
1466                          "self-export is hashed by NID?\n");
1467                 exports_evicted++;
1468                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1469                               "request\n", obd->obd_name,
1470                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1471                               obd_export_nid2str(doomed_exp));
1472                 class_fail_export(doomed_exp);
1473                 class_export_put(doomed_exp);
1474         } while (1);
1475
1476         cfs_hash_putref(nid_hash);
1477
1478         if (!exports_evicted)
1479                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1480                        obd->obd_name, nid);
1481         return exports_evicted;
1482 }
1483 EXPORT_SYMBOL(obd_export_evict_by_nid);
1484
1485 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1486 {
1487         struct cfs_hash *uuid_hash;
1488         struct obd_export *doomed_exp = NULL;
1489         struct obd_uuid doomed_uuid;
1490         int exports_evicted = 0;
1491
1492         spin_lock(&obd->obd_dev_lock);
1493         if (obd->obd_stopping) {
1494                 spin_unlock(&obd->obd_dev_lock);
1495                 return exports_evicted;
1496         }
1497         uuid_hash = obd->obd_uuid_hash;
1498         cfs_hash_getref(uuid_hash);
1499         spin_unlock(&obd->obd_dev_lock);
1500
1501         obd_str2uuid(&doomed_uuid, uuid);
1502         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1503                 CERROR("%s: can't evict myself\n", obd->obd_name);
1504                 cfs_hash_putref(uuid_hash);
1505                 return exports_evicted;
1506         }
1507
1508         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1509
1510         if (doomed_exp == NULL) {
1511                 CERROR("%s: can't disconnect %s: no exports found\n",
1512                        obd->obd_name, uuid);
1513         } else {
1514                 CWARN("%s: evicting %s at adminstrative request\n",
1515                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1516                 class_fail_export(doomed_exp);
1517                 class_export_put(doomed_exp);
1518                 exports_evicted++;
1519         }
1520         cfs_hash_putref(uuid_hash);
1521
1522         return exports_evicted;
1523 }
1524
1525 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1526 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1527 #endif
1528
1529 static void print_export_data(struct obd_export *exp, const char *status,
1530                               int locks)
1531 {
1532         struct ptlrpc_reply_state *rs;
1533         struct ptlrpc_reply_state *first_reply = NULL;
1534         int nreplies = 0;
1535
1536         spin_lock(&exp->exp_lock);
1537         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1538                             rs_exp_list) {
1539                 if (nreplies == 0)
1540                         first_reply = rs;
1541                 nreplies++;
1542         }
1543         spin_unlock(&exp->exp_lock);
1544
1545         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1546                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1547                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1548                atomic_read(&exp->exp_rpc_count),
1549                atomic_read(&exp->exp_cb_count),
1550                atomic_read(&exp->exp_locks_count),
1551                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1552                nreplies, first_reply, nreplies > 3 ? "..." : "",
1553                exp->exp_last_committed);
1554 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1555         if (locks && class_export_dump_hook != NULL)
1556                 class_export_dump_hook(exp);
1557 #endif
1558 }
1559
1560 void dump_exports(struct obd_device *obd, int locks)
1561 {
1562         struct obd_export *exp;
1563
1564         spin_lock(&obd->obd_dev_lock);
1565         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1566                 print_export_data(exp, "ACTIVE", locks);
1567         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1568                 print_export_data(exp, "UNLINKED", locks);
1569         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1570                 print_export_data(exp, "DELAYED", locks);
1571         spin_unlock(&obd->obd_dev_lock);
1572         spin_lock(&obd_zombie_impexp_lock);
1573         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1574                 print_export_data(exp, "ZOMBIE", locks);
1575         spin_unlock(&obd_zombie_impexp_lock);
1576 }
1577
1578 void obd_exports_barrier(struct obd_device *obd)
1579 {
1580         int waited = 2;
1581         LASSERT(list_empty(&obd->obd_exports));
1582         spin_lock(&obd->obd_dev_lock);
1583         while (!list_empty(&obd->obd_unlinked_exports)) {
1584                 spin_unlock(&obd->obd_dev_lock);
1585                 set_current_state(TASK_UNINTERRUPTIBLE);
1586                 schedule_timeout(cfs_time_seconds(waited));
1587                 if (waited > 5 && IS_PO2(waited)) {
1588                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1589                                       "more than %d seconds. "
1590                                       "The obd refcount = %d. Is it stuck?\n",
1591                                       obd->obd_name, waited,
1592                                       atomic_read(&obd->obd_refcount));
1593                         dump_exports(obd, 1);
1594                 }
1595                 waited *= 2;
1596                 spin_lock(&obd->obd_dev_lock);
1597         }
1598         spin_unlock(&obd->obd_dev_lock);
1599 }
1600 EXPORT_SYMBOL(obd_exports_barrier);
1601
/* Number of zombie imports and exports queued for destruction; it is
 * updated under obd_zombie_impexp_lock. */
static int zombies_count;
1604
1605 /**
1606  * kill zombie imports and exports
1607  */
1608 void obd_zombie_impexp_cull(void)
1609 {
1610         struct obd_import *import;
1611         struct obd_export *export;
1612         ENTRY;
1613
1614         do {
1615                 spin_lock(&obd_zombie_impexp_lock);
1616
1617                 import = NULL;
1618                 if (!list_empty(&obd_zombie_imports)) {
1619                         import = list_entry(obd_zombie_imports.next,
1620                                             struct obd_import,
1621                                             imp_zombie_chain);
1622                         list_del_init(&import->imp_zombie_chain);
1623                 }
1624
1625                 export = NULL;
1626                 if (!list_empty(&obd_zombie_exports)) {
1627                         export = list_entry(obd_zombie_exports.next,
1628                                             struct obd_export,
1629                                             exp_obd_chain);
1630                         list_del_init(&export->exp_obd_chain);
1631                 }
1632
1633                 spin_unlock(&obd_zombie_impexp_lock);
1634
1635                 if (import != NULL) {
1636                         class_import_destroy(import);
1637                         spin_lock(&obd_zombie_impexp_lock);
1638                         zombies_count--;
1639                         spin_unlock(&obd_zombie_impexp_lock);
1640                 }
1641
1642                 if (export != NULL) {
1643                         class_export_destroy(export);
1644                         spin_lock(&obd_zombie_impexp_lock);
1645                         zombies_count--;
1646                         spin_unlock(&obd_zombie_impexp_lock);
1647                 }
1648
1649                 cond_resched();
1650         } while (import != NULL || export != NULL);
1651         EXIT;
1652 }
1653
1654 static struct completion        obd_zombie_start;
1655 static struct completion        obd_zombie_stop;
1656 static unsigned long            obd_zombie_flags;
1657 static wait_queue_head_t        obd_zombie_waitq;
1658 static pid_t                    obd_zombie_pid;
1659
1660 enum {
1661         OBD_ZOMBIE_STOP         = 0x0001,
1662 };
1663
1664 /**
1665  * check for work for kill zombie import/export thread.
1666  */
1667 static int obd_zombie_impexp_check(void *arg)
1668 {
1669         int rc;
1670
1671         spin_lock(&obd_zombie_impexp_lock);
1672         rc = (zombies_count == 0) &&
1673              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1674         spin_unlock(&obd_zombie_impexp_lock);
1675
1676         RETURN(rc);
1677 }
1678
1679 /**
1680  * Add export to the obd_zombe thread and notify it.
1681  */
1682 static void obd_zombie_export_add(struct obd_export *exp) {
1683         atomic_dec(&obd_stale_export_num);
1684         spin_lock(&exp->exp_obd->obd_dev_lock);
1685         LASSERT(!list_empty(&exp->exp_obd_chain));
1686         list_del_init(&exp->exp_obd_chain);
1687         spin_unlock(&exp->exp_obd->obd_dev_lock);
1688         spin_lock(&obd_zombie_impexp_lock);
1689         zombies_count++;
1690         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1691         spin_unlock(&obd_zombie_impexp_lock);
1692
1693         obd_zombie_impexp_notify();
1694 }
1695
1696 /**
1697  * Add import to the obd_zombe thread and notify it.
1698  */
1699 static void obd_zombie_import_add(struct obd_import *imp) {
1700         LASSERT(imp->imp_sec == NULL);
1701         spin_lock(&obd_zombie_impexp_lock);
1702         LASSERT(list_empty(&imp->imp_zombie_chain));
1703         zombies_count++;
1704         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1705         spin_unlock(&obd_zombie_impexp_lock);
1706
1707         obd_zombie_impexp_notify();
1708 }
1709
/**
 * Notify the import/export destroy thread about a new zombie.
 */
static void obd_zombie_impexp_notify(void)
{
        /*
         * Make sure obd_zombie_impexp_thread gets this notification.
         * Waking a single waiter is not enough: the wakeup could be
         * consumed by an obd_zombie_barrier() caller alone, which would
         * go back to sleep and leave the thread asleep, causing a hang.
         */
        wake_up_all(&obd_zombie_waitq);
}
1722
1723 /**
1724  * check whether obd_zombie is idle
1725  */
1726 static int obd_zombie_is_idle(void)
1727 {
1728         int rc;
1729
1730         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1731         spin_lock(&obd_zombie_impexp_lock);
1732         rc = (zombies_count == 0);
1733         spin_unlock(&obd_zombie_impexp_lock);
1734         return rc;
1735 }
1736
1737 /**
1738  * wait when obd_zombie import/export queues become empty
1739  */
1740 void obd_zombie_barrier(void)
1741 {
1742         struct l_wait_info lwi = { 0 };
1743
1744         if (obd_zombie_pid == current_pid())
1745                 /* don't wait for myself */
1746                 return;
1747         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1748 }
1749 EXPORT_SYMBOL(obd_zombie_barrier);
1750
1751
1752 struct obd_export *obd_stale_export_get(void)
1753 {
1754         struct obd_export *exp = NULL;
1755         ENTRY;
1756
1757         spin_lock(&obd_stale_export_lock);
1758         if (!list_empty(&obd_stale_exports)) {
1759                 exp = list_entry(obd_stale_exports.next,
1760                                  struct obd_export, exp_stale_list);
1761                 list_del_init(&exp->exp_stale_list);
1762         }
1763         spin_unlock(&obd_stale_export_lock);
1764
1765         if (exp) {
1766                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1767                        atomic_read(&obd_stale_export_num));
1768         }
1769         RETURN(exp);
1770 }
1771 EXPORT_SYMBOL(obd_stale_export_get);
1772
1773 void obd_stale_export_put(struct obd_export *exp)
1774 {
1775         ENTRY;
1776
1777         LASSERT(list_empty(&exp->exp_stale_list));
1778         if (exp->exp_lock_hash &&
1779             atomic_read(&exp->exp_lock_hash->hs_count)) {
1780                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1781                        atomic_read(&obd_stale_export_num));
1782
1783                 spin_lock_bh(&exp->exp_bl_list_lock);
1784                 spin_lock(&obd_stale_export_lock);
1785                 /* Add to the tail if there is no blocked locks,
1786                  * to the head otherwise. */
1787                 if (list_empty(&exp->exp_bl_list))
1788                         list_add_tail(&exp->exp_stale_list,
1789                                       &obd_stale_exports);
1790                 else
1791                         list_add(&exp->exp_stale_list,
1792                                  &obd_stale_exports);
1793
1794                 spin_unlock(&obd_stale_export_lock);
1795                 spin_unlock_bh(&exp->exp_bl_list_lock);
1796         } else {
1797                 class_export_put(exp);
1798         }
1799         EXIT;
1800 }
1801 EXPORT_SYMBOL(obd_stale_export_put);
1802
1803 /**
1804  * Adjust the position of the export in the stale list,
1805  * i.e. move to the head of the list if is needed.
1806  **/
1807 void obd_stale_export_adjust(struct obd_export *exp)
1808 {
1809         LASSERT(exp != NULL);
1810         spin_lock_bh(&exp->exp_bl_list_lock);
1811         spin_lock(&obd_stale_export_lock);
1812
1813         if (!list_empty(&exp->exp_stale_list) &&
1814             !list_empty(&exp->exp_bl_list))
1815                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1816
1817         spin_unlock(&obd_stale_export_lock);
1818         spin_unlock_bh(&exp->exp_bl_list_lock);
1819 }
1820 EXPORT_SYMBOL(obd_stale_export_adjust);
1821
1822 /**
1823  * destroy zombie export/import thread.
1824  */
1825 static int obd_zombie_impexp_thread(void *unused)
1826 {
1827         unshare_fs_struct();
1828         complete(&obd_zombie_start);
1829
1830         obd_zombie_pid = current_pid();
1831
1832         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1833                 struct l_wait_info lwi = { 0 };
1834
1835                 l_wait_event(obd_zombie_waitq,
1836                              !obd_zombie_impexp_check(NULL), &lwi);
1837                 obd_zombie_impexp_cull();
1838
1839                 /*
1840                  * Notify obd_zombie_barrier callers that queues
1841                  * may be empty.
1842                  */
1843                 wake_up(&obd_zombie_waitq);
1844         }
1845
1846         complete(&obd_zombie_stop);
1847
1848         RETURN(0);
1849 }
1850
1851
1852 /**
1853  * start destroy zombie import/export thread
1854  */
1855 int obd_zombie_impexp_init(void)
1856 {
1857         struct task_struct *task;
1858
1859         INIT_LIST_HEAD(&obd_zombie_imports);
1860
1861         INIT_LIST_HEAD(&obd_zombie_exports);
1862         spin_lock_init(&obd_zombie_impexp_lock);
1863         init_completion(&obd_zombie_start);
1864         init_completion(&obd_zombie_stop);
1865         init_waitqueue_head(&obd_zombie_waitq);
1866         obd_zombie_pid = 0;
1867
1868         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1869         if (IS_ERR(task))
1870                 RETURN(PTR_ERR(task));
1871
1872         wait_for_completion(&obd_zombie_start);
1873         RETURN(0);
1874 }
/**
 * Stop the zombie import/export destroy thread.
 *
 * Sets OBD_ZOMBIE_STOP, wakes the thread, and blocks until the thread
 * has completed obd_zombie_stop.
 */
void obd_zombie_impexp_stop(void)
{
        /* the flag must be visible before the thread is woken up */
        set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
        obd_zombie_impexp_notify();
        wait_for_completion(&obd_zombie_stop);
}
1884
1885 /***** Kernel-userspace comm helpers *******/
1886
1887 /* Get length of entire message, including header */
1888 int kuc_len(int payload_len)
1889 {
1890         return sizeof(struct kuc_hdr) + payload_len;
1891 }
1892 EXPORT_SYMBOL(kuc_len);
1893
1894 /* Get a pointer to kuc header, given a ptr to the payload
1895  * @param p Pointer to payload area
1896  * @returns Pointer to kuc header
1897  */
1898 struct kuc_hdr * kuc_ptr(void *p)
1899 {
1900         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1901         LASSERT(lh->kuc_magic == KUC_MAGIC);
1902         return lh;
1903 }
1904 EXPORT_SYMBOL(kuc_ptr);
1905
1906 /* Test if payload is part of kuc message
1907  * @param p Pointer to payload area
1908  * @returns boolean
1909  */
1910 int kuc_ispayload(void *p)
1911 {
1912         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1913
1914         if (kh->kuc_magic == KUC_MAGIC)
1915                 return 1;
1916         else
1917                 return 0;
1918 }
1919 EXPORT_SYMBOL(kuc_ispayload);
1920
1921 /* Alloc space for a message, and fill in header
1922  * @return Pointer to payload area
1923  */
1924 void *kuc_alloc(int payload_len, int transport, int type)
1925 {
1926         struct kuc_hdr *lh;
1927         int len = kuc_len(payload_len);
1928
1929         OBD_ALLOC(lh, len);
1930         if (lh == NULL)
1931                 return ERR_PTR(-ENOMEM);
1932
1933         lh->kuc_magic = KUC_MAGIC;
1934         lh->kuc_transport = transport;
1935         lh->kuc_msgtype = type;
1936         lh->kuc_msglen = len;
1937
1938         return (void *)(lh + 1);
1939 }
1940 EXPORT_SYMBOL(kuc_alloc);
1941
/* Free a message obtained from kuc_alloc().
 * @param p           Pointer to payload area (not to the header)
 * @param payload_len Payload length that was passed to kuc_alloc()
 *
 * Deliberately not declared "inline": the function is exported and
 * therefore needs an ordinary external definition.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1949
/* On-stack record of one thread waiting in obd_get_request_slot(). */
struct obd_request_slot_waiter {
        /* link on cli->cl_loi_read_list; emptied when a slot is handed over */
        struct list_head        orsw_entry;
        /* the waiter sleeps here until it is woken up */
        wait_queue_head_t       orsw_waitq;
        /* initialised to false; when true the wait ends with -EINTR */
        bool                    orsw_signaled;
};
1955
1956 static bool obd_request_slot_avail(struct client_obd *cli,
1957                                    struct obd_request_slot_waiter *orsw)
1958 {
1959         bool avail;
1960
1961         spin_lock(&cli->cl_loi_list_lock);
1962         avail = !!list_empty(&orsw->orsw_entry);
1963         spin_unlock(&cli->cl_loi_list_lock);
1964
1965         return avail;
1966 };
1967
/*
 * For network flow control, the RPC sponsor needs to acquire a credit
 * before sending the RPC. The credits count for a connection is defined
 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
 * the subsequent RPC sponsors need to wait until others released their
 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
 *
 * Returns 0 when a slot has been acquired, the non-zero result of
 * l_wait_event() when the wait did not complete normally, or -EINTR
 * when the waiter was marked as signaled.
 */
int obd_get_request_slot(struct client_obd *cli)
{
        struct obd_request_slot_waiter   orsw;
        struct l_wait_info               lwi;
        int                              rc;

        /* Fast path: a slot is free, take it right away. */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
                cli->cl_r_in_flight++;
                spin_unlock(&cli->cl_loi_list_lock);
                return 0;
        }

        /* Slow path: queue an on-stack waiter at the tail of the list. */
        init_waitqueue_head(&orsw.orsw_waitq);
        list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
        orsw.orsw_signaled = false;
        spin_unlock(&cli->cl_loi_list_lock);

        /*
         * Sleep until our entry has been unlinked (whoever unlinks it
         * also bumps cl_r_in_flight on our behalf) or until we are
         * marked as signaled.
         */
        lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
        rc = l_wait_event(orsw.orsw_waitq,
                          obd_request_slot_avail(cli, &orsw) ||
                          orsw.orsw_signaled,
                          &lwi);

        /* Take the lock so that the on-stack 'orsw' cannot go away
         * while someone else (such as obd_put_request_slot) is still
         * using it. */
        spin_lock(&cli->cl_loi_list_lock);
        if (rc != 0) {
                if (!orsw.orsw_signaled) {
                        /*
                         * The wait failed.  If our entry is already
                         * unlinked, a slot was granted in the meantime
                         * and must be given back; otherwise just leave
                         * the wait list.
                         */
                        if (list_empty(&orsw.orsw_entry))
                                cli->cl_r_in_flight--;
                        else
                                list_del(&orsw.orsw_entry);
                }
        }

        /* NOTE(review): nothing in the code visible here sets
         * orsw_signaled to true -- confirm who does. */
        if (orsw.orsw_signaled) {
                LASSERT(list_empty(&orsw.orsw_entry));

                rc = -EINTR;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        return rc;
}
EXPORT_SYMBOL(obd_get_request_slot);
2021
2022 void obd_put_request_slot(struct client_obd *cli)
2023 {
2024         struct obd_request_slot_waiter *orsw;
2025
2026         spin_lock(&cli->cl_loi_list_lock);
2027         cli->cl_r_in_flight--;
2028
2029         /* If there is free slot, wakeup the first waiter. */
2030         if (!list_empty(&cli->cl_loi_read_list) &&
2031             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2032                 orsw = list_entry(cli->cl_loi_read_list.next,
2033                                   struct obd_request_slot_waiter, orsw_entry);
2034                 list_del_init(&orsw->orsw_entry);
2035                 cli->cl_r_in_flight++;
2036                 wake_up(&orsw->orsw_waitq);
2037         }
2038         spin_unlock(&cli->cl_loi_list_lock);
2039 }
2040 EXPORT_SYMBOL(obd_put_request_slot);
2041
/* Return the current cl_max_rpcs_in_flight limit of @cli (read without
 * taking any lock). */
__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2047
2048 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2049 {
2050         struct obd_request_slot_waiter *orsw;
2051         __u32                           old;
2052         int                             diff;
2053         int                             i;
2054         char                            *typ_name;
2055         int                             rc;
2056
2057         if (max > OBD_MAX_RIF_MAX || max < 1)
2058                 return -ERANGE;
2059
2060         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2061         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2062                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2063                  * strictly lower that max_rpcs_in_flight */
2064                 if (max < 2) {
2065                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2066                                "because it must be higher than "
2067                                "max_mod_rpcs_in_flight value",
2068                                cli->cl_import->imp_obd->obd_name);
2069                         return -ERANGE;
2070                 }
2071                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2072                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2073                         if (rc != 0)
2074                                 return rc;
2075                 }
2076         }
2077
2078         spin_lock(&cli->cl_loi_list_lock);
2079         old = cli->cl_max_rpcs_in_flight;
2080         cli->cl_max_rpcs_in_flight = max;
2081         diff = max - old;
2082
2083         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2084         for (i = 0; i < diff; i++) {
2085                 if (list_empty(&cli->cl_loi_read_list))
2086                         break;
2087
2088                 orsw = list_entry(cli->cl_loi_read_list.next,
2089                                   struct obd_request_slot_waiter, orsw_entry);
2090                 list_del_init(&orsw->orsw_entry);
2091                 cli->cl_r_in_flight++;
2092                 wake_up(&orsw->orsw_waitq);
2093         }
2094         spin_unlock(&cli->cl_loi_list_lock);
2095
2096         return 0;
2097 }
2098 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2099
/* Return the current cl_max_mod_rpcs_in_flight limit of @cli (read
 * without taking any lock). */
__u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_mod_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2105
2106 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2107 {
2108         struct obd_connect_data *ocd;
2109         __u16 maxmodrpcs;
2110         __u16 prev;
2111
2112         if (max > OBD_MAX_RIF_MAX || max < 1)
2113                 return -ERANGE;
2114
2115         /* cannot exceed or equal max_rpcs_in_flight */
2116         if (max >= cli->cl_max_rpcs_in_flight) {
2117                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2118                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2119                        cli->cl_import->imp_obd->obd_name,
2120                        max, cli->cl_max_rpcs_in_flight);
2121                 return -ERANGE;
2122         }
2123
2124         /* cannot exceed max modify RPCs in flight supported by the server */
2125         ocd = &cli->cl_import->imp_connect_data;
2126         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2127                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2128         else
2129                 maxmodrpcs = 1;
2130         if (max > maxmodrpcs) {
2131                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2132                        "higher than max_mod_rpcs_per_client value (%hu) "
2133                        "returned by the server at connection\n",
2134                        cli->cl_import->imp_obd->obd_name,
2135                        max, maxmodrpcs);
2136                 return -ERANGE;
2137         }
2138
2139         spin_lock(&cli->cl_mod_rpcs_lock);
2140
2141         prev = cli->cl_max_mod_rpcs_in_flight;
2142         cli->cl_max_mod_rpcs_in_flight = max;
2143
2144         /* wakeup waiters if limit has been increased */
2145         if (cli->cl_max_mod_rpcs_in_flight > prev)
2146                 wake_up(&cli->cl_mod_rpcs_waitq);
2147
2148         spin_unlock(&cli->cl_mod_rpcs_lock);
2149
2150         return 0;
2151 }
2152 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2153
2154
2155 #define pct(a, b) (b ? a * 100 / b : 0)
2156 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2157                                struct seq_file *seq)
2158 {
2159         struct timeval now;
2160         unsigned long mod_tot = 0, mod_cum;
2161         int i;
2162
2163         do_gettimeofday(&now);
2164
2165         spin_lock(&cli->cl_mod_rpcs_lock);
2166
2167         seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
2168                    now.tv_sec, now.tv_usec);
2169         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2170                    cli->cl_mod_rpcs_in_flight);
2171
2172         seq_printf(seq, "\n\t\t\tmodify\n");
2173         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2174
2175         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2176
2177         mod_cum = 0;
2178         for (i = 0; i < OBD_HIST_MAX; i++) {
2179                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2180                 mod_cum += mod;
2181                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2182                                  i, mod, pct(mod, mod_tot),
2183                                  pct(mod_cum, mod_tot));
2184                 if (mod_cum == mod_tot)
2185                         break;
2186         }
2187
2188         spin_unlock(&cli->cl_mod_rpcs_lock);
2189
2190         return 0;
2191 }
2192 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2193 #undef pct
2194
2195
2196 /* The number of modify RPCs sent in parallel is limited
2197  * because the server has a finite number of slots per client to
2198  * store request result and ensure reply reconstruction when needed.
2199  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2200  * that takes into account server limit and cl_max_rpcs_in_flight
2201  * value.
2202  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2203  * one close request is allowed above the maximum.
2204  */
2205 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2206                                                  bool close_req)
2207 {
2208         bool avail;
2209
2210         /* A slot is available if
2211          * - number of modify RPCs in flight is less than the max
2212          * - it's a close RPC and no other close request is in flight
2213          */
2214         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2215                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2216
2217         return avail;
2218 }
2219
2220 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2221                                          bool close_req)
2222 {
2223         bool avail;
2224
2225         spin_lock(&cli->cl_mod_rpcs_lock);
2226         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2227         spin_unlock(&cli->cl_mod_rpcs_lock);
2228         return avail;
2229 }
2230
2231 /* Get a modify RPC slot from the obd client @cli according
2232  * to the kind of operation @opc that is going to be sent
2233  * and the intent @it of the operation if it applies.
2234  * If the maximum number of modify RPCs in flight is reached
2235  * the thread is put to sleep.
2236  * Returns the tag to be set in the request message. Tag 0
2237  * is reserved for non-modifying requests.
2238  */
2239 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2240                            struct lookup_intent *it)
2241 {
2242         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2243         bool                    close_req = false;
2244         __u16                   i, max;
2245
2246         /* read-only metadata RPCs don't consume a slot on MDT
2247          * for reply reconstruction
2248          */
2249         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2250                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2251                 return 0;
2252
2253         if (opc == MDS_CLOSE)
2254                 close_req = true;
2255
2256         do {
2257                 spin_lock(&cli->cl_mod_rpcs_lock);
2258                 max = cli->cl_max_mod_rpcs_in_flight;
2259                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2260                         /* there is a slot available */
2261                         cli->cl_mod_rpcs_in_flight++;
2262                         if (close_req)
2263                                 cli->cl_close_rpcs_in_flight++;
2264                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2265                                          cli->cl_mod_rpcs_in_flight);
2266                         /* find a free tag */
2267                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2268                                                 max + 1);
2269                         LASSERT(i < OBD_MAX_RIF_MAX);
2270                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2271                         spin_unlock(&cli->cl_mod_rpcs_lock);
2272                         /* tag 0 is reserved for non-modify RPCs */
2273                         return i + 1;
2274                 }
2275                 spin_unlock(&cli->cl_mod_rpcs_lock);
2276
2277                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2278                        "opc %u, max %hu\n",
2279                        cli->cl_import->imp_obd->obd_name, opc, max);
2280
2281                 l_wait_event(cli->cl_mod_rpcs_waitq,
2282                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2283         } while (true);
2284 }
2285 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2286
2287 /* Put a modify RPC slot from the obd client @cli according
2288  * to the kind of operation @opc that has been sent and the
2289  * intent @it of the operation if it applies.
2290  */
2291 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2292                           struct lookup_intent *it, __u16 tag)
2293 {
2294         bool                    close_req = false;
2295
2296         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2297                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2298                 return;
2299
2300         if (opc == MDS_CLOSE)
2301                 close_req = true;
2302
2303         spin_lock(&cli->cl_mod_rpcs_lock);
2304         cli->cl_mod_rpcs_in_flight--;
2305         if (close_req)
2306                 cli->cl_close_rpcs_in_flight--;
2307         /* release the tag in the bitmap */
2308         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2309         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2310         spin_unlock(&cli->cl_mod_rpcs_lock);
2311         wake_up(&cli->cl_mod_rpcs_waitq);
2312 }
2313 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2314