Whamcloud - gitweb
LU-7988 hsm: run HSM coordinator once per second at most
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lprocfs_status.h>
44 #include <lustre_disk.h>
45 #include <lustre_kernelcomm.h>
46
47 static DEFINE_SPINLOCK(obd_types_lock);
48 static LIST_HEAD(obd_types);
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
56
57 static LIST_HEAD(obd_zombie_imports);
58 static LIST_HEAD(obd_zombie_exports);
59 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
60
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks, int debug_level);
66
67 static LIST_HEAD(obd_stale_exports);
68 static DEFINE_SPINLOCK(obd_stale_export_lock);
69 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
70
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
73
74 /*
75  * support functions: we could use inter-module communication, but this
76  * is more portable to other OS's
77  */
78 static struct obd_device *obd_device_alloc(void)
79 {
80         struct obd_device *obd;
81
82         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
83         if (obd != NULL) {
84                 obd->obd_magic = OBD_DEVICE_MAGIC;
85         }
86         return obd;
87 }
88
89 static void obd_device_free(struct obd_device *obd)
90 {
91         LASSERT(obd != NULL);
92         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
93                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
94         if (obd->obd_namespace != NULL) {
95                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
96                        obd, obd->obd_namespace, obd->obd_force);
97                 LBUG();
98         }
99         lu_ref_fini(&obd->obd_reference);
100         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
101 }
102
103 struct obd_type *class_search_type(const char *name)
104 {
105         struct list_head *tmp;
106         struct obd_type *type;
107
108         spin_lock(&obd_types_lock);
109         list_for_each(tmp, &obd_types) {
110                 type = list_entry(tmp, struct obd_type, typ_chain);
111                 if (strcmp(type->typ_name, name) == 0) {
112                         spin_unlock(&obd_types_lock);
113                         return type;
114                 }
115         }
116         spin_unlock(&obd_types_lock);
117         return NULL;
118 }
119 EXPORT_SYMBOL(class_search_type);
120
121 struct obd_type *class_get_type(const char *name)
122 {
123         struct obd_type *type = class_search_type(name);
124
125 #ifdef HAVE_MODULE_LOADING_SUPPORT
126         if (!type) {
127                 const char *modname = name;
128
129                 if (strcmp(modname, "obdfilter") == 0)
130                         modname = "ofd";
131
132                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
133                         modname = LUSTRE_OSP_NAME;
134
135                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
136                         modname = LUSTRE_MDT_NAME;
137
138                 if (!request_module("%s", modname)) {
139                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
140                         type = class_search_type(name);
141                 } else {
142                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143                                            modname);
144                 }
145         }
146 #endif
147         if (type) {
148                 spin_lock(&type->obd_type_lock);
149                 type->typ_refcnt++;
150                 try_module_get(type->typ_dt_ops->o_owner);
151                 spin_unlock(&type->obd_type_lock);
152         }
153         return type;
154 }
155
156 void class_put_type(struct obd_type *type)
157 {
158         LASSERT(type);
159         spin_lock(&type->obd_type_lock);
160         type->typ_refcnt--;
161         module_put(type->typ_dt_ops->o_owner);
162         spin_unlock(&type->obd_type_lock);
163 }
164
165 #define CLASS_MAX_NAME 1024
166
167 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
168                         bool enable_proc, struct lprocfs_vars *vars,
169                         const char *name, struct lu_device_type *ldt)
170 {
171         struct obd_type *type;
172         int rc = 0;
173         ENTRY;
174
175         /* sanity check */
176         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
177
178         if (class_search_type(name)) {
179                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180                 RETURN(-EEXIST);
181         }
182
183         rc = -ENOMEM;
184         OBD_ALLOC(type, sizeof(*type));
185         if (type == NULL)
186                 RETURN(rc);
187
188         OBD_ALLOC_PTR(type->typ_dt_ops);
189         OBD_ALLOC_PTR(type->typ_md_ops);
190         OBD_ALLOC(type->typ_name, strlen(name) + 1);
191
192         if (type->typ_dt_ops == NULL ||
193             type->typ_md_ops == NULL ||
194             type->typ_name == NULL)
195                 GOTO (failed, rc);
196
197         *(type->typ_dt_ops) = *dt_ops;
198         /* md_ops is optional */
199         if (md_ops)
200                 *(type->typ_md_ops) = *md_ops;
201         strcpy(type->typ_name, name);
202         spin_lock_init(&type->obd_type_lock);
203
204 #ifdef CONFIG_PROC_FS
205         if (enable_proc) {
206                 type->typ_procroot = lprocfs_register(type->typ_name,
207                                                       proc_lustre_root,
208                                                       vars, type);
209                 if (IS_ERR(type->typ_procroot)) {
210                         rc = PTR_ERR(type->typ_procroot);
211                         type->typ_procroot = NULL;
212                         GOTO(failed, rc);
213                 }
214         }
215 #endif
216         type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
217         if (!type->typ_kobj) {
218                 rc = -ENOMEM;
219                 GOTO(failed, rc);
220         }
221
222         if (ldt != NULL) {
223                 type->typ_lu = ldt;
224                 rc = lu_device_type_init(ldt);
225                 if (rc != 0)
226                         GOTO (failed, rc);
227         }
228
229         spin_lock(&obd_types_lock);
230         list_add(&type->typ_chain, &obd_types);
231         spin_unlock(&obd_types_lock);
232
233         RETURN (0);
234
235 failed:
236         if (type->typ_kobj)
237                 kobject_put(type->typ_kobj);
238         if (type->typ_name != NULL) {
239 #ifdef CONFIG_PROC_FS
240                 if (type->typ_procroot != NULL)
241                         remove_proc_subtree(type->typ_name, proc_lustre_root);
242 #endif
243                 OBD_FREE(type->typ_name, strlen(name) + 1);
244         }
245         if (type->typ_md_ops != NULL)
246                 OBD_FREE_PTR(type->typ_md_ops);
247         if (type->typ_dt_ops != NULL)
248                 OBD_FREE_PTR(type->typ_dt_ops);
249         OBD_FREE(type, sizeof(*type));
250         RETURN(rc);
251 }
252 EXPORT_SYMBOL(class_register_type);
253
254 int class_unregister_type(const char *name)
255 {
256         struct obd_type *type = class_search_type(name);
257         ENTRY;
258
259         if (!type) {
260                 CERROR("unknown obd type\n");
261                 RETURN(-EINVAL);
262         }
263
264         if (type->typ_refcnt) {
265                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
266                 /* This is a bad situation, let's make the best of it */
267                 /* Remove ops, but leave the name for debugging */
268                 OBD_FREE_PTR(type->typ_dt_ops);
269                 OBD_FREE_PTR(type->typ_md_ops);
270                 RETURN(-EBUSY);
271         }
272
273         if (type->typ_kobj)
274                 kobject_put(type->typ_kobj);
275
276         /* we do not use type->typ_procroot as for compatibility purposes
277          * other modules can share names (i.e. lod can use lov entry). so
278          * we can't reference pointer as it can get invalided when another
279          * module removes the entry */
280 #ifdef CONFIG_PROC_FS
281         if (type->typ_procroot != NULL)
282                 remove_proc_subtree(type->typ_name, proc_lustre_root);
283         if (type->typ_procsym != NULL)
284                 lprocfs_remove(&type->typ_procsym);
285 #endif
286         if (type->typ_lu)
287                 lu_device_type_fini(type->typ_lu);
288
289         spin_lock(&obd_types_lock);
290         list_del(&type->typ_chain);
291         spin_unlock(&obd_types_lock);
292         OBD_FREE(type->typ_name, strlen(name) + 1);
293         if (type->typ_dt_ops != NULL)
294                 OBD_FREE_PTR(type->typ_dt_ops);
295         if (type->typ_md_ops != NULL)
296                 OBD_FREE_PTR(type->typ_md_ops);
297         OBD_FREE(type, sizeof(*type));
298         RETURN(0);
299 } /* class_unregister_type */
300 EXPORT_SYMBOL(class_unregister_type);
301
302 /**
303  * Create a new obd device.
304  *
305  * Find an empty slot in ::obd_devs[], create a new obd device in it.
306  *
307  * \param[in] type_name obd device type string.
308  * \param[in] name      obd device name.
309  *
310  * \retval NULL if create fails, otherwise return the obd device
311  *         pointer created.
312  */
313 struct obd_device *class_newdev(const char *type_name, const char *name)
314 {
315         struct obd_device *result = NULL;
316         struct obd_device *newdev;
317         struct obd_type *type = NULL;
318         int i;
319         int new_obd_minor = 0;
320         ENTRY;
321
322         if (strlen(name) >= MAX_OBD_NAME) {
323                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
324                 RETURN(ERR_PTR(-EINVAL));
325         }
326
327         type = class_get_type(type_name);
328         if (type == NULL){
329                 CERROR("OBD: unknown type: %s\n", type_name);
330                 RETURN(ERR_PTR(-ENODEV));
331         }
332
333         newdev = obd_device_alloc();
334         if (newdev == NULL)
335                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
336
337         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
338
339         write_lock(&obd_dev_lock);
340         for (i = 0; i < class_devno_max(); i++) {
341                 struct obd_device *obd = class_num2obd(i);
342
343                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
344                         CERROR("Device %s already exists at %d, won't add\n",
345                                name, i);
346                         if (result) {
347                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
348                                          "%p obd_magic %08x != %08x\n", result,
349                                          result->obd_magic, OBD_DEVICE_MAGIC);
350                                 LASSERTF(result->obd_minor == new_obd_minor,
351                                          "%p obd_minor %d != %d\n", result,
352                                          result->obd_minor, new_obd_minor);
353
354                                 obd_devs[result->obd_minor] = NULL;
355                                 result->obd_name[0]='\0';
356                          }
357                         result = ERR_PTR(-EEXIST);
358                         break;
359                 }
360                 if (!result && !obd) {
361                         result = newdev;
362                         result->obd_minor = i;
363                         new_obd_minor = i;
364                         result->obd_type = type;
365                         strncpy(result->obd_name, name,
366                                 sizeof(result->obd_name) - 1);
367                         obd_devs[i] = result;
368                 }
369         }
370         write_unlock(&obd_dev_lock);
371
372         if (result == NULL && i >= class_devno_max()) {
373                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
374                        class_devno_max());
375                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
376         }
377
378         if (IS_ERR(result))
379                 GOTO(out, result);
380
381         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
382                result->obd_name, result);
383
384         RETURN(result);
385 out:
386         obd_device_free(newdev);
387 out_type:
388         class_put_type(type);
389         return result;
390 }
391
392 void class_release_dev(struct obd_device *obd)
393 {
394         struct obd_type *obd_type = obd->obd_type;
395
396         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
397                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
398         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
399                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
400         LASSERT(obd_type != NULL);
401
402         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
403                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
404
405         write_lock(&obd_dev_lock);
406         obd_devs[obd->obd_minor] = NULL;
407         write_unlock(&obd_dev_lock);
408         obd_device_free(obd);
409
410         class_put_type(obd_type);
411 }
412
413 int class_name2dev(const char *name)
414 {
415         int i;
416
417         if (!name)
418                 return -1;
419
420         read_lock(&obd_dev_lock);
421         for (i = 0; i < class_devno_max(); i++) {
422                 struct obd_device *obd = class_num2obd(i);
423
424                 if (obd && strcmp(name, obd->obd_name) == 0) {
425                         /* Make sure we finished attaching before we give
426                            out any references */
427                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
428                         if (obd->obd_attached) {
429                                 read_unlock(&obd_dev_lock);
430                                 return i;
431                         }
432                         break;
433                 }
434         }
435         read_unlock(&obd_dev_lock);
436
437         return -1;
438 }
439
/**
 * Look up an obd device by name.
 *
 * Resolves \a name to an index with class_name2dev() and returns the device
 * stored at that index.  Valid indices are 0 .. class_devno_max() - 1, so
 * an index equal to class_devno_max() is rejected as well.
 *
 * \param[in] name	device name
 *
 * \retval NULL if no attached device has that name
 * \retval the matching obd device otherwise
 */
struct obd_device *class_name2obd(const char *name)
{
	int dev = class_name2dev(name);

	if (dev < 0 || dev >= class_devno_max())
		return NULL;
	return class_num2obd(dev);
}
EXPORT_SYMBOL(class_name2obd);
449
450 int class_uuid2dev(struct obd_uuid *uuid)
451 {
452         int i;
453
454         read_lock(&obd_dev_lock);
455         for (i = 0; i < class_devno_max(); i++) {
456                 struct obd_device *obd = class_num2obd(i);
457
458                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
459                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
460                         read_unlock(&obd_dev_lock);
461                         return i;
462                 }
463         }
464         read_unlock(&obd_dev_lock);
465
466         return -1;
467 }
468
/**
 * Look up an obd device by uuid.
 *
 * \param[in] uuid	uuid to search for
 *
 * \retval NULL if class_uuid2dev() finds no device with that uuid
 * \retval the device stored at the index found otherwise
 */
struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
{
	int idx = class_uuid2dev(uuid);

	return idx < 0 ? NULL : class_num2obd(idx);
}
EXPORT_SYMBOL(class_uuid2obd);
477
478 /**
479  * Get obd device from ::obd_devs[]
480  *
481  * \param num [in] array index
482  *
483  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
484  *         otherwise return the obd device there.
485  */
486 struct obd_device *class_num2obd(int num)
487 {
488         struct obd_device *obd = NULL;
489
490         if (num < class_devno_max()) {
491                 obd = obd_devs[num];
492                 if (obd == NULL)
493                         return NULL;
494
495                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
496                          "%p obd_magic %08x != %08x\n",
497                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
498                 LASSERTF(obd->obd_minor == num,
499                          "%p obd_minor %0d != %0d\n",
500                          obd, obd->obd_minor, num);
501         }
502
503         return obd;
504 }
505
506 /**
507  * Get obd devices count. Device in any
508  *    state are counted
509  * \retval obd device count
510  */
511 int get_devices_count(void)
512 {
513         int index, max_index = class_devno_max(), dev_count = 0;
514
515         read_lock(&obd_dev_lock);
516         for (index = 0; index <= max_index; index++) {
517                 struct obd_device *obd = class_num2obd(index);
518                 if (obd != NULL)
519                         dev_count++;
520         }
521         read_unlock(&obd_dev_lock);
522
523         return dev_count;
524 }
525 EXPORT_SYMBOL(get_devices_count);
526
527 void class_obd_list(void)
528 {
529         char *status;
530         int i;
531
532         read_lock(&obd_dev_lock);
533         for (i = 0; i < class_devno_max(); i++) {
534                 struct obd_device *obd = class_num2obd(i);
535
536                 if (obd == NULL)
537                         continue;
538                 if (obd->obd_stopping)
539                         status = "ST";
540                 else if (obd->obd_set_up)
541                         status = "UP";
542                 else if (obd->obd_attached)
543                         status = "AT";
544                 else
545                         status = "--";
546                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
547                          i, status, obd->obd_type->typ_name,
548                          obd->obd_name, obd->obd_uuid.uuid,
549                          atomic_read(&obd->obd_refcount));
550         }
551         read_unlock(&obd_dev_lock);
552         return;
553 }
554
555 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
556    specified, then only the client with that uuid is returned,
557    otherwise any client connected to the tgt is returned. */
558 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
559                                           const char * typ_name,
560                                           struct obd_uuid *grp_uuid)
561 {
562         int i;
563
564         read_lock(&obd_dev_lock);
565         for (i = 0; i < class_devno_max(); i++) {
566                 struct obd_device *obd = class_num2obd(i);
567
568                 if (obd == NULL)
569                         continue;
570                 if ((strncmp(obd->obd_type->typ_name, typ_name,
571                              strlen(typ_name)) == 0)) {
572                         if (obd_uuid_equals(tgt_uuid,
573                                             &obd->u.cli.cl_target_uuid) &&
574                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
575                                                          &obd->obd_uuid) : 1)) {
576                                 read_unlock(&obd_dev_lock);
577                                 return obd;
578                         }
579                 }
580         }
581         read_unlock(&obd_dev_lock);
582
583         return NULL;
584 }
585 EXPORT_SYMBOL(class_find_client_obd);
586
587 /* Iterate the obd_device list looking devices have grp_uuid. Start
588    searching at *next, and if a device is found, the next index to look
589    at is saved in *next. If next is NULL, then the first matching device
590    will always be returned. */
591 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
592 {
593         int i;
594
595         if (next == NULL)
596                 i = 0;
597         else if (*next >= 0 && *next < class_devno_max())
598                 i = *next;
599         else
600                 return NULL;
601
602         read_lock(&obd_dev_lock);
603         for (; i < class_devno_max(); i++) {
604                 struct obd_device *obd = class_num2obd(i);
605
606                 if (obd == NULL)
607                         continue;
608                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
609                         if (next != NULL)
610                                 *next = i+1;
611                         read_unlock(&obd_dev_lock);
612                         return obd;
613                 }
614         }
615         read_unlock(&obd_dev_lock);
616
617         return NULL;
618 }
619 EXPORT_SYMBOL(class_devices_in_group);
620
621 /**
622  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
623  * adjust sptlrpc settings accordingly.
624  */
625 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
626 {
627         struct obd_device  *obd;
628         const char         *type;
629         int                 i, rc = 0, rc2;
630
631         LASSERT(namelen > 0);
632
633         read_lock(&obd_dev_lock);
634         for (i = 0; i < class_devno_max(); i++) {
635                 obd = class_num2obd(i);
636
637                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
638                         continue;
639
640                 /* only notify mdc, osc, osp, lwp, mdt, ost
641                  * because only these have a -sptlrpc llog */
642                 type = obd->obd_type->typ_name;
643                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
644                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
645                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
646                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
647                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
648                     strcmp(type, LUSTRE_OST_NAME) != 0)
649                         continue;
650
651                 if (strncmp(obd->obd_name, fsname, namelen))
652                         continue;
653
654                 class_incref(obd, __FUNCTION__, obd);
655                 read_unlock(&obd_dev_lock);
656                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
657                                          sizeof(KEY_SPTLRPC_CONF),
658                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
659                 rc = rc ? rc : rc2;
660                 class_decref(obd, __FUNCTION__, obd);
661                 read_lock(&obd_dev_lock);
662         }
663         read_unlock(&obd_dev_lock);
664         return rc;
665 }
666 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
667
668 void obd_cleanup_caches(void)
669 {
670         ENTRY;
671         if (obd_device_cachep) {
672                 kmem_cache_destroy(obd_device_cachep);
673                 obd_device_cachep = NULL;
674         }
675         if (obdo_cachep) {
676                 kmem_cache_destroy(obdo_cachep);
677                 obdo_cachep = NULL;
678         }
679         if (import_cachep) {
680                 kmem_cache_destroy(import_cachep);
681                 import_cachep = NULL;
682         }
683
684         EXIT;
685 }
686
687 int obd_init_caches(void)
688 {
689         int rc;
690         ENTRY;
691
692         LASSERT(obd_device_cachep == NULL);
693         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
694                                               sizeof(struct obd_device),
695                                               0, 0, NULL);
696         if (!obd_device_cachep)
697                 GOTO(out, rc = -ENOMEM);
698
699         LASSERT(obdo_cachep == NULL);
700         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
701                                         0, 0, NULL);
702         if (!obdo_cachep)
703                 GOTO(out, rc = -ENOMEM);
704
705         LASSERT(import_cachep == NULL);
706         import_cachep = kmem_cache_create("ll_import_cache",
707                                           sizeof(struct obd_import),
708                                           0, 0, NULL);
709         if (!import_cachep)
710                 GOTO(out, rc = -ENOMEM);
711
712         RETURN(0);
713 out:
714         obd_cleanup_caches();
715         RETURN(rc);
716 }
717
718 /* map connection to client */
719 struct obd_export *class_conn2export(struct lustre_handle *conn)
720 {
721         struct obd_export *export;
722         ENTRY;
723
724         if (!conn) {
725                 CDEBUG(D_CACHE, "looking for null handle\n");
726                 RETURN(NULL);
727         }
728
729         if (conn->cookie == -1) {  /* this means assign a new connection */
730                 CDEBUG(D_CACHE, "want a new connection\n");
731                 RETURN(NULL);
732         }
733
734         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
735         export = class_handle2object(conn->cookie, NULL);
736         RETURN(export);
737 }
738 EXPORT_SYMBOL(class_conn2export);
739
740 struct obd_device *class_exp2obd(struct obd_export *exp)
741 {
742         if (exp)
743                 return exp->exp_obd;
744         return NULL;
745 }
746 EXPORT_SYMBOL(class_exp2obd);
747
748 struct obd_device *class_conn2obd(struct lustre_handle *conn)
749 {
750         struct obd_export *export;
751         export = class_conn2export(conn);
752         if (export) {
753                 struct obd_device *obd = export->exp_obd;
754                 class_export_put(export);
755                 return obd;
756         }
757         return NULL;
758 }
759
760 struct obd_import *class_exp2cliimp(struct obd_export *exp)
761 {
762         struct obd_device *obd = exp->exp_obd;
763         if (obd == NULL)
764                 return NULL;
765         return obd->u.cli.cl_import;
766 }
767 EXPORT_SYMBOL(class_exp2cliimp);
768
769 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
770 {
771         struct obd_device *obd = class_conn2obd(conn);
772         if (obd == NULL)
773                 return NULL;
774         return obd->u.cli.cl_import;
775 }
776
777 /* Export management functions */
778 static void class_export_destroy(struct obd_export *exp)
779 {
780         struct obd_device *obd = exp->exp_obd;
781         ENTRY;
782
783         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
784         LASSERT(obd != NULL);
785
786         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
787                exp->exp_client_uuid.uuid, obd->obd_name);
788
789         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
790         if (exp->exp_connection)
791                 ptlrpc_put_connection_superhack(exp->exp_connection);
792
793         LASSERT(list_empty(&exp->exp_outstanding_replies));
794         LASSERT(list_empty(&exp->exp_uncommitted_replies));
795         LASSERT(list_empty(&exp->exp_req_replay_queue));
796         LASSERT(list_empty(&exp->exp_hp_rpcs));
797         obd_destroy_export(exp);
798         class_decref(obd, "export", exp);
799
800         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
801         EXIT;
802 }
803
/**
 * Handle callback (hop_addref of export_handle_ops): take a reference on
 * the export passed as an untyped pointer.
 *
 * \param[in] export	pointer to a struct obd_export
 */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
808
809 static struct portals_handle_ops export_handle_ops = {
810         .hop_addref = export_handle_addref,
811         .hop_free   = NULL,
812 };
813
814 struct obd_export *class_export_get(struct obd_export *exp)
815 {
816         atomic_inc(&exp->exp_refcount);
817         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
818                atomic_read(&exp->exp_refcount));
819         return exp;
820 }
821 EXPORT_SYMBOL(class_export_get);
822
823 void class_export_put(struct obd_export *exp)
824 {
825         LASSERT(exp != NULL);
826         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
827         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
828                atomic_read(&exp->exp_refcount) - 1);
829
830         if (atomic_dec_and_test(&exp->exp_refcount)) {
831                 LASSERT(!list_empty(&exp->exp_obd_chain));
832                 LASSERT(list_empty(&exp->exp_stale_list));
833                 CDEBUG(D_IOCTL, "final put %p/%s\n",
834                        exp, exp->exp_client_uuid.uuid);
835
836                 /* release nid stat refererence */
837                 lprocfs_exp_cleanup(exp);
838
839                 obd_zombie_export_add(exp);
840         }
841 }
842 EXPORT_SYMBOL(class_export_put);
843
844 /* Creates a new export, adds it to the hash table, and returns a
845  * pointer to it. The refcount is 2: one for the hash reference, and
846  * one for the pointer returned by this function. */
847 struct obd_export *class_new_export(struct obd_device *obd,
848                                     struct obd_uuid *cluuid)
849 {
850         struct obd_export *export;
851         struct cfs_hash *hash = NULL;
852         int rc = 0;
853         ENTRY;
854
855         OBD_ALLOC_PTR(export);
856         if (!export)
857                 return ERR_PTR(-ENOMEM);
858
859         export->exp_conn_cnt = 0;
860         export->exp_lock_hash = NULL;
861         export->exp_flock_hash = NULL;
862         atomic_set(&export->exp_refcount, 2);
863         atomic_set(&export->exp_rpc_count, 0);
864         atomic_set(&export->exp_cb_count, 0);
865         atomic_set(&export->exp_locks_count, 0);
866 #if LUSTRE_TRACKS_LOCK_EXP_REFS
867         INIT_LIST_HEAD(&export->exp_locks_list);
868         spin_lock_init(&export->exp_locks_list_guard);
869 #endif
870         atomic_set(&export->exp_replay_count, 0);
871         export->exp_obd = obd;
872         INIT_LIST_HEAD(&export->exp_outstanding_replies);
873         spin_lock_init(&export->exp_uncommitted_replies_lock);
874         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
875         INIT_LIST_HEAD(&export->exp_req_replay_queue);
876         INIT_LIST_HEAD(&export->exp_handle.h_link);
877         INIT_LIST_HEAD(&export->exp_hp_rpcs);
878         INIT_LIST_HEAD(&export->exp_reg_rpcs);
879         class_handle_hash(&export->exp_handle, &export_handle_ops);
880         export->exp_last_request_time = cfs_time_current_sec();
881         spin_lock_init(&export->exp_lock);
882         spin_lock_init(&export->exp_rpc_lock);
883         INIT_HLIST_NODE(&export->exp_uuid_hash);
884         INIT_HLIST_NODE(&export->exp_nid_hash);
885         INIT_HLIST_NODE(&export->exp_gen_hash);
886         spin_lock_init(&export->exp_bl_list_lock);
887         INIT_LIST_HEAD(&export->exp_bl_list);
888         INIT_LIST_HEAD(&export->exp_stale_list);
889
890         export->exp_sp_peer = LUSTRE_SP_ANY;
891         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
892         export->exp_client_uuid = *cluuid;
893         obd_init_export(export);
894
895         spin_lock(&obd->obd_dev_lock);
896         /* shouldn't happen, but might race */
897         if (obd->obd_stopping)
898                 GOTO(exit_unlock, rc = -ENODEV);
899
900         hash = cfs_hash_getref(obd->obd_uuid_hash);
901         if (hash == NULL)
902                 GOTO(exit_unlock, rc = -ENODEV);
903         spin_unlock(&obd->obd_dev_lock);
904
905         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
906                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
907                 if (rc != 0) {
908                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
909                                       obd->obd_name, cluuid->uuid, rc);
910                         GOTO(exit_err, rc = -EALREADY);
911                 }
912         }
913
914         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
915         spin_lock(&obd->obd_dev_lock);
916         if (obd->obd_stopping) {
917                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
918                 GOTO(exit_unlock, rc = -ENODEV);
919         }
920
921         class_incref(obd, "export", export);
922         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
923         list_add_tail(&export->exp_obd_chain_timed,
924                       &export->exp_obd->obd_exports_timed);
925         export->exp_obd->obd_num_exports++;
926         spin_unlock(&obd->obd_dev_lock);
927         cfs_hash_putref(hash);
928         RETURN(export);
929
930 exit_unlock:
931         spin_unlock(&obd->obd_dev_lock);
932 exit_err:
933         if (hash)
934                 cfs_hash_putref(hash);
935         class_handle_unhash(&export->exp_handle);
936         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
937         obd_destroy_export(export);
938         OBD_FREE_PTR(export);
939         return ERR_PTR(rc);
940 }
941 EXPORT_SYMBOL(class_new_export);
942
943 void class_unlink_export(struct obd_export *exp)
944 {
945         class_handle_unhash(&exp->exp_handle);
946
947         spin_lock(&exp->exp_obd->obd_dev_lock);
948         /* delete an uuid-export hashitem from hashtables */
949         if (!hlist_unhashed(&exp->exp_uuid_hash))
950                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
951                              &exp->exp_client_uuid,
952                              &exp->exp_uuid_hash);
953
954 #ifdef HAVE_SERVER_SUPPORT
955         if (!hlist_unhashed(&exp->exp_gen_hash)) {
956                 struct tg_export_data   *ted = &exp->exp_target_data;
957                 struct cfs_hash         *hash;
958
959                 /* Because obd_gen_hash will not be released until
960                  * class_cleanup(), so hash should never be NULL here */
961                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
962                 LASSERT(hash != NULL);
963                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
964                              &exp->exp_gen_hash);
965                 cfs_hash_putref(hash);
966         }
967 #endif /* HAVE_SERVER_SUPPORT */
968
969         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
970         list_del_init(&exp->exp_obd_chain_timed);
971         exp->exp_obd->obd_num_exports--;
972         spin_unlock(&exp->exp_obd->obd_dev_lock);
973         atomic_inc(&obd_stale_export_num);
974
975         /* A reference is kept by obd_stale_exports list */
976         obd_stale_export_put(exp);
977 }
978 EXPORT_SYMBOL(class_unlink_export);
979
980 /* Import management functions */
981 static void class_import_destroy(struct obd_import *imp)
982 {
983         ENTRY;
984
985         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
986                 imp->imp_obd->obd_name);
987
988         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
989
990         ptlrpc_put_connection_superhack(imp->imp_connection);
991
992         while (!list_empty(&imp->imp_conn_list)) {
993                 struct obd_import_conn *imp_conn;
994
995                 imp_conn = list_entry(imp->imp_conn_list.next,
996                                       struct obd_import_conn, oic_item);
997                 list_del_init(&imp_conn->oic_item);
998                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
999                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1000         }
1001
1002         LASSERT(imp->imp_sec == NULL);
1003         class_decref(imp->imp_obd, "import", imp);
1004         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1005         EXIT;
1006 }
1007
/* portals_handle_ops::hop_addref callback for imports: takes one more
 * reference on the import passed as an untyped pointer. */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
1012
1013 static struct portals_handle_ops import_handle_ops = {
1014         .hop_addref = import_handle_addref,
1015         .hop_free   = NULL,
1016 };
1017
1018 struct obd_import *class_import_get(struct obd_import *import)
1019 {
1020         atomic_inc(&import->imp_refcount);
1021         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1022                atomic_read(&import->imp_refcount),
1023                import->imp_obd->obd_name);
1024         return import;
1025 }
1026 EXPORT_SYMBOL(class_import_get);
1027
1028 void class_import_put(struct obd_import *imp)
1029 {
1030         ENTRY;
1031
1032         LASSERT(list_empty(&imp->imp_zombie_chain));
1033         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1034
1035         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1036                atomic_read(&imp->imp_refcount) - 1,
1037                imp->imp_obd->obd_name);
1038
1039         if (atomic_dec_and_test(&imp->imp_refcount)) {
1040                 CDEBUG(D_INFO, "final put import %p\n", imp);
1041                 obd_zombie_import_add(imp);
1042         }
1043
1044         /* catch possible import put race */
1045         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1046         EXIT;
1047 }
1048 EXPORT_SYMBOL(class_import_put);
1049
1050 static void init_imp_at(struct imp_at *at) {
1051         int i;
1052         at_init(&at->iat_net_latency, 0, 0);
1053         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1054                 /* max service estimates are tracked on the server side, so
1055                    don't use the AT history here, just use the last reported
1056                    val. (But keep hist for proc histogram, worst_ever) */
1057                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1058                         AT_FLG_NOHIST);
1059         }
1060 }
1061
1062 struct obd_import *class_new_import(struct obd_device *obd)
1063 {
1064         struct obd_import *imp;
1065         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1066
1067         OBD_ALLOC(imp, sizeof(*imp));
1068         if (imp == NULL)
1069                 return NULL;
1070
1071         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1072         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1073         INIT_LIST_HEAD(&imp->imp_replay_list);
1074         INIT_LIST_HEAD(&imp->imp_sending_list);
1075         INIT_LIST_HEAD(&imp->imp_delayed_list);
1076         INIT_LIST_HEAD(&imp->imp_committed_list);
1077         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1078         imp->imp_known_replied_xid = 0;
1079         imp->imp_replay_cursor = &imp->imp_committed_list;
1080         spin_lock_init(&imp->imp_lock);
1081         imp->imp_last_success_conn = 0;
1082         imp->imp_state = LUSTRE_IMP_NEW;
1083         imp->imp_obd = class_incref(obd, "import", imp);
1084         mutex_init(&imp->imp_sec_mutex);
1085         init_waitqueue_head(&imp->imp_recovery_waitq);
1086
1087         if (curr_pid_ns->child_reaper)
1088                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1089         else
1090                 imp->imp_sec_refpid = 1;
1091
1092         atomic_set(&imp->imp_refcount, 2);
1093         atomic_set(&imp->imp_unregistering, 0);
1094         atomic_set(&imp->imp_inflight, 0);
1095         atomic_set(&imp->imp_replay_inflight, 0);
1096         atomic_set(&imp->imp_inval_count, 0);
1097         INIT_LIST_HEAD(&imp->imp_conn_list);
1098         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1099         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1100         init_imp_at(&imp->imp_at);
1101
1102         /* the default magic is V2, will be used in connect RPC, and
1103          * then adjusted according to the flags in request/reply. */
1104         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1105
1106         return imp;
1107 }
1108 EXPORT_SYMBOL(class_new_import);
1109
1110 void class_destroy_import(struct obd_import *import)
1111 {
1112         LASSERT(import != NULL);
1113         LASSERT(import != LP_POISON);
1114
1115         class_handle_unhash(&import->imp_handle);
1116
1117         spin_lock(&import->imp_lock);
1118         import->imp_generation++;
1119         spin_unlock(&import->imp_lock);
1120         class_import_put(import);
1121 }
1122 EXPORT_SYMBOL(class_destroy_import);
1123
1124 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1125
1126 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1127 {
1128         spin_lock(&exp->exp_locks_list_guard);
1129
1130         LASSERT(lock->l_exp_refs_nr >= 0);
1131
1132         if (lock->l_exp_refs_target != NULL &&
1133             lock->l_exp_refs_target != exp) {
1134                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1135                               exp, lock, lock->l_exp_refs_target);
1136         }
1137         if ((lock->l_exp_refs_nr ++) == 0) {
1138                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1139                 lock->l_exp_refs_target = exp;
1140         }
1141         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1142                lock, exp, lock->l_exp_refs_nr);
1143         spin_unlock(&exp->exp_locks_list_guard);
1144 }
1145 EXPORT_SYMBOL(__class_export_add_lock_ref);
1146
1147 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1148 {
1149         spin_lock(&exp->exp_locks_list_guard);
1150         LASSERT(lock->l_exp_refs_nr > 0);
1151         if (lock->l_exp_refs_target != exp) {
1152                 LCONSOLE_WARN("lock %p, "
1153                               "mismatching export pointers: %p, %p\n",
1154                               lock, lock->l_exp_refs_target, exp);
1155         }
1156         if (-- lock->l_exp_refs_nr == 0) {
1157                 list_del_init(&lock->l_exp_refs_link);
1158                 lock->l_exp_refs_target = NULL;
1159         }
1160         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1161                lock, exp, lock->l_exp_refs_nr);
1162         spin_unlock(&exp->exp_locks_list_guard);
1163 }
1164 EXPORT_SYMBOL(__class_export_del_lock_ref);
1165 #endif
1166
1167 /* A connection defines an export context in which preallocation can
1168    be managed. This releases the export pointer reference, and returns
1169    the export handle, so the export refcount is 1 when this function
1170    returns. */
1171 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1172                   struct obd_uuid *cluuid)
1173 {
1174         struct obd_export *export;
1175         LASSERT(conn != NULL);
1176         LASSERT(obd != NULL);
1177         LASSERT(cluuid != NULL);
1178         ENTRY;
1179
1180         export = class_new_export(obd, cluuid);
1181         if (IS_ERR(export))
1182                 RETURN(PTR_ERR(export));
1183
1184         conn->cookie = export->exp_handle.h_cookie;
1185         class_export_put(export);
1186
1187         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1188                cluuid->uuid, conn->cookie);
1189         RETURN(0);
1190 }
1191 EXPORT_SYMBOL(class_connect);
1192
1193 /* if export is involved in recovery then clean up related things */
1194 static void class_export_recovery_cleanup(struct obd_export *exp)
1195 {
1196         struct obd_device *obd = exp->exp_obd;
1197
1198         spin_lock(&obd->obd_recovery_task_lock);
1199         if (obd->obd_recovering) {
1200                 if (exp->exp_in_recovery) {
1201                         spin_lock(&exp->exp_lock);
1202                         exp->exp_in_recovery = 0;
1203                         spin_unlock(&exp->exp_lock);
1204                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1205                         atomic_dec(&obd->obd_connected_clients);
1206                 }
1207
1208                 /* if called during recovery then should update
1209                  * obd_stale_clients counter,
1210                  * lightweight exports are not counted */
1211                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1212                         exp->exp_obd->obd_stale_clients++;
1213         }
1214         spin_unlock(&obd->obd_recovery_task_lock);
1215
1216         spin_lock(&exp->exp_lock);
1217         /** Cleanup req replay fields */
1218         if (exp->exp_req_replay_needed) {
1219                 exp->exp_req_replay_needed = 0;
1220
1221                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1222                 atomic_dec(&obd->obd_req_replay_clients);
1223         }
1224
1225         /** Cleanup lock replay data */
1226         if (exp->exp_lock_replay_needed) {
1227                 exp->exp_lock_replay_needed = 0;
1228
1229                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1230                 atomic_dec(&obd->obd_lock_replay_clients);
1231         }
1232         spin_unlock(&exp->exp_lock);
1233 }
1234
1235 /* This function removes 1-3 references from the export:
1236  * 1 - for export pointer passed
1237  * and if disconnect really need
1238  * 2 - removing from hash
1239  * 3 - in client_unlink_export
1240  * The export pointer passed to this function can destroyed */
1241 int class_disconnect(struct obd_export *export)
1242 {
1243         int already_disconnected;
1244         ENTRY;
1245
1246         if (export == NULL) {
1247                 CWARN("attempting to free NULL export %p\n", export);
1248                 RETURN(-EINVAL);
1249         }
1250
1251         spin_lock(&export->exp_lock);
1252         already_disconnected = export->exp_disconnected;
1253         export->exp_disconnected = 1;
1254         /*  We hold references of export for uuid hash
1255          *  and nid_hash and export link at least. So
1256          *  it is safe to call cfs_hash_del in there.  */
1257         if (!hlist_unhashed(&export->exp_nid_hash))
1258                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1259                              &export->exp_connection->c_peer.nid,
1260                              &export->exp_nid_hash);
1261         spin_unlock(&export->exp_lock);
1262
1263         /* class_cleanup(), abort_recovery(), and class_fail_export()
1264          * all end up in here, and if any of them race we shouldn't
1265          * call extra class_export_puts(). */
1266         if (already_disconnected) {
1267                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1268                 GOTO(no_disconn, already_disconnected);
1269         }
1270
1271         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1272                export->exp_handle.h_cookie);
1273
1274         class_export_recovery_cleanup(export);
1275         class_unlink_export(export);
1276 no_disconn:
1277         class_export_put(export);
1278         RETURN(0);
1279 }
1280 EXPORT_SYMBOL(class_disconnect);
1281
1282 /* Return non-zero for a fully connected export */
1283 int class_connected_export(struct obd_export *exp)
1284 {
1285         int connected = 0;
1286
1287         if (exp) {
1288                 spin_lock(&exp->exp_lock);
1289                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1290                 spin_unlock(&exp->exp_lock);
1291         }
1292         return connected;
1293 }
1294 EXPORT_SYMBOL(class_connected_export);
1295
1296 static void class_disconnect_export_list(struct list_head *list,
1297                                          enum obd_option flags)
1298 {
1299         int rc;
1300         struct obd_export *exp;
1301         ENTRY;
1302
1303         /* It's possible that an export may disconnect itself, but
1304          * nothing else will be added to this list. */
1305         while (!list_empty(list)) {
1306                 exp = list_entry(list->next, struct obd_export,
1307                                  exp_obd_chain);
1308                 /* need for safe call CDEBUG after obd_disconnect */
1309                 class_export_get(exp);
1310
1311                 spin_lock(&exp->exp_lock);
1312                 exp->exp_flags = flags;
1313                 spin_unlock(&exp->exp_lock);
1314
1315                 if (obd_uuid_equals(&exp->exp_client_uuid,
1316                                     &exp->exp_obd->obd_uuid)) {
1317                         CDEBUG(D_HA,
1318                                "exp %p export uuid == obd uuid, don't discon\n",
1319                                exp);
1320                         /* Need to delete this now so we don't end up pointing
1321                          * to work_list later when this export is cleaned up. */
1322                         list_del_init(&exp->exp_obd_chain);
1323                         class_export_put(exp);
1324                         continue;
1325                 }
1326
1327                 class_export_get(exp);
1328                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1329                        "last request at %ld\n",
1330                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1331                        exp, exp->exp_last_request_time);
1332                 /* release one export reference anyway */
1333                 rc = obd_disconnect(exp);
1334
1335                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1336                        obd_export_nid2str(exp), exp, rc);
1337                 class_export_put(exp);
1338         }
1339         EXIT;
1340 }
1341
1342 void class_disconnect_exports(struct obd_device *obd)
1343 {
1344         struct list_head work_list;
1345         ENTRY;
1346
1347         /* Move all of the exports from obd_exports to a work list, en masse. */
1348         INIT_LIST_HEAD(&work_list);
1349         spin_lock(&obd->obd_dev_lock);
1350         list_splice_init(&obd->obd_exports, &work_list);
1351         list_splice_init(&obd->obd_delayed_exports, &work_list);
1352         spin_unlock(&obd->obd_dev_lock);
1353
1354         if (!list_empty(&work_list)) {
1355                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1356                        "disconnecting them\n", obd->obd_minor, obd);
1357                 class_disconnect_export_list(&work_list,
1358                                              exp_flags_from_obd(obd));
1359         } else
1360                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1361                        obd->obd_minor, obd);
1362         EXIT;
1363 }
1364 EXPORT_SYMBOL(class_disconnect_exports);
1365
1366 /* Remove exports that have not completed recovery.
1367  */
1368 void class_disconnect_stale_exports(struct obd_device *obd,
1369                                     int (*test_export)(struct obd_export *))
1370 {
1371         struct list_head work_list;
1372         struct obd_export *exp, *n;
1373         int evicted = 0;
1374         ENTRY;
1375
1376         INIT_LIST_HEAD(&work_list);
1377         spin_lock(&obd->obd_dev_lock);
1378         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1379                                  exp_obd_chain) {
1380                 /* don't count self-export as client */
1381                 if (obd_uuid_equals(&exp->exp_client_uuid,
1382                                     &exp->exp_obd->obd_uuid))
1383                         continue;
1384
1385                 /* don't evict clients which have no slot in last_rcvd
1386                  * (e.g. lightweight connection) */
1387                 if (exp->exp_target_data.ted_lr_idx == -1)
1388                         continue;
1389
1390                 spin_lock(&exp->exp_lock);
1391                 if (exp->exp_failed || test_export(exp)) {
1392                         spin_unlock(&exp->exp_lock);
1393                         continue;
1394                 }
1395                 exp->exp_failed = 1;
1396                 spin_unlock(&exp->exp_lock);
1397
1398                 list_move(&exp->exp_obd_chain, &work_list);
1399                 evicted++;
1400                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1401                        obd->obd_name, exp->exp_client_uuid.uuid,
1402                        exp->exp_connection == NULL ? "<unknown>" :
1403                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1404                 print_export_data(exp, "EVICTING", 0, D_HA);
1405         }
1406         spin_unlock(&obd->obd_dev_lock);
1407
1408         if (evicted)
1409                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1410                               obd->obd_name, evicted);
1411
1412         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1413                                                  OBD_OPT_ABORT_RECOV);
1414         EXIT;
1415 }
1416 EXPORT_SYMBOL(class_disconnect_stale_exports);
1417
1418 void class_fail_export(struct obd_export *exp)
1419 {
1420         int rc, already_failed;
1421
1422         spin_lock(&exp->exp_lock);
1423         already_failed = exp->exp_failed;
1424         exp->exp_failed = 1;
1425         spin_unlock(&exp->exp_lock);
1426
1427         if (already_failed) {
1428                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1429                        exp, exp->exp_client_uuid.uuid);
1430                 return;
1431         }
1432
1433         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1434                exp, exp->exp_client_uuid.uuid);
1435
1436         if (obd_dump_on_timeout)
1437                 libcfs_debug_dumplog();
1438
1439         /* need for safe call CDEBUG after obd_disconnect */
1440         class_export_get(exp);
1441
1442         /* Most callers into obd_disconnect are removing their own reference
1443          * (request, for example) in addition to the one from the hash table.
1444          * We don't have such a reference here, so make one. */
1445         class_export_get(exp);
1446         rc = obd_disconnect(exp);
1447         if (rc)
1448                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1449         else
1450                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1451                        exp, exp->exp_client_uuid.uuid);
1452         class_export_put(exp);
1453 }
1454 EXPORT_SYMBOL(class_fail_export);
1455
1456 char *obd_export_nid2str(struct obd_export *exp)
1457 {
1458         if (exp->exp_connection != NULL)
1459                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1460
1461         return "(no nid)";
1462 }
1463 EXPORT_SYMBOL(obd_export_nid2str);
1464
1465 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1466 {
1467         struct cfs_hash *nid_hash;
1468         struct obd_export *doomed_exp = NULL;
1469         int exports_evicted = 0;
1470
1471         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1472
1473         spin_lock(&obd->obd_dev_lock);
1474         /* umount has run already, so evict thread should leave
1475          * its task to umount thread now */
1476         if (obd->obd_stopping) {
1477                 spin_unlock(&obd->obd_dev_lock);
1478                 return exports_evicted;
1479         }
1480         nid_hash = obd->obd_nid_hash;
1481         cfs_hash_getref(nid_hash);
1482         spin_unlock(&obd->obd_dev_lock);
1483
1484         do {
1485                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1486                 if (doomed_exp == NULL)
1487                         break;
1488
1489                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1490                          "nid %s found, wanted nid %s, requested nid %s\n",
1491                          obd_export_nid2str(doomed_exp),
1492                          libcfs_nid2str(nid_key), nid);
1493                 LASSERTF(doomed_exp != obd->obd_self_export,
1494                          "self-export is hashed by NID?\n");
1495                 exports_evicted++;
1496                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1497                               "request\n", obd->obd_name,
1498                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1499                               obd_export_nid2str(doomed_exp));
1500                 class_fail_export(doomed_exp);
1501                 class_export_put(doomed_exp);
1502         } while (1);
1503
1504         cfs_hash_putref(nid_hash);
1505
1506         if (!exports_evicted)
1507                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1508                        obd->obd_name, nid);
1509         return exports_evicted;
1510 }
1511 EXPORT_SYMBOL(obd_export_evict_by_nid);
1512
1513 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1514 {
1515         struct cfs_hash *uuid_hash;
1516         struct obd_export *doomed_exp = NULL;
1517         struct obd_uuid doomed_uuid;
1518         int exports_evicted = 0;
1519
1520         spin_lock(&obd->obd_dev_lock);
1521         if (obd->obd_stopping) {
1522                 spin_unlock(&obd->obd_dev_lock);
1523                 return exports_evicted;
1524         }
1525         uuid_hash = obd->obd_uuid_hash;
1526         cfs_hash_getref(uuid_hash);
1527         spin_unlock(&obd->obd_dev_lock);
1528
1529         obd_str2uuid(&doomed_uuid, uuid);
1530         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1531                 CERROR("%s: can't evict myself\n", obd->obd_name);
1532                 cfs_hash_putref(uuid_hash);
1533                 return exports_evicted;
1534         }
1535
1536         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1537
1538         if (doomed_exp == NULL) {
1539                 CERROR("%s: can't disconnect %s: no exports found\n",
1540                        obd->obd_name, uuid);
1541         } else {
1542                 CWARN("%s: evicting %s at adminstrative request\n",
1543                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1544                 class_fail_export(doomed_exp);
1545                 class_export_put(doomed_exp);
1546                 exports_evicted++;
1547         }
1548         cfs_hash_putref(uuid_hash);
1549
1550         return exports_evicted;
1551 }
1552
1553 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() when lock dumping is
 * requested; unset (NULL) until something assigns it. */
void (*class_export_dump_hook)(struct obd_export *exp);
1555 EXPORT_SYMBOL(class_export_dump_hook);
1556 #endif
1557
1558 static void print_export_data(struct obd_export *exp, const char *status,
1559                               int locks, int debug_level)
1560 {
1561         struct ptlrpc_reply_state *rs;
1562         struct ptlrpc_reply_state *first_reply = NULL;
1563         int nreplies = 0;
1564
1565         spin_lock(&exp->exp_lock);
1566         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1567                             rs_exp_list) {
1568                 if (nreplies == 0)
1569                         first_reply = rs;
1570                 nreplies++;
1571         }
1572         spin_unlock(&exp->exp_lock);
1573
1574         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1575                "%p %s %llu stale:%d\n",
1576                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1577                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1578                atomic_read(&exp->exp_rpc_count),
1579                atomic_read(&exp->exp_cb_count),
1580                atomic_read(&exp->exp_locks_count),
1581                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1582                nreplies, first_reply, nreplies > 3 ? "..." : "",
1583                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1584 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1585         if (locks && class_export_dump_hook != NULL)
1586                 class_export_dump_hook(exp);
1587 #endif
1588 }
1589
1590 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1591 {
1592         struct obd_export *exp;
1593
1594         spin_lock(&obd->obd_dev_lock);
1595         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1596                 print_export_data(exp, "ACTIVE", locks, debug_level);
1597         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1598                 print_export_data(exp, "UNLINKED", locks, debug_level);
1599         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1600                 print_export_data(exp, "DELAYED", locks, debug_level);
1601         spin_unlock(&obd->obd_dev_lock);
1602         spin_lock(&obd_zombie_impexp_lock);
1603         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1604                 print_export_data(exp, "ZOMBIE", locks, debug_level);
1605         spin_unlock(&obd_zombie_impexp_lock);
1606 }
1607
1608 void obd_exports_barrier(struct obd_device *obd)
1609 {
1610         int waited = 2;
1611         LASSERT(list_empty(&obd->obd_exports));
1612         spin_lock(&obd->obd_dev_lock);
1613         while (!list_empty(&obd->obd_unlinked_exports)) {
1614                 spin_unlock(&obd->obd_dev_lock);
1615                 set_current_state(TASK_UNINTERRUPTIBLE);
1616                 schedule_timeout(cfs_time_seconds(waited));
1617                 if (waited > 5 && is_power_of_2(waited)) {
1618                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1619                                       "more than %d seconds. "
1620                                       "The obd refcount = %d. Is it stuck?\n",
1621                                       obd->obd_name, waited,
1622                                       atomic_read(&obd->obd_refcount));
1623                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1624                 }
1625                 waited *= 2;
1626                 spin_lock(&obd->obd_dev_lock);
1627         }
1628         spin_unlock(&obd->obd_dev_lock);
1629 }
1630 EXPORT_SYMBOL(obd_exports_barrier);
1631
/* Total number of zombie imports and exports still to be destroyed;
 * read and decremented under obd_zombie_impexp_lock in this file. */
static int zombies_count;
1634
1635 /**
1636  * kill zombie imports and exports
1637  */
1638 void obd_zombie_impexp_cull(void)
1639 {
1640         struct obd_import *import;
1641         struct obd_export *export;
1642         ENTRY;
1643
1644         do {
1645                 spin_lock(&obd_zombie_impexp_lock);
1646
1647                 import = NULL;
1648                 if (!list_empty(&obd_zombie_imports)) {
1649                         import = list_entry(obd_zombie_imports.next,
1650                                             struct obd_import,
1651                                             imp_zombie_chain);
1652                         list_del_init(&import->imp_zombie_chain);
1653                 }
1654
1655                 export = NULL;
1656                 if (!list_empty(&obd_zombie_exports)) {
1657                         export = list_entry(obd_zombie_exports.next,
1658                                             struct obd_export,
1659                                             exp_obd_chain);
1660                         list_del_init(&export->exp_obd_chain);
1661                 }
1662
1663                 spin_unlock(&obd_zombie_impexp_lock);
1664
1665                 if (import != NULL) {
1666                         class_import_destroy(import);
1667                         spin_lock(&obd_zombie_impexp_lock);
1668                         zombies_count--;
1669                         spin_unlock(&obd_zombie_impexp_lock);
1670                 }
1671
1672                 if (export != NULL) {
1673                         class_export_destroy(export);
1674                         spin_lock(&obd_zombie_impexp_lock);
1675                         zombies_count--;
1676                         spin_unlock(&obd_zombie_impexp_lock);
1677                 }
1678
1679                 cond_resched();
1680         } while (import != NULL || export != NULL);
1681         EXIT;
1682 }
1683
1684 static DECLARE_COMPLETION(obd_zombie_start);
1685 static DECLARE_COMPLETION(obd_zombie_stop);
1686 static unsigned long obd_zombie_flags;
1687 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
1688 static pid_t obd_zombie_pid;
1689
1690 enum {
1691         OBD_ZOMBIE_STOP         = 0x0001,
1692 };
1693
1694 /**
1695  * check for work for kill zombie import/export thread.
1696  */
1697 static int obd_zombie_impexp_check(void *arg)
1698 {
1699         int rc;
1700
1701         spin_lock(&obd_zombie_impexp_lock);
1702         rc = (zombies_count == 0) &&
1703              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1704         spin_unlock(&obd_zombie_impexp_lock);
1705
1706         RETURN(rc);
1707 }
1708
1709 /**
1710  * Add export to the obd_zombe thread and notify it.
1711  */
1712 static void obd_zombie_export_add(struct obd_export *exp) {
1713         atomic_dec(&obd_stale_export_num);
1714         spin_lock(&exp->exp_obd->obd_dev_lock);
1715         LASSERT(!list_empty(&exp->exp_obd_chain));
1716         list_del_init(&exp->exp_obd_chain);
1717         spin_unlock(&exp->exp_obd->obd_dev_lock);
1718         spin_lock(&obd_zombie_impexp_lock);
1719         zombies_count++;
1720         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1721         spin_unlock(&obd_zombie_impexp_lock);
1722
1723         obd_zombie_impexp_notify();
1724 }
1725
1726 /**
1727  * Add import to the obd_zombe thread and notify it.
1728  */
1729 static void obd_zombie_import_add(struct obd_import *imp) {
1730         LASSERT(imp->imp_sec == NULL);
1731         spin_lock(&obd_zombie_impexp_lock);
1732         LASSERT(list_empty(&imp->imp_zombie_chain));
1733         zombies_count++;
1734         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1735         spin_unlock(&obd_zombie_impexp_lock);
1736
1737         obd_zombie_impexp_notify();
1738 }
1739
1740 /**
1741  * notify import/export destroy thread about new zombie.
1742  */
1743 static void obd_zombie_impexp_notify(void)
1744 {
1745         /*
1746          * Make sure obd_zomebie_impexp_thread get this notification.
1747          * It is possible this signal only get by obd_zombie_barrier, and
1748          * barrier gulps this notification and sleeps away and hangs ensues
1749          */
1750         wake_up_all(&obd_zombie_waitq);
1751 }
1752
1753 /**
1754  * check whether obd_zombie is idle
1755  */
1756 static int obd_zombie_is_idle(void)
1757 {
1758         int rc;
1759
1760         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1761         spin_lock(&obd_zombie_impexp_lock);
1762         rc = (zombies_count == 0);
1763         spin_unlock(&obd_zombie_impexp_lock);
1764         return rc;
1765 }
1766
1767 /**
1768  * wait when obd_zombie import/export queues become empty
1769  */
1770 void obd_zombie_barrier(void)
1771 {
1772         struct l_wait_info lwi = { 0 };
1773
1774         if (obd_zombie_pid == current_pid())
1775                 /* don't wait for myself */
1776                 return;
1777         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1778 }
1779 EXPORT_SYMBOL(obd_zombie_barrier);
1780
1781
1782 struct obd_export *obd_stale_export_get(void)
1783 {
1784         struct obd_export *exp = NULL;
1785         ENTRY;
1786
1787         spin_lock(&obd_stale_export_lock);
1788         if (!list_empty(&obd_stale_exports)) {
1789                 exp = list_entry(obd_stale_exports.next,
1790                                  struct obd_export, exp_stale_list);
1791                 list_del_init(&exp->exp_stale_list);
1792         }
1793         spin_unlock(&obd_stale_export_lock);
1794
1795         if (exp) {
1796                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1797                        atomic_read(&obd_stale_export_num));
1798         }
1799         RETURN(exp);
1800 }
1801 EXPORT_SYMBOL(obd_stale_export_get);
1802
1803 void obd_stale_export_put(struct obd_export *exp)
1804 {
1805         ENTRY;
1806
1807         LASSERT(list_empty(&exp->exp_stale_list));
1808         if (exp->exp_lock_hash &&
1809             atomic_read(&exp->exp_lock_hash->hs_count)) {
1810                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1811                        atomic_read(&obd_stale_export_num));
1812
1813                 spin_lock_bh(&exp->exp_bl_list_lock);
1814                 spin_lock(&obd_stale_export_lock);
1815                 /* Add to the tail if there is no blocked locks,
1816                  * to the head otherwise. */
1817                 if (list_empty(&exp->exp_bl_list))
1818                         list_add_tail(&exp->exp_stale_list,
1819                                       &obd_stale_exports);
1820                 else
1821                         list_add(&exp->exp_stale_list,
1822                                  &obd_stale_exports);
1823
1824                 spin_unlock(&obd_stale_export_lock);
1825                 spin_unlock_bh(&exp->exp_bl_list_lock);
1826         } else {
1827                 class_export_put(exp);
1828         }
1829         EXIT;
1830 }
1831 EXPORT_SYMBOL(obd_stale_export_put);
1832
1833 /**
1834  * Adjust the position of the export in the stale list,
1835  * i.e. move to the head of the list if is needed.
1836  **/
1837 void obd_stale_export_adjust(struct obd_export *exp)
1838 {
1839         LASSERT(exp != NULL);
1840         spin_lock_bh(&exp->exp_bl_list_lock);
1841         spin_lock(&obd_stale_export_lock);
1842
1843         if (!list_empty(&exp->exp_stale_list) &&
1844             !list_empty(&exp->exp_bl_list))
1845                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1846
1847         spin_unlock(&obd_stale_export_lock);
1848         spin_unlock_bh(&exp->exp_bl_list_lock);
1849 }
1850 EXPORT_SYMBOL(obd_stale_export_adjust);
1851
1852 /**
1853  * destroy zombie export/import thread.
1854  */
1855 static int obd_zombie_impexp_thread(void *unused)
1856 {
1857         unshare_fs_struct();
1858         complete(&obd_zombie_start);
1859
1860         obd_zombie_pid = current_pid();
1861
1862         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1863                 struct l_wait_info lwi = { 0 };
1864
1865                 l_wait_event(obd_zombie_waitq,
1866                              !obd_zombie_impexp_check(NULL), &lwi);
1867                 obd_zombie_impexp_cull();
1868
1869                 /*
1870                  * Notify obd_zombie_barrier callers that queues
1871                  * may be empty.
1872                  */
1873                 wake_up(&obd_zombie_waitq);
1874         }
1875
1876         complete(&obd_zombie_stop);
1877
1878         RETURN(0);
1879 }
1880
1881
1882 /**
1883  * start destroy zombie import/export thread
1884  */
1885 int obd_zombie_impexp_init(void)
1886 {
1887         struct task_struct *task;
1888
1889         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1890         if (IS_ERR(task))
1891                 RETURN(PTR_ERR(task));
1892
1893         wait_for_completion(&obd_zombie_start);
1894         RETURN(0);
1895 }
1896 /**
1897  * stop destroy zombie import/export thread
1898  */
1899 void obd_zombie_impexp_stop(void)
1900 {
1901         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1902         obd_zombie_impexp_notify();
1903         wait_for_completion(&obd_zombie_stop);
1904         LASSERT(list_empty(&obd_stale_exports));
1905 }
1906
1907 /***** Kernel-userspace comm helpers *******/
1908
1909 /* Get length of entire message, including header */
1910 int kuc_len(int payload_len)
1911 {
1912         return sizeof(struct kuc_hdr) + payload_len;
1913 }
1914 EXPORT_SYMBOL(kuc_len);
1915
1916 /* Get a pointer to kuc header, given a ptr to the payload
1917  * @param p Pointer to payload area
1918  * @returns Pointer to kuc header
1919  */
1920 struct kuc_hdr * kuc_ptr(void *p)
1921 {
1922         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1923         LASSERT(lh->kuc_magic == KUC_MAGIC);
1924         return lh;
1925 }
1926 EXPORT_SYMBOL(kuc_ptr);
1927
1928 /* Alloc space for a message, and fill in header
1929  * @return Pointer to payload area
1930  */
1931 void *kuc_alloc(int payload_len, int transport, int type)
1932 {
1933         struct kuc_hdr *lh;
1934         int len = kuc_len(payload_len);
1935
1936         OBD_ALLOC(lh, len);
1937         if (lh == NULL)
1938                 return ERR_PTR(-ENOMEM);
1939
1940         lh->kuc_magic = KUC_MAGIC;
1941         lh->kuc_transport = transport;
1942         lh->kuc_msgtype = type;
1943         lh->kuc_msglen = len;
1944
1945         return (void *)(lh + 1);
1946 }
1947 EXPORT_SYMBOL(kuc_alloc);
1948
/* Free a message.  @p points to the payload area and @payload_len is
 * the payload size; the header in front of the payload is freed
 * together with it.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
1955 EXPORT_SYMBOL(kuc_free);
1956
1957 struct obd_request_slot_waiter {
1958         struct list_head        orsw_entry;
1959         wait_queue_head_t       orsw_waitq;
1960         bool                    orsw_signaled;
1961 };
1962
1963 static bool obd_request_slot_avail(struct client_obd *cli,
1964                                    struct obd_request_slot_waiter *orsw)
1965 {
1966         bool avail;
1967
1968         spin_lock(&cli->cl_loi_list_lock);
1969         avail = !!list_empty(&orsw->orsw_entry);
1970         spin_unlock(&cli->cl_loi_list_lock);
1971
1972         return avail;
1973 };
1974
1975 /*
1976  * For network flow control, the RPC sponsor needs to acquire a credit
1977  * before sending the RPC. The credits count for a connection is defined
1978  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1979  * the subsequent RPC sponsors need to wait until others released their
1980  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1981  */
1982 int obd_get_request_slot(struct client_obd *cli)
1983 {
1984         struct obd_request_slot_waiter   orsw;
1985         struct l_wait_info               lwi;
1986         int                              rc;
1987
1988         spin_lock(&cli->cl_loi_list_lock);
1989         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1990                 cli->cl_r_in_flight++;
1991                 spin_unlock(&cli->cl_loi_list_lock);
1992                 return 0;
1993         }
1994
1995         init_waitqueue_head(&orsw.orsw_waitq);
1996         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1997         orsw.orsw_signaled = false;
1998         spin_unlock(&cli->cl_loi_list_lock);
1999
2000         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2001         rc = l_wait_event(orsw.orsw_waitq,
2002                           obd_request_slot_avail(cli, &orsw) ||
2003                           orsw.orsw_signaled,
2004                           &lwi);
2005
2006         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2007          * freed but other (such as obd_put_request_slot) is using it. */
2008         spin_lock(&cli->cl_loi_list_lock);
2009         if (rc != 0) {
2010                 if (!orsw.orsw_signaled) {
2011                         if (list_empty(&orsw.orsw_entry))
2012                                 cli->cl_r_in_flight--;
2013                         else
2014                                 list_del(&orsw.orsw_entry);
2015                 }
2016         }
2017
2018         if (orsw.orsw_signaled) {
2019                 LASSERT(list_empty(&orsw.orsw_entry));
2020
2021                 rc = -EINTR;
2022         }
2023         spin_unlock(&cli->cl_loi_list_lock);
2024
2025         return rc;
2026 }
2027 EXPORT_SYMBOL(obd_get_request_slot);
2028
2029 void obd_put_request_slot(struct client_obd *cli)
2030 {
2031         struct obd_request_slot_waiter *orsw;
2032
2033         spin_lock(&cli->cl_loi_list_lock);
2034         cli->cl_r_in_flight--;
2035
2036         /* If there is free slot, wakeup the first waiter. */
2037         if (!list_empty(&cli->cl_loi_read_list) &&
2038             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2039                 orsw = list_entry(cli->cl_loi_read_list.next,
2040                                   struct obd_request_slot_waiter, orsw_entry);
2041                 list_del_init(&orsw->orsw_entry);
2042                 cli->cl_r_in_flight++;
2043                 wake_up(&orsw->orsw_waitq);
2044         }
2045         spin_unlock(&cli->cl_loi_list_lock);
2046 }
2047 EXPORT_SYMBOL(obd_put_request_slot);
2048
2049 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2050 {
2051         return cli->cl_max_rpcs_in_flight;
2052 }
2053 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2054
2055 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2056 {
2057         struct obd_request_slot_waiter *orsw;
2058         __u32                           old;
2059         int                             diff;
2060         int                             i;
2061         char                            *typ_name;
2062         int                             rc;
2063
2064         if (max > OBD_MAX_RIF_MAX || max < 1)
2065                 return -ERANGE;
2066
2067         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2068         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2069                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2070                  * strictly lower that max_rpcs_in_flight */
2071                 if (max < 2) {
2072                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2073                                "because it must be higher than "
2074                                "max_mod_rpcs_in_flight value",
2075                                cli->cl_import->imp_obd->obd_name);
2076                         return -ERANGE;
2077                 }
2078                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2079                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2080                         if (rc != 0)
2081                                 return rc;
2082                 }
2083         }
2084
2085         spin_lock(&cli->cl_loi_list_lock);
2086         old = cli->cl_max_rpcs_in_flight;
2087         cli->cl_max_rpcs_in_flight = max;
2088         diff = max - old;
2089
2090         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2091         for (i = 0; i < diff; i++) {
2092                 if (list_empty(&cli->cl_loi_read_list))
2093                         break;
2094
2095                 orsw = list_entry(cli->cl_loi_read_list.next,
2096                                   struct obd_request_slot_waiter, orsw_entry);
2097                 list_del_init(&orsw->orsw_entry);
2098                 cli->cl_r_in_flight++;
2099                 wake_up(&orsw->orsw_waitq);
2100         }
2101         spin_unlock(&cli->cl_loi_list_lock);
2102
2103         return 0;
2104 }
2105 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2106
2107 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2108 {
2109         return cli->cl_max_mod_rpcs_in_flight;
2110 }
2111 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2112
2113 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2114 {
2115         struct obd_connect_data *ocd;
2116         __u16 maxmodrpcs;
2117         __u16 prev;
2118
2119         if (max > OBD_MAX_RIF_MAX || max < 1)
2120                 return -ERANGE;
2121
2122         /* cannot exceed or equal max_rpcs_in_flight */
2123         if (max >= cli->cl_max_rpcs_in_flight) {
2124                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2125                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2126                        cli->cl_import->imp_obd->obd_name,
2127                        max, cli->cl_max_rpcs_in_flight);
2128                 return -ERANGE;
2129         }
2130
2131         /* cannot exceed max modify RPCs in flight supported by the server */
2132         ocd = &cli->cl_import->imp_connect_data;
2133         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2134                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2135         else
2136                 maxmodrpcs = 1;
2137         if (max > maxmodrpcs) {
2138                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2139                        "higher than max_mod_rpcs_per_client value (%hu) "
2140                        "returned by the server at connection\n",
2141                        cli->cl_import->imp_obd->obd_name,
2142                        max, maxmodrpcs);
2143                 return -ERANGE;
2144         }
2145
2146         spin_lock(&cli->cl_mod_rpcs_lock);
2147
2148         prev = cli->cl_max_mod_rpcs_in_flight;
2149         cli->cl_max_mod_rpcs_in_flight = max;
2150
2151         /* wakeup waiters if limit has been increased */
2152         if (cli->cl_max_mod_rpcs_in_flight > prev)
2153                 wake_up(&cli->cl_mod_rpcs_waitq);
2154
2155         spin_unlock(&cli->cl_mod_rpcs_lock);
2156
2157         return 0;
2158 }
2159 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2160
2161
2162 #define pct(a, b) (b ? a * 100 / b : 0)
2163 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2164                                struct seq_file *seq)
2165 {
2166         unsigned long mod_tot = 0, mod_cum;
2167         struct timespec64 now;
2168         int i;
2169
2170         ktime_get_real_ts64(&now);
2171
2172         spin_lock(&cli->cl_mod_rpcs_lock);
2173
2174         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2175                    (s64)now.tv_sec, now.tv_nsec);
2176         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2177                    cli->cl_mod_rpcs_in_flight);
2178
2179         seq_printf(seq, "\n\t\t\tmodify\n");
2180         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2181
2182         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2183
2184         mod_cum = 0;
2185         for (i = 0; i < OBD_HIST_MAX; i++) {
2186                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2187                 mod_cum += mod;
2188                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2189                            i, mod, pct(mod, mod_tot),
2190                            pct(mod_cum, mod_tot));
2191                 if (mod_cum == mod_tot)
2192                         break;
2193         }
2194
2195         spin_unlock(&cli->cl_mod_rpcs_lock);
2196
2197         return 0;
2198 }
2199 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2200 #undef pct
2201
2202
2203 /* The number of modify RPCs sent in parallel is limited
2204  * because the server has a finite number of slots per client to
2205  * store request result and ensure reply reconstruction when needed.
2206  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2207  * that takes into account server limit and cl_max_rpcs_in_flight
2208  * value.
2209  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2210  * one close request is allowed above the maximum.
2211  */
2212 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2213                                                  bool close_req)
2214 {
2215         bool avail;
2216
2217         /* A slot is available if
2218          * - number of modify RPCs in flight is less than the max
2219          * - it's a close RPC and no other close request is in flight
2220          */
2221         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2222                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2223
2224         return avail;
2225 }
2226
2227 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2228                                          bool close_req)
2229 {
2230         bool avail;
2231
2232         spin_lock(&cli->cl_mod_rpcs_lock);
2233         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2234         spin_unlock(&cli->cl_mod_rpcs_lock);
2235         return avail;
2236 }
2237
2238 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2239 {
2240         if (it != NULL &&
2241             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2242              it->it_op == IT_READDIR ||
2243              (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
2244                         return true;
2245         return false;
2246 }
2247
2248 /* Get a modify RPC slot from the obd client @cli according
2249  * to the kind of operation @opc that is going to be sent
2250  * and the intent @it of the operation if it applies.
2251  * If the maximum number of modify RPCs in flight is reached
2252  * the thread is put to sleep.
2253  * Returns the tag to be set in the request message. Tag 0
2254  * is reserved for non-modifying requests.
2255  */
2256 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2257                            struct lookup_intent *it)
2258 {
2259         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2260         bool                    close_req = false;
2261         __u16                   i, max;
2262
2263         /* read-only metadata RPCs don't consume a slot on MDT
2264          * for reply reconstruction
2265          */
2266         if (obd_skip_mod_rpc_slot(it))
2267                 return 0;
2268
2269         if (opc == MDS_CLOSE)
2270                 close_req = true;
2271
2272         do {
2273                 spin_lock(&cli->cl_mod_rpcs_lock);
2274                 max = cli->cl_max_mod_rpcs_in_flight;
2275                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2276                         /* there is a slot available */
2277                         cli->cl_mod_rpcs_in_flight++;
2278                         if (close_req)
2279                                 cli->cl_close_rpcs_in_flight++;
2280                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2281                                          cli->cl_mod_rpcs_in_flight);
2282                         /* find a free tag */
2283                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2284                                                 max + 1);
2285                         LASSERT(i < OBD_MAX_RIF_MAX);
2286                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2287                         spin_unlock(&cli->cl_mod_rpcs_lock);
2288                         /* tag 0 is reserved for non-modify RPCs */
2289                         return i + 1;
2290                 }
2291                 spin_unlock(&cli->cl_mod_rpcs_lock);
2292
2293                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2294                        "opc %u, max %hu\n",
2295                        cli->cl_import->imp_obd->obd_name, opc, max);
2296
2297                 l_wait_event(cli->cl_mod_rpcs_waitq,
2298                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2299         } while (true);
2300 }
2301 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2302
2303 /* Put a modify RPC slot from the obd client @cli according
2304  * to the kind of operation @opc that has been sent and the
2305  * intent @it of the operation if it applies.
2306  */
2307 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2308                           struct lookup_intent *it, __u16 tag)
2309 {
2310         bool                    close_req = false;
2311
2312         if (obd_skip_mod_rpc_slot(it))
2313                 return;
2314
2315         if (opc == MDS_CLOSE)
2316                 close_req = true;
2317
2318         spin_lock(&cli->cl_mod_rpcs_lock);
2319         cli->cl_mod_rpcs_in_flight--;
2320         if (close_req)
2321                 cli->cl_close_rpcs_in_flight--;
2322         /* release the tag in the bitmap */
2323         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2324         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2325         spin_unlock(&cli->cl_mod_rpcs_lock);
2326         wake_up(&cli->cl_mod_rpcs_waitq);
2327 }
2328 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2329