Whamcloud - gitweb
LU-6401 uapi: split lustre_disk.h into two headers
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lprocfs_status.h>
44 #include <lustre_disk.h>
45 #include <lustre_kernelcomm.h>
46
47 static DEFINE_SPINLOCK(obd_types_lock);
48 static LIST_HEAD(obd_types);
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
56
57 static LIST_HEAD(obd_zombie_imports);
58 static LIST_HEAD(obd_zombie_exports);
59 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
60
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks, int debug_level);
66
67 static LIST_HEAD(obd_stale_exports);
68 static DEFINE_SPINLOCK(obd_stale_export_lock);
69 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
70
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
73
74 /*
75  * support functions: we could use inter-module communication, but this
76  * is more portable to other OS's
77  */
78 static struct obd_device *obd_device_alloc(void)
79 {
80         struct obd_device *obd;
81
82         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
83         if (obd != NULL) {
84                 obd->obd_magic = OBD_DEVICE_MAGIC;
85         }
86         return obd;
87 }
88
89 static void obd_device_free(struct obd_device *obd)
90 {
91         LASSERT(obd != NULL);
92         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
93                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
94         if (obd->obd_namespace != NULL) {
95                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
96                        obd, obd->obd_namespace, obd->obd_force);
97                 LBUG();
98         }
99         lu_ref_fini(&obd->obd_reference);
100         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
101 }
102
103 struct obd_type *class_search_type(const char *name)
104 {
105         struct list_head *tmp;
106         struct obd_type *type;
107
108         spin_lock(&obd_types_lock);
109         list_for_each(tmp, &obd_types) {
110                 type = list_entry(tmp, struct obd_type, typ_chain);
111                 if (strcmp(type->typ_name, name) == 0) {
112                         spin_unlock(&obd_types_lock);
113                         return type;
114                 }
115         }
116         spin_unlock(&obd_types_lock);
117         return NULL;
118 }
119 EXPORT_SYMBOL(class_search_type);
120
121 struct obd_type *class_get_type(const char *name)
122 {
123         struct obd_type *type = class_search_type(name);
124
125 #ifdef HAVE_MODULE_LOADING_SUPPORT
126         if (!type) {
127                 const char *modname = name;
128
129                 if (strcmp(modname, "obdfilter") == 0)
130                         modname = "ofd";
131
132                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
133                         modname = LUSTRE_OSP_NAME;
134
135                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
136                         modname = LUSTRE_MDT_NAME;
137
138                 if (!request_module("%s", modname)) {
139                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
140                         type = class_search_type(name);
141                 } else {
142                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143                                            modname);
144                 }
145         }
146 #endif
147         if (type) {
148                 spin_lock(&type->obd_type_lock);
149                 type->typ_refcnt++;
150                 try_module_get(type->typ_dt_ops->o_owner);
151                 spin_unlock(&type->obd_type_lock);
152         }
153         return type;
154 }
155
156 void class_put_type(struct obd_type *type)
157 {
158         LASSERT(type);
159         spin_lock(&type->obd_type_lock);
160         type->typ_refcnt--;
161         module_put(type->typ_dt_ops->o_owner);
162         spin_unlock(&type->obd_type_lock);
163 }
164
165 #define CLASS_MAX_NAME 1024
166
167 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
168                         bool enable_proc, struct lprocfs_vars *vars,
169                         const char *name, struct lu_device_type *ldt)
170 {
171         struct obd_type *type;
172         int rc = 0;
173         ENTRY;
174
175         /* sanity check */
176         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
177
178         if (class_search_type(name)) {
179                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180                 RETURN(-EEXIST);
181         }
182
183         rc = -ENOMEM;
184         OBD_ALLOC(type, sizeof(*type));
185         if (type == NULL)
186                 RETURN(rc);
187
188         OBD_ALLOC_PTR(type->typ_dt_ops);
189         OBD_ALLOC_PTR(type->typ_md_ops);
190         OBD_ALLOC(type->typ_name, strlen(name) + 1);
191
192         if (type->typ_dt_ops == NULL ||
193             type->typ_md_ops == NULL ||
194             type->typ_name == NULL)
195                 GOTO (failed, rc);
196
197         *(type->typ_dt_ops) = *dt_ops;
198         /* md_ops is optional */
199         if (md_ops)
200                 *(type->typ_md_ops) = *md_ops;
201         strcpy(type->typ_name, name);
202         spin_lock_init(&type->obd_type_lock);
203
204 #ifdef CONFIG_PROC_FS
205         if (enable_proc) {
206                 type->typ_procroot = lprocfs_register(type->typ_name,
207                                                       proc_lustre_root,
208                                                       vars, type);
209                 if (IS_ERR(type->typ_procroot)) {
210                         rc = PTR_ERR(type->typ_procroot);
211                         type->typ_procroot = NULL;
212                         GOTO(failed, rc);
213                 }
214         }
215 #endif
216         if (ldt != NULL) {
217                 type->typ_lu = ldt;
218                 rc = lu_device_type_init(ldt);
219                 if (rc != 0)
220                         GOTO (failed, rc);
221         }
222
223         spin_lock(&obd_types_lock);
224         list_add(&type->typ_chain, &obd_types);
225         spin_unlock(&obd_types_lock);
226
227         RETURN (0);
228
229 failed:
230         if (type->typ_name != NULL) {
231 #ifdef CONFIG_PROC_FS
232                 if (type->typ_procroot != NULL)
233                         remove_proc_subtree(type->typ_name, proc_lustre_root);
234 #endif
235                 OBD_FREE(type->typ_name, strlen(name) + 1);
236         }
237         if (type->typ_md_ops != NULL)
238                 OBD_FREE_PTR(type->typ_md_ops);
239         if (type->typ_dt_ops != NULL)
240                 OBD_FREE_PTR(type->typ_dt_ops);
241         OBD_FREE(type, sizeof(*type));
242         RETURN(rc);
243 }
244 EXPORT_SYMBOL(class_register_type);
245
246 int class_unregister_type(const char *name)
247 {
248         struct obd_type *type = class_search_type(name);
249         ENTRY;
250
251         if (!type) {
252                 CERROR("unknown obd type\n");
253                 RETURN(-EINVAL);
254         }
255
256         if (type->typ_refcnt) {
257                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
258                 /* This is a bad situation, let's make the best of it */
259                 /* Remove ops, but leave the name for debugging */
260                 OBD_FREE_PTR(type->typ_dt_ops);
261                 OBD_FREE_PTR(type->typ_md_ops);
262                 RETURN(-EBUSY);
263         }
264
265         /* we do not use type->typ_procroot as for compatibility purposes
266          * other modules can share names (i.e. lod can use lov entry). so
267          * we can't reference pointer as it can get invalided when another
268          * module removes the entry */
269 #ifdef CONFIG_PROC_FS
270         if (type->typ_procroot != NULL)
271                 remove_proc_subtree(type->typ_name, proc_lustre_root);
272         if (type->typ_procsym != NULL)
273                 lprocfs_remove(&type->typ_procsym);
274 #endif
275         if (type->typ_lu)
276                 lu_device_type_fini(type->typ_lu);
277
278         spin_lock(&obd_types_lock);
279         list_del(&type->typ_chain);
280         spin_unlock(&obd_types_lock);
281         OBD_FREE(type->typ_name, strlen(name) + 1);
282         if (type->typ_dt_ops != NULL)
283                 OBD_FREE_PTR(type->typ_dt_ops);
284         if (type->typ_md_ops != NULL)
285                 OBD_FREE_PTR(type->typ_md_ops);
286         OBD_FREE(type, sizeof(*type));
287         RETURN(0);
288 } /* class_unregister_type */
289 EXPORT_SYMBOL(class_unregister_type);
290
291 /**
292  * Create a new obd device.
293  *
294  * Find an empty slot in ::obd_devs[], create a new obd device in it.
295  *
296  * \param[in] type_name obd device type string.
297  * \param[in] name      obd device name.
298  *
299  * \retval NULL if create fails, otherwise return the obd device
300  *         pointer created.
301  */
302 struct obd_device *class_newdev(const char *type_name, const char *name)
303 {
304         struct obd_device *result = NULL;
305         struct obd_device *newdev;
306         struct obd_type *type = NULL;
307         int i;
308         int new_obd_minor = 0;
309         ENTRY;
310
311         if (strlen(name) >= MAX_OBD_NAME) {
312                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
313                 RETURN(ERR_PTR(-EINVAL));
314         }
315
316         type = class_get_type(type_name);
317         if (type == NULL){
318                 CERROR("OBD: unknown type: %s\n", type_name);
319                 RETURN(ERR_PTR(-ENODEV));
320         }
321
322         newdev = obd_device_alloc();
323         if (newdev == NULL)
324                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
325
326         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
327
328         write_lock(&obd_dev_lock);
329         for (i = 0; i < class_devno_max(); i++) {
330                 struct obd_device *obd = class_num2obd(i);
331
332                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
333                         CERROR("Device %s already exists at %d, won't add\n",
334                                name, i);
335                         if (result) {
336                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
337                                          "%p obd_magic %08x != %08x\n", result,
338                                          result->obd_magic, OBD_DEVICE_MAGIC);
339                                 LASSERTF(result->obd_minor == new_obd_minor,
340                                          "%p obd_minor %d != %d\n", result,
341                                          result->obd_minor, new_obd_minor);
342
343                                 obd_devs[result->obd_minor] = NULL;
344                                 result->obd_name[0]='\0';
345                          }
346                         result = ERR_PTR(-EEXIST);
347                         break;
348                 }
349                 if (!result && !obd) {
350                         result = newdev;
351                         result->obd_minor = i;
352                         new_obd_minor = i;
353                         result->obd_type = type;
354                         strncpy(result->obd_name, name,
355                                 sizeof(result->obd_name) - 1);
356                         obd_devs[i] = result;
357                 }
358         }
359         write_unlock(&obd_dev_lock);
360
361         if (result == NULL && i >= class_devno_max()) {
362                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
363                        class_devno_max());
364                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
365         }
366
367         if (IS_ERR(result))
368                 GOTO(out, result);
369
370         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
371                result->obd_name, result);
372
373         RETURN(result);
374 out:
375         obd_device_free(newdev);
376 out_type:
377         class_put_type(type);
378         return result;
379 }
380
381 void class_release_dev(struct obd_device *obd)
382 {
383         struct obd_type *obd_type = obd->obd_type;
384
385         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
386                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
387         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
388                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
389         LASSERT(obd_type != NULL);
390
391         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
392                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
393
394         write_lock(&obd_dev_lock);
395         obd_devs[obd->obd_minor] = NULL;
396         write_unlock(&obd_dev_lock);
397         obd_device_free(obd);
398
399         class_put_type(obd_type);
400 }
401
402 int class_name2dev(const char *name)
403 {
404         int i;
405
406         if (!name)
407                 return -1;
408
409         read_lock(&obd_dev_lock);
410         for (i = 0; i < class_devno_max(); i++) {
411                 struct obd_device *obd = class_num2obd(i);
412
413                 if (obd && strcmp(name, obd->obd_name) == 0) {
414                         /* Make sure we finished attaching before we give
415                            out any references */
416                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
417                         if (obd->obd_attached) {
418                                 read_unlock(&obd_dev_lock);
419                                 return i;
420                         }
421                         break;
422                 }
423         }
424         read_unlock(&obd_dev_lock);
425
426         return -1;
427 }
428
429 struct obd_device *class_name2obd(const char *name)
430 {
431         int dev = class_name2dev(name);
432
433         if (dev < 0 || dev > class_devno_max())
434                 return NULL;
435         return class_num2obd(dev);
436 }
437 EXPORT_SYMBOL(class_name2obd);
438
439 int class_uuid2dev(struct obd_uuid *uuid)
440 {
441         int i;
442
443         read_lock(&obd_dev_lock);
444         for (i = 0; i < class_devno_max(); i++) {
445                 struct obd_device *obd = class_num2obd(i);
446
447                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
448                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
449                         read_unlock(&obd_dev_lock);
450                         return i;
451                 }
452         }
453         read_unlock(&obd_dev_lock);
454
455         return -1;
456 }
457
458 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
459 {
460         int dev = class_uuid2dev(uuid);
461         if (dev < 0)
462                 return NULL;
463         return class_num2obd(dev);
464 }
465 EXPORT_SYMBOL(class_uuid2obd);
466
467 /**
468  * Get obd device from ::obd_devs[]
469  *
470  * \param num [in] array index
471  *
472  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
473  *         otherwise return the obd device there.
474  */
475 struct obd_device *class_num2obd(int num)
476 {
477         struct obd_device *obd = NULL;
478
479         if (num < class_devno_max()) {
480                 obd = obd_devs[num];
481                 if (obd == NULL)
482                         return NULL;
483
484                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
485                          "%p obd_magic %08x != %08x\n",
486                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
487                 LASSERTF(obd->obd_minor == num,
488                          "%p obd_minor %0d != %0d\n",
489                          obd, obd->obd_minor, num);
490         }
491
492         return obd;
493 }
494
495 /**
496  * Get obd devices count. Device in any
497  *    state are counted
498  * \retval obd device count
499  */
500 int get_devices_count(void)
501 {
502         int index, max_index = class_devno_max(), dev_count = 0;
503
504         read_lock(&obd_dev_lock);
505         for (index = 0; index <= max_index; index++) {
506                 struct obd_device *obd = class_num2obd(index);
507                 if (obd != NULL)
508                         dev_count++;
509         }
510         read_unlock(&obd_dev_lock);
511
512         return dev_count;
513 }
514 EXPORT_SYMBOL(get_devices_count);
515
516 void class_obd_list(void)
517 {
518         char *status;
519         int i;
520
521         read_lock(&obd_dev_lock);
522         for (i = 0; i < class_devno_max(); i++) {
523                 struct obd_device *obd = class_num2obd(i);
524
525                 if (obd == NULL)
526                         continue;
527                 if (obd->obd_stopping)
528                         status = "ST";
529                 else if (obd->obd_set_up)
530                         status = "UP";
531                 else if (obd->obd_attached)
532                         status = "AT";
533                 else
534                         status = "--";
535                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
536                          i, status, obd->obd_type->typ_name,
537                          obd->obd_name, obd->obd_uuid.uuid,
538                          atomic_read(&obd->obd_refcount));
539         }
540         read_unlock(&obd_dev_lock);
541         return;
542 }
543
544 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
545    specified, then only the client with that uuid is returned,
546    otherwise any client connected to the tgt is returned. */
547 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
548                                           const char * typ_name,
549                                           struct obd_uuid *grp_uuid)
550 {
551         int i;
552
553         read_lock(&obd_dev_lock);
554         for (i = 0; i < class_devno_max(); i++) {
555                 struct obd_device *obd = class_num2obd(i);
556
557                 if (obd == NULL)
558                         continue;
559                 if ((strncmp(obd->obd_type->typ_name, typ_name,
560                              strlen(typ_name)) == 0)) {
561                         if (obd_uuid_equals(tgt_uuid,
562                                             &obd->u.cli.cl_target_uuid) &&
563                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
564                                                          &obd->obd_uuid) : 1)) {
565                                 read_unlock(&obd_dev_lock);
566                                 return obd;
567                         }
568                 }
569         }
570         read_unlock(&obd_dev_lock);
571
572         return NULL;
573 }
574 EXPORT_SYMBOL(class_find_client_obd);
575
576 /* Iterate the obd_device list looking devices have grp_uuid. Start
577    searching at *next, and if a device is found, the next index to look
578    at is saved in *next. If next is NULL, then the first matching device
579    will always be returned. */
580 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
581 {
582         int i;
583
584         if (next == NULL)
585                 i = 0;
586         else if (*next >= 0 && *next < class_devno_max())
587                 i = *next;
588         else
589                 return NULL;
590
591         read_lock(&obd_dev_lock);
592         for (; i < class_devno_max(); i++) {
593                 struct obd_device *obd = class_num2obd(i);
594
595                 if (obd == NULL)
596                         continue;
597                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
598                         if (next != NULL)
599                                 *next = i+1;
600                         read_unlock(&obd_dev_lock);
601                         return obd;
602                 }
603         }
604         read_unlock(&obd_dev_lock);
605
606         return NULL;
607 }
608 EXPORT_SYMBOL(class_devices_in_group);
609
610 /**
611  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
612  * adjust sptlrpc settings accordingly.
613  */
614 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
615 {
616         struct obd_device  *obd;
617         const char         *type;
618         int                 i, rc = 0, rc2;
619
620         LASSERT(namelen > 0);
621
622         read_lock(&obd_dev_lock);
623         for (i = 0; i < class_devno_max(); i++) {
624                 obd = class_num2obd(i);
625
626                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
627                         continue;
628
629                 /* only notify mdc, osc, osp, lwp, mdt, ost
630                  * because only these have a -sptlrpc llog */
631                 type = obd->obd_type->typ_name;
632                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
633                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
634                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
635                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
636                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
637                     strcmp(type, LUSTRE_OST_NAME) != 0)
638                         continue;
639
640                 if (strncmp(obd->obd_name, fsname, namelen))
641                         continue;
642
643                 class_incref(obd, __FUNCTION__, obd);
644                 read_unlock(&obd_dev_lock);
645                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
646                                          sizeof(KEY_SPTLRPC_CONF),
647                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
648                 rc = rc ? rc : rc2;
649                 class_decref(obd, __FUNCTION__, obd);
650                 read_lock(&obd_dev_lock);
651         }
652         read_unlock(&obd_dev_lock);
653         return rc;
654 }
655 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
656
657 void obd_cleanup_caches(void)
658 {
659         ENTRY;
660         if (obd_device_cachep) {
661                 kmem_cache_destroy(obd_device_cachep);
662                 obd_device_cachep = NULL;
663         }
664         if (obdo_cachep) {
665                 kmem_cache_destroy(obdo_cachep);
666                 obdo_cachep = NULL;
667         }
668         if (import_cachep) {
669                 kmem_cache_destroy(import_cachep);
670                 import_cachep = NULL;
671         }
672
673         EXIT;
674 }
675
676 int obd_init_caches(void)
677 {
678         int rc;
679         ENTRY;
680
681         LASSERT(obd_device_cachep == NULL);
682         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
683                                               sizeof(struct obd_device),
684                                               0, 0, NULL);
685         if (!obd_device_cachep)
686                 GOTO(out, rc = -ENOMEM);
687
688         LASSERT(obdo_cachep == NULL);
689         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
690                                         0, 0, NULL);
691         if (!obdo_cachep)
692                 GOTO(out, rc = -ENOMEM);
693
694         LASSERT(import_cachep == NULL);
695         import_cachep = kmem_cache_create("ll_import_cache",
696                                           sizeof(struct obd_import),
697                                           0, 0, NULL);
698         if (!import_cachep)
699                 GOTO(out, rc = -ENOMEM);
700
701         RETURN(0);
702 out:
703         obd_cleanup_caches();
704         RETURN(rc);
705 }
706
707 /* map connection to client */
708 struct obd_export *class_conn2export(struct lustre_handle *conn)
709 {
710         struct obd_export *export;
711         ENTRY;
712
713         if (!conn) {
714                 CDEBUG(D_CACHE, "looking for null handle\n");
715                 RETURN(NULL);
716         }
717
718         if (conn->cookie == -1) {  /* this means assign a new connection */
719                 CDEBUG(D_CACHE, "want a new connection\n");
720                 RETURN(NULL);
721         }
722
723         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
724         export = class_handle2object(conn->cookie, NULL);
725         RETURN(export);
726 }
727 EXPORT_SYMBOL(class_conn2export);
728
729 struct obd_device *class_exp2obd(struct obd_export *exp)
730 {
731         if (exp)
732                 return exp->exp_obd;
733         return NULL;
734 }
735 EXPORT_SYMBOL(class_exp2obd);
736
737 struct obd_device *class_conn2obd(struct lustre_handle *conn)
738 {
739         struct obd_export *export;
740         export = class_conn2export(conn);
741         if (export) {
742                 struct obd_device *obd = export->exp_obd;
743                 class_export_put(export);
744                 return obd;
745         }
746         return NULL;
747 }
748
749 struct obd_import *class_exp2cliimp(struct obd_export *exp)
750 {
751         struct obd_device *obd = exp->exp_obd;
752         if (obd == NULL)
753                 return NULL;
754         return obd->u.cli.cl_import;
755 }
756 EXPORT_SYMBOL(class_exp2cliimp);
757
758 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
759 {
760         struct obd_device *obd = class_conn2obd(conn);
761         if (obd == NULL)
762                 return NULL;
763         return obd->u.cli.cl_import;
764 }
765
766 /* Export management functions */
767 static void class_export_destroy(struct obd_export *exp)
768 {
769         struct obd_device *obd = exp->exp_obd;
770         ENTRY;
771
772         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
773         LASSERT(obd != NULL);
774
775         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
776                exp->exp_client_uuid.uuid, obd->obd_name);
777
778         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
779         if (exp->exp_connection)
780                 ptlrpc_put_connection_superhack(exp->exp_connection);
781
782         LASSERT(list_empty(&exp->exp_outstanding_replies));
783         LASSERT(list_empty(&exp->exp_uncommitted_replies));
784         LASSERT(list_empty(&exp->exp_req_replay_queue));
785         LASSERT(list_empty(&exp->exp_hp_rpcs));
786         obd_destroy_export(exp);
787         class_decref(obd, "export", exp);
788
789         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
790         EXIT;
791 }
792
793 static void export_handle_addref(void *export)
794 {
795         class_export_get(export);
796 }
797
798 static struct portals_handle_ops export_handle_ops = {
799         .hop_addref = export_handle_addref,
800         .hop_free   = NULL,
801 };
802
803 struct obd_export *class_export_get(struct obd_export *exp)
804 {
805         atomic_inc(&exp->exp_refcount);
806         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
807                atomic_read(&exp->exp_refcount));
808         return exp;
809 }
810 EXPORT_SYMBOL(class_export_get);
811
812 void class_export_put(struct obd_export *exp)
813 {
814         LASSERT(exp != NULL);
815         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
816         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
817                atomic_read(&exp->exp_refcount) - 1);
818
819         if (atomic_dec_and_test(&exp->exp_refcount)) {
820                 LASSERT(!list_empty(&exp->exp_obd_chain));
821                 LASSERT(list_empty(&exp->exp_stale_list));
822                 CDEBUG(D_IOCTL, "final put %p/%s\n",
823                        exp, exp->exp_client_uuid.uuid);
824
825                 /* release nid stat refererence */
826                 lprocfs_exp_cleanup(exp);
827
828                 obd_zombie_export_add(exp);
829         }
830 }
831 EXPORT_SYMBOL(class_export_put);
832
833 /* Creates a new export, adds it to the hash table, and returns a
834  * pointer to it. The refcount is 2: one for the hash reference, and
835  * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837                                     struct obd_uuid *cluuid)
838 {
839         struct obd_export *export;
840         struct cfs_hash *hash = NULL;
841         int rc = 0;
842         ENTRY;
843
844         OBD_ALLOC_PTR(export);
845         if (!export)
846                 return ERR_PTR(-ENOMEM);
847
848         export->exp_conn_cnt = 0;
849         export->exp_lock_hash = NULL;
850         export->exp_flock_hash = NULL;
851         atomic_set(&export->exp_refcount, 2);
852         atomic_set(&export->exp_rpc_count, 0);
853         atomic_set(&export->exp_cb_count, 0);
854         atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856         INIT_LIST_HEAD(&export->exp_locks_list);
857         spin_lock_init(&export->exp_locks_list_guard);
858 #endif
859         atomic_set(&export->exp_replay_count, 0);
860         export->exp_obd = obd;
861         INIT_LIST_HEAD(&export->exp_outstanding_replies);
862         spin_lock_init(&export->exp_uncommitted_replies_lock);
863         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864         INIT_LIST_HEAD(&export->exp_req_replay_queue);
865         INIT_LIST_HEAD(&export->exp_handle.h_link);
866         INIT_LIST_HEAD(&export->exp_hp_rpcs);
867         INIT_LIST_HEAD(&export->exp_reg_rpcs);
868         class_handle_hash(&export->exp_handle, &export_handle_ops);
869         export->exp_last_request_time = cfs_time_current_sec();
870         spin_lock_init(&export->exp_lock);
871         spin_lock_init(&export->exp_rpc_lock);
872         INIT_HLIST_NODE(&export->exp_uuid_hash);
873         INIT_HLIST_NODE(&export->exp_nid_hash);
874         INIT_HLIST_NODE(&export->exp_gen_hash);
875         spin_lock_init(&export->exp_bl_list_lock);
876         INIT_LIST_HEAD(&export->exp_bl_list);
877         INIT_LIST_HEAD(&export->exp_stale_list);
878
879         export->exp_sp_peer = LUSTRE_SP_ANY;
880         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881         export->exp_client_uuid = *cluuid;
882         obd_init_export(export);
883
884         spin_lock(&obd->obd_dev_lock);
885         /* shouldn't happen, but might race */
886         if (obd->obd_stopping)
887                 GOTO(exit_unlock, rc = -ENODEV);
888
889         hash = cfs_hash_getref(obd->obd_uuid_hash);
890         if (hash == NULL)
891                 GOTO(exit_unlock, rc = -ENODEV);
892         spin_unlock(&obd->obd_dev_lock);
893
894         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
896                 if (rc != 0) {
897                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898                                       obd->obd_name, cluuid->uuid, rc);
899                         GOTO(exit_err, rc = -EALREADY);
900                 }
901         }
902
903         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
904         spin_lock(&obd->obd_dev_lock);
905         if (obd->obd_stopping) {
906                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
907                 GOTO(exit_unlock, rc = -ENODEV);
908         }
909
910         class_incref(obd, "export", export);
911         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
912         list_add_tail(&export->exp_obd_chain_timed,
913                       &export->exp_obd->obd_exports_timed);
914         export->exp_obd->obd_num_exports++;
915         spin_unlock(&obd->obd_dev_lock);
916         cfs_hash_putref(hash);
917         RETURN(export);
918
919 exit_unlock:
920         spin_unlock(&obd->obd_dev_lock);
921 exit_err:
922         if (hash)
923                 cfs_hash_putref(hash);
924         class_handle_unhash(&export->exp_handle);
925         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
926         obd_destroy_export(export);
927         OBD_FREE_PTR(export);
928         return ERR_PTR(rc);
929 }
930 EXPORT_SYMBOL(class_new_export);
931
932 void class_unlink_export(struct obd_export *exp)
933 {
934         class_handle_unhash(&exp->exp_handle);
935
936         spin_lock(&exp->exp_obd->obd_dev_lock);
937         /* delete an uuid-export hashitem from hashtables */
938         if (!hlist_unhashed(&exp->exp_uuid_hash))
939                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
940                              &exp->exp_client_uuid,
941                              &exp->exp_uuid_hash);
942
943 #ifdef HAVE_SERVER_SUPPORT
944         if (!hlist_unhashed(&exp->exp_gen_hash)) {
945                 struct tg_export_data   *ted = &exp->exp_target_data;
946                 struct cfs_hash         *hash;
947
948                 /* Because obd_gen_hash will not be released until
949                  * class_cleanup(), so hash should never be NULL here */
950                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
951                 LASSERT(hash != NULL);
952                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
953                              &exp->exp_gen_hash);
954                 cfs_hash_putref(hash);
955         }
956 #endif /* HAVE_SERVER_SUPPORT */
957
958         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
959         list_del_init(&exp->exp_obd_chain_timed);
960         exp->exp_obd->obd_num_exports--;
961         spin_unlock(&exp->exp_obd->obd_dev_lock);
962         atomic_inc(&obd_stale_export_num);
963
964         /* A reference is kept by obd_stale_exports list */
965         obd_stale_export_put(exp);
966 }
967 EXPORT_SYMBOL(class_unlink_export);
968
969 /* Import management functions */
970 static void class_import_destroy(struct obd_import *imp)
971 {
972         ENTRY;
973
974         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
975                 imp->imp_obd->obd_name);
976
977         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
978
979         ptlrpc_put_connection_superhack(imp->imp_connection);
980
981         while (!list_empty(&imp->imp_conn_list)) {
982                 struct obd_import_conn *imp_conn;
983
984                 imp_conn = list_entry(imp->imp_conn_list.next,
985                                       struct obd_import_conn, oic_item);
986                 list_del_init(&imp_conn->oic_item);
987                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
988                 OBD_FREE(imp_conn, sizeof(*imp_conn));
989         }
990
991         LASSERT(imp->imp_sec == NULL);
992         class_decref(imp->imp_obd, "import", imp);
993         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
994         EXIT;
995 }
996
/* hop_addref callback for import handles: take a reference on the import. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
1001
1002 static struct portals_handle_ops import_handle_ops = {
1003         .hop_addref = import_handle_addref,
1004         .hop_free   = NULL,
1005 };
1006
1007 struct obd_import *class_import_get(struct obd_import *import)
1008 {
1009         atomic_inc(&import->imp_refcount);
1010         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1011                atomic_read(&import->imp_refcount),
1012                import->imp_obd->obd_name);
1013         return import;
1014 }
1015 EXPORT_SYMBOL(class_import_get);
1016
1017 void class_import_put(struct obd_import *imp)
1018 {
1019         ENTRY;
1020
1021         LASSERT(list_empty(&imp->imp_zombie_chain));
1022         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1023
1024         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1025                atomic_read(&imp->imp_refcount) - 1,
1026                imp->imp_obd->obd_name);
1027
1028         if (atomic_dec_and_test(&imp->imp_refcount)) {
1029                 CDEBUG(D_INFO, "final put import %p\n", imp);
1030                 obd_zombie_import_add(imp);
1031         }
1032
1033         /* catch possible import put race */
1034         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1035         EXIT;
1036 }
1037 EXPORT_SYMBOL(class_import_put);
1038
1039 static void init_imp_at(struct imp_at *at) {
1040         int i;
1041         at_init(&at->iat_net_latency, 0, 0);
1042         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1043                 /* max service estimates are tracked on the server side, so
1044                    don't use the AT history here, just use the last reported
1045                    val. (But keep hist for proc histogram, worst_ever) */
1046                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1047                         AT_FLG_NOHIST);
1048         }
1049 }
1050
1051 struct obd_import *class_new_import(struct obd_device *obd)
1052 {
1053         struct obd_import *imp;
1054         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1055
1056         OBD_ALLOC(imp, sizeof(*imp));
1057         if (imp == NULL)
1058                 return NULL;
1059
1060         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1061         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1062         INIT_LIST_HEAD(&imp->imp_replay_list);
1063         INIT_LIST_HEAD(&imp->imp_sending_list);
1064         INIT_LIST_HEAD(&imp->imp_delayed_list);
1065         INIT_LIST_HEAD(&imp->imp_committed_list);
1066         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1067         imp->imp_known_replied_xid = 0;
1068         imp->imp_replay_cursor = &imp->imp_committed_list;
1069         spin_lock_init(&imp->imp_lock);
1070         imp->imp_last_success_conn = 0;
1071         imp->imp_state = LUSTRE_IMP_NEW;
1072         imp->imp_obd = class_incref(obd, "import", imp);
1073         mutex_init(&imp->imp_sec_mutex);
1074         init_waitqueue_head(&imp->imp_recovery_waitq);
1075
1076         if (curr_pid_ns->child_reaper)
1077                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1078         else
1079                 imp->imp_sec_refpid = 1;
1080
1081         atomic_set(&imp->imp_refcount, 2);
1082         atomic_set(&imp->imp_unregistering, 0);
1083         atomic_set(&imp->imp_inflight, 0);
1084         atomic_set(&imp->imp_replay_inflight, 0);
1085         atomic_set(&imp->imp_inval_count, 0);
1086         INIT_LIST_HEAD(&imp->imp_conn_list);
1087         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1088         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1089         init_imp_at(&imp->imp_at);
1090
1091         /* the default magic is V2, will be used in connect RPC, and
1092          * then adjusted according to the flags in request/reply. */
1093         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1094
1095         return imp;
1096 }
1097 EXPORT_SYMBOL(class_new_import);
1098
1099 void class_destroy_import(struct obd_import *import)
1100 {
1101         LASSERT(import != NULL);
1102         LASSERT(import != LP_POISON);
1103
1104         class_handle_unhash(&import->imp_handle);
1105
1106         spin_lock(&import->imp_lock);
1107         import->imp_generation++;
1108         spin_unlock(&import->imp_lock);
1109         class_import_put(import);
1110 }
1111 EXPORT_SYMBOL(class_destroy_import);
1112
1113 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1114
1115 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1116 {
1117         spin_lock(&exp->exp_locks_list_guard);
1118
1119         LASSERT(lock->l_exp_refs_nr >= 0);
1120
1121         if (lock->l_exp_refs_target != NULL &&
1122             lock->l_exp_refs_target != exp) {
1123                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1124                               exp, lock, lock->l_exp_refs_target);
1125         }
1126         if ((lock->l_exp_refs_nr ++) == 0) {
1127                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1128                 lock->l_exp_refs_target = exp;
1129         }
1130         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1131                lock, exp, lock->l_exp_refs_nr);
1132         spin_unlock(&exp->exp_locks_list_guard);
1133 }
1134 EXPORT_SYMBOL(__class_export_add_lock_ref);
1135
1136 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1137 {
1138         spin_lock(&exp->exp_locks_list_guard);
1139         LASSERT(lock->l_exp_refs_nr > 0);
1140         if (lock->l_exp_refs_target != exp) {
1141                 LCONSOLE_WARN("lock %p, "
1142                               "mismatching export pointers: %p, %p\n",
1143                               lock, lock->l_exp_refs_target, exp);
1144         }
1145         if (-- lock->l_exp_refs_nr == 0) {
1146                 list_del_init(&lock->l_exp_refs_link);
1147                 lock->l_exp_refs_target = NULL;
1148         }
1149         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1150                lock, exp, lock->l_exp_refs_nr);
1151         spin_unlock(&exp->exp_locks_list_guard);
1152 }
1153 EXPORT_SYMBOL(__class_export_del_lock_ref);
1154 #endif
1155
1156 /* A connection defines an export context in which preallocation can
1157    be managed. This releases the export pointer reference, and returns
1158    the export handle, so the export refcount is 1 when this function
1159    returns. */
1160 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1161                   struct obd_uuid *cluuid)
1162 {
1163         struct obd_export *export;
1164         LASSERT(conn != NULL);
1165         LASSERT(obd != NULL);
1166         LASSERT(cluuid != NULL);
1167         ENTRY;
1168
1169         export = class_new_export(obd, cluuid);
1170         if (IS_ERR(export))
1171                 RETURN(PTR_ERR(export));
1172
1173         conn->cookie = export->exp_handle.h_cookie;
1174         class_export_put(export);
1175
1176         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1177                cluuid->uuid, conn->cookie);
1178         RETURN(0);
1179 }
1180 EXPORT_SYMBOL(class_connect);
1181
1182 /* if export is involved in recovery then clean up related things */
1183 static void class_export_recovery_cleanup(struct obd_export *exp)
1184 {
1185         struct obd_device *obd = exp->exp_obd;
1186
1187         spin_lock(&obd->obd_recovery_task_lock);
1188         if (obd->obd_recovering) {
1189                 if (exp->exp_in_recovery) {
1190                         spin_lock(&exp->exp_lock);
1191                         exp->exp_in_recovery = 0;
1192                         spin_unlock(&exp->exp_lock);
1193                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1194                         atomic_dec(&obd->obd_connected_clients);
1195                 }
1196
1197                 /* if called during recovery then should update
1198                  * obd_stale_clients counter,
1199                  * lightweight exports are not counted */
1200                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1201                         exp->exp_obd->obd_stale_clients++;
1202         }
1203         spin_unlock(&obd->obd_recovery_task_lock);
1204
1205         spin_lock(&exp->exp_lock);
1206         /** Cleanup req replay fields */
1207         if (exp->exp_req_replay_needed) {
1208                 exp->exp_req_replay_needed = 0;
1209
1210                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1211                 atomic_dec(&obd->obd_req_replay_clients);
1212         }
1213
1214         /** Cleanup lock replay data */
1215         if (exp->exp_lock_replay_needed) {
1216                 exp->exp_lock_replay_needed = 0;
1217
1218                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1219                 atomic_dec(&obd->obd_lock_replay_clients);
1220         }
1221         spin_unlock(&exp->exp_lock);
1222 }
1223
1224 /* This function removes 1-3 references from the export:
1225  * 1 - for export pointer passed
1226  * and if disconnect really need
1227  * 2 - removing from hash
1228  * 3 - in client_unlink_export
1229  * The export pointer passed to this function can destroyed */
1230 int class_disconnect(struct obd_export *export)
1231 {
1232         int already_disconnected;
1233         ENTRY;
1234
1235         if (export == NULL) {
1236                 CWARN("attempting to free NULL export %p\n", export);
1237                 RETURN(-EINVAL);
1238         }
1239
1240         spin_lock(&export->exp_lock);
1241         already_disconnected = export->exp_disconnected;
1242         export->exp_disconnected = 1;
1243         /*  We hold references of export for uuid hash
1244          *  and nid_hash and export link at least. So
1245          *  it is safe to call cfs_hash_del in there.  */
1246         if (!hlist_unhashed(&export->exp_nid_hash))
1247                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1248                              &export->exp_connection->c_peer.nid,
1249                              &export->exp_nid_hash);
1250         spin_unlock(&export->exp_lock);
1251
1252         /* class_cleanup(), abort_recovery(), and class_fail_export()
1253          * all end up in here, and if any of them race we shouldn't
1254          * call extra class_export_puts(). */
1255         if (already_disconnected) {
1256                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1257                 GOTO(no_disconn, already_disconnected);
1258         }
1259
1260         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1261                export->exp_handle.h_cookie);
1262
1263         class_export_recovery_cleanup(export);
1264         class_unlink_export(export);
1265 no_disconn:
1266         class_export_put(export);
1267         RETURN(0);
1268 }
1269 EXPORT_SYMBOL(class_disconnect);
1270
1271 /* Return non-zero for a fully connected export */
1272 int class_connected_export(struct obd_export *exp)
1273 {
1274         int connected = 0;
1275
1276         if (exp) {
1277                 spin_lock(&exp->exp_lock);
1278                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1279                 spin_unlock(&exp->exp_lock);
1280         }
1281         return connected;
1282 }
1283 EXPORT_SYMBOL(class_connected_export);
1284
1285 static void class_disconnect_export_list(struct list_head *list,
1286                                          enum obd_option flags)
1287 {
1288         int rc;
1289         struct obd_export *exp;
1290         ENTRY;
1291
1292         /* It's possible that an export may disconnect itself, but
1293          * nothing else will be added to this list. */
1294         while (!list_empty(list)) {
1295                 exp = list_entry(list->next, struct obd_export,
1296                                  exp_obd_chain);
1297                 /* need for safe call CDEBUG after obd_disconnect */
1298                 class_export_get(exp);
1299
1300                 spin_lock(&exp->exp_lock);
1301                 exp->exp_flags = flags;
1302                 spin_unlock(&exp->exp_lock);
1303
1304                 if (obd_uuid_equals(&exp->exp_client_uuid,
1305                                     &exp->exp_obd->obd_uuid)) {
1306                         CDEBUG(D_HA,
1307                                "exp %p export uuid == obd uuid, don't discon\n",
1308                                exp);
1309                         /* Need to delete this now so we don't end up pointing
1310                          * to work_list later when this export is cleaned up. */
1311                         list_del_init(&exp->exp_obd_chain);
1312                         class_export_put(exp);
1313                         continue;
1314                 }
1315
1316                 class_export_get(exp);
1317                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1318                        "last request at "CFS_TIME_T"\n",
1319                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1320                        exp, exp->exp_last_request_time);
1321                 /* release one export reference anyway */
1322                 rc = obd_disconnect(exp);
1323
1324                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1325                        obd_export_nid2str(exp), exp, rc);
1326                 class_export_put(exp);
1327         }
1328         EXIT;
1329 }
1330
1331 void class_disconnect_exports(struct obd_device *obd)
1332 {
1333         struct list_head work_list;
1334         ENTRY;
1335
1336         /* Move all of the exports from obd_exports to a work list, en masse. */
1337         INIT_LIST_HEAD(&work_list);
1338         spin_lock(&obd->obd_dev_lock);
1339         list_splice_init(&obd->obd_exports, &work_list);
1340         list_splice_init(&obd->obd_delayed_exports, &work_list);
1341         spin_unlock(&obd->obd_dev_lock);
1342
1343         if (!list_empty(&work_list)) {
1344                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1345                        "disconnecting them\n", obd->obd_minor, obd);
1346                 class_disconnect_export_list(&work_list,
1347                                              exp_flags_from_obd(obd));
1348         } else
1349                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1350                        obd->obd_minor, obd);
1351         EXIT;
1352 }
1353 EXPORT_SYMBOL(class_disconnect_exports);
1354
1355 /* Remove exports that have not completed recovery.
1356  */
1357 void class_disconnect_stale_exports(struct obd_device *obd,
1358                                     int (*test_export)(struct obd_export *))
1359 {
1360         struct list_head work_list;
1361         struct obd_export *exp, *n;
1362         int evicted = 0;
1363         ENTRY;
1364
1365         INIT_LIST_HEAD(&work_list);
1366         spin_lock(&obd->obd_dev_lock);
1367         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1368                                  exp_obd_chain) {
1369                 /* don't count self-export as client */
1370                 if (obd_uuid_equals(&exp->exp_client_uuid,
1371                                     &exp->exp_obd->obd_uuid))
1372                         continue;
1373
1374                 /* don't evict clients which have no slot in last_rcvd
1375                  * (e.g. lightweight connection) */
1376                 if (exp->exp_target_data.ted_lr_idx == -1)
1377                         continue;
1378
1379                 spin_lock(&exp->exp_lock);
1380                 if (exp->exp_failed || test_export(exp)) {
1381                         spin_unlock(&exp->exp_lock);
1382                         continue;
1383                 }
1384                 exp->exp_failed = 1;
1385                 spin_unlock(&exp->exp_lock);
1386
1387                 list_move(&exp->exp_obd_chain, &work_list);
1388                 evicted++;
1389                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1390                        obd->obd_name, exp->exp_client_uuid.uuid,
1391                        exp->exp_connection == NULL ? "<unknown>" :
1392                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1393                 print_export_data(exp, "EVICTING", 0, D_HA);
1394         }
1395         spin_unlock(&obd->obd_dev_lock);
1396
1397         if (evicted)
1398                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1399                               obd->obd_name, evicted);
1400
1401         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1402                                                  OBD_OPT_ABORT_RECOV);
1403         EXIT;
1404 }
1405 EXPORT_SYMBOL(class_disconnect_stale_exports);
1406
1407 void class_fail_export(struct obd_export *exp)
1408 {
1409         int rc, already_failed;
1410
1411         spin_lock(&exp->exp_lock);
1412         already_failed = exp->exp_failed;
1413         exp->exp_failed = 1;
1414         spin_unlock(&exp->exp_lock);
1415
1416         if (already_failed) {
1417                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1418                        exp, exp->exp_client_uuid.uuid);
1419                 return;
1420         }
1421
1422         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1423                exp, exp->exp_client_uuid.uuid);
1424
1425         if (obd_dump_on_timeout)
1426                 libcfs_debug_dumplog();
1427
1428         /* need for safe call CDEBUG after obd_disconnect */
1429         class_export_get(exp);
1430
1431         /* Most callers into obd_disconnect are removing their own reference
1432          * (request, for example) in addition to the one from the hash table.
1433          * We don't have such a reference here, so make one. */
1434         class_export_get(exp);
1435         rc = obd_disconnect(exp);
1436         if (rc)
1437                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1438         else
1439                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1440                        exp, exp->exp_client_uuid.uuid);
1441         class_export_put(exp);
1442 }
1443 EXPORT_SYMBOL(class_fail_export);
1444
1445 char *obd_export_nid2str(struct obd_export *exp)
1446 {
1447         if (exp->exp_connection != NULL)
1448                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1449
1450         return "(no nid)";
1451 }
1452 EXPORT_SYMBOL(obd_export_nid2str);
1453
1454 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1455 {
1456         struct cfs_hash *nid_hash;
1457         struct obd_export *doomed_exp = NULL;
1458         int exports_evicted = 0;
1459
1460         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1461
1462         spin_lock(&obd->obd_dev_lock);
1463         /* umount has run already, so evict thread should leave
1464          * its task to umount thread now */
1465         if (obd->obd_stopping) {
1466                 spin_unlock(&obd->obd_dev_lock);
1467                 return exports_evicted;
1468         }
1469         nid_hash = obd->obd_nid_hash;
1470         cfs_hash_getref(nid_hash);
1471         spin_unlock(&obd->obd_dev_lock);
1472
1473         do {
1474                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1475                 if (doomed_exp == NULL)
1476                         break;
1477
1478                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1479                          "nid %s found, wanted nid %s, requested nid %s\n",
1480                          obd_export_nid2str(doomed_exp),
1481                          libcfs_nid2str(nid_key), nid);
1482                 LASSERTF(doomed_exp != obd->obd_self_export,
1483                          "self-export is hashed by NID?\n");
1484                 exports_evicted++;
1485                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1486                               "request\n", obd->obd_name,
1487                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1488                               obd_export_nid2str(doomed_exp));
1489                 class_fail_export(doomed_exp);
1490                 class_export_put(doomed_exp);
1491         } while (1);
1492
1493         cfs_hash_putref(nid_hash);
1494
1495         if (!exports_evicted)
1496                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1497                        obd->obd_name, nid);
1498         return exports_evicted;
1499 }
1500 EXPORT_SYMBOL(obd_export_evict_by_nid);
1501
1502 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1503 {
1504         struct cfs_hash *uuid_hash;
1505         struct obd_export *doomed_exp = NULL;
1506         struct obd_uuid doomed_uuid;
1507         int exports_evicted = 0;
1508
1509         spin_lock(&obd->obd_dev_lock);
1510         if (obd->obd_stopping) {
1511                 spin_unlock(&obd->obd_dev_lock);
1512                 return exports_evicted;
1513         }
1514         uuid_hash = obd->obd_uuid_hash;
1515         cfs_hash_getref(uuid_hash);
1516         spin_unlock(&obd->obd_dev_lock);
1517
1518         obd_str2uuid(&doomed_uuid, uuid);
1519         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1520                 CERROR("%s: can't evict myself\n", obd->obd_name);
1521                 cfs_hash_putref(uuid_hash);
1522                 return exports_evicted;
1523         }
1524
1525         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1526
1527         if (doomed_exp == NULL) {
1528                 CERROR("%s: can't disconnect %s: no exports found\n",
1529                        obd->obd_name, uuid);
1530         } else {
1531                 CWARN("%s: evicting %s at adminstrative request\n",
1532                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1533                 class_fail_export(doomed_exp);
1534                 class_export_put(doomed_exp);
1535                 exports_evicted++;
1536         }
1537         cfs_hash_putref(uuid_hash);
1538
1539         return exports_evicted;
1540 }
1541
1542 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback run by print_export_data() when lock dumping is
 * requested; it stays NULL (zero-initialised) until a user sets it. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1545 #endif
1546
1547 static void print_export_data(struct obd_export *exp, const char *status,
1548                               int locks, int debug_level)
1549 {
1550         struct ptlrpc_reply_state *rs;
1551         struct ptlrpc_reply_state *first_reply = NULL;
1552         int nreplies = 0;
1553
1554         spin_lock(&exp->exp_lock);
1555         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1556                             rs_exp_list) {
1557                 if (nreplies == 0)
1558                         first_reply = rs;
1559                 nreplies++;
1560         }
1561         spin_unlock(&exp->exp_lock);
1562
1563         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1564                "%p %s %llu stale:%d\n",
1565                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1566                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1567                atomic_read(&exp->exp_rpc_count),
1568                atomic_read(&exp->exp_cb_count),
1569                atomic_read(&exp->exp_locks_count),
1570                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1571                nreplies, first_reply, nreplies > 3 ? "..." : "",
1572                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1573 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1574         if (locks && class_export_dump_hook != NULL)
1575                 class_export_dump_hook(exp);
1576 #endif
1577 }
1578
1579 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1580 {
1581         struct obd_export *exp;
1582
1583         spin_lock(&obd->obd_dev_lock);
1584         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1585                 print_export_data(exp, "ACTIVE", locks, debug_level);
1586         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1587                 print_export_data(exp, "UNLINKED", locks, debug_level);
1588         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1589                 print_export_data(exp, "DELAYED", locks, debug_level);
1590         spin_unlock(&obd->obd_dev_lock);
1591         spin_lock(&obd_zombie_impexp_lock);
1592         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1593                 print_export_data(exp, "ZOMBIE", locks, debug_level);
1594         spin_unlock(&obd_zombie_impexp_lock);
1595 }
1596
1597 void obd_exports_barrier(struct obd_device *obd)
1598 {
1599         int waited = 2;
1600         LASSERT(list_empty(&obd->obd_exports));
1601         spin_lock(&obd->obd_dev_lock);
1602         while (!list_empty(&obd->obd_unlinked_exports)) {
1603                 spin_unlock(&obd->obd_dev_lock);
1604                 set_current_state(TASK_UNINTERRUPTIBLE);
1605                 schedule_timeout(cfs_time_seconds(waited));
1606                 if (waited > 5 && is_power_of_2(waited)) {
1607                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1608                                       "more than %d seconds. "
1609                                       "The obd refcount = %d. Is it stuck?\n",
1610                                       obd->obd_name, waited,
1611                                       atomic_read(&obd->obd_refcount));
1612                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1613                 }
1614                 waited *= 2;
1615                 spin_lock(&obd->obd_dev_lock);
1616         }
1617         spin_unlock(&obd->obd_dev_lock);
1618 }
1619 EXPORT_SYMBOL(obd_exports_barrier);
1620
/* Number of zombie imports and exports still waiting to be destroyed;
 * modified under obd_zombie_impexp_lock. Starts at zero. */
static int zombies_count;
1623
1624 /**
1625  * kill zombie imports and exports
1626  */
1627 void obd_zombie_impexp_cull(void)
1628 {
1629         struct obd_import *import;
1630         struct obd_export *export;
1631         ENTRY;
1632
1633         do {
1634                 spin_lock(&obd_zombie_impexp_lock);
1635
1636                 import = NULL;
1637                 if (!list_empty(&obd_zombie_imports)) {
1638                         import = list_entry(obd_zombie_imports.next,
1639                                             struct obd_import,
1640                                             imp_zombie_chain);
1641                         list_del_init(&import->imp_zombie_chain);
1642                 }
1643
1644                 export = NULL;
1645                 if (!list_empty(&obd_zombie_exports)) {
1646                         export = list_entry(obd_zombie_exports.next,
1647                                             struct obd_export,
1648                                             exp_obd_chain);
1649                         list_del_init(&export->exp_obd_chain);
1650                 }
1651
1652                 spin_unlock(&obd_zombie_impexp_lock);
1653
1654                 if (import != NULL) {
1655                         class_import_destroy(import);
1656                         spin_lock(&obd_zombie_impexp_lock);
1657                         zombies_count--;
1658                         spin_unlock(&obd_zombie_impexp_lock);
1659                 }
1660
1661                 if (export != NULL) {
1662                         class_export_destroy(export);
1663                         spin_lock(&obd_zombie_impexp_lock);
1664                         zombies_count--;
1665                         spin_unlock(&obd_zombie_impexp_lock);
1666                 }
1667
1668                 cond_resched();
1669         } while (import != NULL || export != NULL);
1670         EXIT;
1671 }
1672
1673 static DECLARE_COMPLETION(obd_zombie_start);
1674 static DECLARE_COMPLETION(obd_zombie_stop);
1675 static unsigned long obd_zombie_flags;
1676 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
1677 static pid_t obd_zombie_pid;
1678
1679 enum {
1680         OBD_ZOMBIE_STOP         = 0x0001,
1681 };
1682
1683 /**
1684  * check for work for kill zombie import/export thread.
1685  */
1686 static int obd_zombie_impexp_check(void *arg)
1687 {
1688         int rc;
1689
1690         spin_lock(&obd_zombie_impexp_lock);
1691         rc = (zombies_count == 0) &&
1692              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1693         spin_unlock(&obd_zombie_impexp_lock);
1694
1695         RETURN(rc);
1696 }
1697
1698 /**
1699  * Add export to the obd_zombe thread and notify it.
1700  */
1701 static void obd_zombie_export_add(struct obd_export *exp) {
1702         atomic_dec(&obd_stale_export_num);
1703         spin_lock(&exp->exp_obd->obd_dev_lock);
1704         LASSERT(!list_empty(&exp->exp_obd_chain));
1705         list_del_init(&exp->exp_obd_chain);
1706         spin_unlock(&exp->exp_obd->obd_dev_lock);
1707         spin_lock(&obd_zombie_impexp_lock);
1708         zombies_count++;
1709         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1710         spin_unlock(&obd_zombie_impexp_lock);
1711
1712         obd_zombie_impexp_notify();
1713 }
1714
1715 /**
1716  * Add import to the obd_zombe thread and notify it.
1717  */
1718 static void obd_zombie_import_add(struct obd_import *imp) {
1719         LASSERT(imp->imp_sec == NULL);
1720         spin_lock(&obd_zombie_impexp_lock);
1721         LASSERT(list_empty(&imp->imp_zombie_chain));
1722         zombies_count++;
1723         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1724         spin_unlock(&obd_zombie_impexp_lock);
1725
1726         obd_zombie_impexp_notify();
1727 }
1728
1729 /**
1730  * notify import/export destroy thread about new zombie.
1731  */
1732 static void obd_zombie_impexp_notify(void)
1733 {
1734         /*
1735          * Make sure obd_zomebie_impexp_thread get this notification.
1736          * It is possible this signal only get by obd_zombie_barrier, and
1737          * barrier gulps this notification and sleeps away and hangs ensues
1738          */
1739         wake_up_all(&obd_zombie_waitq);
1740 }
1741
1742 /**
1743  * check whether obd_zombie is idle
1744  */
1745 static int obd_zombie_is_idle(void)
1746 {
1747         int rc;
1748
1749         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1750         spin_lock(&obd_zombie_impexp_lock);
1751         rc = (zombies_count == 0);
1752         spin_unlock(&obd_zombie_impexp_lock);
1753         return rc;
1754 }
1755
1756 /**
1757  * wait when obd_zombie import/export queues become empty
1758  */
1759 void obd_zombie_barrier(void)
1760 {
1761         struct l_wait_info lwi = { 0 };
1762
1763         if (obd_zombie_pid == current_pid())
1764                 /* don't wait for myself */
1765                 return;
1766         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1767 }
1768 EXPORT_SYMBOL(obd_zombie_barrier);
1769
1770
1771 struct obd_export *obd_stale_export_get(void)
1772 {
1773         struct obd_export *exp = NULL;
1774         ENTRY;
1775
1776         spin_lock(&obd_stale_export_lock);
1777         if (!list_empty(&obd_stale_exports)) {
1778                 exp = list_entry(obd_stale_exports.next,
1779                                  struct obd_export, exp_stale_list);
1780                 list_del_init(&exp->exp_stale_list);
1781         }
1782         spin_unlock(&obd_stale_export_lock);
1783
1784         if (exp) {
1785                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1786                        atomic_read(&obd_stale_export_num));
1787         }
1788         RETURN(exp);
1789 }
1790 EXPORT_SYMBOL(obd_stale_export_get);
1791
1792 void obd_stale_export_put(struct obd_export *exp)
1793 {
1794         ENTRY;
1795
1796         LASSERT(list_empty(&exp->exp_stale_list));
1797         if (exp->exp_lock_hash &&
1798             atomic_read(&exp->exp_lock_hash->hs_count)) {
1799                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1800                        atomic_read(&obd_stale_export_num));
1801
1802                 spin_lock_bh(&exp->exp_bl_list_lock);
1803                 spin_lock(&obd_stale_export_lock);
1804                 /* Add to the tail if there is no blocked locks,
1805                  * to the head otherwise. */
1806                 if (list_empty(&exp->exp_bl_list))
1807                         list_add_tail(&exp->exp_stale_list,
1808                                       &obd_stale_exports);
1809                 else
1810                         list_add(&exp->exp_stale_list,
1811                                  &obd_stale_exports);
1812
1813                 spin_unlock(&obd_stale_export_lock);
1814                 spin_unlock_bh(&exp->exp_bl_list_lock);
1815         } else {
1816                 class_export_put(exp);
1817         }
1818         EXIT;
1819 }
1820 EXPORT_SYMBOL(obd_stale_export_put);
1821
1822 /**
1823  * Adjust the position of the export in the stale list,
1824  * i.e. move to the head of the list if is needed.
1825  **/
1826 void obd_stale_export_adjust(struct obd_export *exp)
1827 {
1828         LASSERT(exp != NULL);
1829         spin_lock_bh(&exp->exp_bl_list_lock);
1830         spin_lock(&obd_stale_export_lock);
1831
1832         if (!list_empty(&exp->exp_stale_list) &&
1833             !list_empty(&exp->exp_bl_list))
1834                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1835
1836         spin_unlock(&obd_stale_export_lock);
1837         spin_unlock_bh(&exp->exp_bl_list_lock);
1838 }
1839 EXPORT_SYMBOL(obd_stale_export_adjust);
1840
1841 /**
1842  * destroy zombie export/import thread.
1843  */
1844 static int obd_zombie_impexp_thread(void *unused)
1845 {
1846         unshare_fs_struct();
1847         complete(&obd_zombie_start);
1848
1849         obd_zombie_pid = current_pid();
1850
1851         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1852                 struct l_wait_info lwi = { 0 };
1853
1854                 l_wait_event(obd_zombie_waitq,
1855                              !obd_zombie_impexp_check(NULL), &lwi);
1856                 obd_zombie_impexp_cull();
1857
1858                 /*
1859                  * Notify obd_zombie_barrier callers that queues
1860                  * may be empty.
1861                  */
1862                 wake_up(&obd_zombie_waitq);
1863         }
1864
1865         complete(&obd_zombie_stop);
1866
1867         RETURN(0);
1868 }
1869
1870
1871 /**
1872  * start destroy zombie import/export thread
1873  */
1874 int obd_zombie_impexp_init(void)
1875 {
1876         struct task_struct *task;
1877
1878         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1879         if (IS_ERR(task))
1880                 RETURN(PTR_ERR(task));
1881
1882         wait_for_completion(&obd_zombie_start);
1883         RETURN(0);
1884 }
1885 /**
1886  * stop destroy zombie import/export thread
1887  */
1888 void obd_zombie_impexp_stop(void)
1889 {
1890         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1891         obd_zombie_impexp_notify();
1892         wait_for_completion(&obd_zombie_stop);
1893         LASSERT(list_empty(&obd_stale_exports));
1894 }
1895
1896 /***** Kernel-userspace comm helpers *******/
1897
1898 /* Get length of entire message, including header */
1899 int kuc_len(int payload_len)
1900 {
1901         return sizeof(struct kuc_hdr) + payload_len;
1902 }
1903 EXPORT_SYMBOL(kuc_len);
1904
1905 /* Get a pointer to kuc header, given a ptr to the payload
1906  * @param p Pointer to payload area
1907  * @returns Pointer to kuc header
1908  */
1909 struct kuc_hdr * kuc_ptr(void *p)
1910 {
1911         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1912         LASSERT(lh->kuc_magic == KUC_MAGIC);
1913         return lh;
1914 }
1915 EXPORT_SYMBOL(kuc_ptr);
1916
1917 /* Alloc space for a message, and fill in header
1918  * @return Pointer to payload area
1919  */
1920 void *kuc_alloc(int payload_len, int transport, int type)
1921 {
1922         struct kuc_hdr *lh;
1923         int len = kuc_len(payload_len);
1924
1925         OBD_ALLOC(lh, len);
1926         if (lh == NULL)
1927                 return ERR_PTR(-ENOMEM);
1928
1929         lh->kuc_magic = KUC_MAGIC;
1930         lh->kuc_transport = transport;
1931         lh->kuc_msgtype = type;
1932         lh->kuc_msglen = len;
1933
1934         return (void *)(lh + 1);
1935 }
1936 EXPORT_SYMBOL(kuc_alloc);
1937
/* Free a message created by kuc_alloc().
 * @p is the payload pointer; @payload_len must be the payload length
 * the message was allocated with, since the freed size is derived from it.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr;

	hdr = kuc_ptr(p);
	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1945
/* Per-waiter record used by obd_get_request_slot() (allocated on that
 * function's stack) while a thread waits for a request slot. */
struct obd_request_slot_waiter {
        /* link in the client's cl_loi_read_list; unlinked (emptied) by
         * whoever grants the slot */
        struct list_head        orsw_entry;
        /* wait queue the waiting thread sleeps on */
        wait_queue_head_t       orsw_waitq;
        /* initialised to false by the waiter; when found true after the
         * wait, obd_get_request_slot() returns -EINTR */
        bool                    orsw_signaled;
};
1951
1952 static bool obd_request_slot_avail(struct client_obd *cli,
1953                                    struct obd_request_slot_waiter *orsw)
1954 {
1955         bool avail;
1956
1957         spin_lock(&cli->cl_loi_list_lock);
1958         avail = !!list_empty(&orsw->orsw_entry);
1959         spin_unlock(&cli->cl_loi_list_lock);
1960
1961         return avail;
1962 };
1963
1964 /*
1965  * For network flow control, the RPC sponsor needs to acquire a credit
1966  * before sending the RPC. The credits count for a connection is defined
1967  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1968  * the subsequent RPC sponsors need to wait until others released their
1969  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1970  */
1971 int obd_get_request_slot(struct client_obd *cli)
1972 {
1973         struct obd_request_slot_waiter   orsw;
1974         struct l_wait_info               lwi;
1975         int                              rc;
1976
1977         spin_lock(&cli->cl_loi_list_lock);
1978         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1979                 cli->cl_r_in_flight++;
1980                 spin_unlock(&cli->cl_loi_list_lock);
1981                 return 0;
1982         }
1983
1984         init_waitqueue_head(&orsw.orsw_waitq);
1985         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1986         orsw.orsw_signaled = false;
1987         spin_unlock(&cli->cl_loi_list_lock);
1988
1989         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1990         rc = l_wait_event(orsw.orsw_waitq,
1991                           obd_request_slot_avail(cli, &orsw) ||
1992                           orsw.orsw_signaled,
1993                           &lwi);
1994
1995         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1996          * freed but other (such as obd_put_request_slot) is using it. */
1997         spin_lock(&cli->cl_loi_list_lock);
1998         if (rc != 0) {
1999                 if (!orsw.orsw_signaled) {
2000                         if (list_empty(&orsw.orsw_entry))
2001                                 cli->cl_r_in_flight--;
2002                         else
2003                                 list_del(&orsw.orsw_entry);
2004                 }
2005         }
2006
2007         if (orsw.orsw_signaled) {
2008                 LASSERT(list_empty(&orsw.orsw_entry));
2009
2010                 rc = -EINTR;
2011         }
2012         spin_unlock(&cli->cl_loi_list_lock);
2013
2014         return rc;
2015 }
2016 EXPORT_SYMBOL(obd_get_request_slot);
2017
2018 void obd_put_request_slot(struct client_obd *cli)
2019 {
2020         struct obd_request_slot_waiter *orsw;
2021
2022         spin_lock(&cli->cl_loi_list_lock);
2023         cli->cl_r_in_flight--;
2024
2025         /* If there is free slot, wakeup the first waiter. */
2026         if (!list_empty(&cli->cl_loi_read_list) &&
2027             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2028                 orsw = list_entry(cli->cl_loi_read_list.next,
2029                                   struct obd_request_slot_waiter, orsw_entry);
2030                 list_del_init(&orsw->orsw_entry);
2031                 cli->cl_r_in_flight++;
2032                 wake_up(&orsw->orsw_waitq);
2033         }
2034         spin_unlock(&cli->cl_loi_list_lock);
2035 }
2036 EXPORT_SYMBOL(obd_put_request_slot);
2037
2038 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2039 {
2040         return cli->cl_max_rpcs_in_flight;
2041 }
2042 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2043
2044 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2045 {
2046         struct obd_request_slot_waiter *orsw;
2047         __u32                           old;
2048         int                             diff;
2049         int                             i;
2050         char                            *typ_name;
2051         int                             rc;
2052
2053         if (max > OBD_MAX_RIF_MAX || max < 1)
2054                 return -ERANGE;
2055
2056         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2057         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2058                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2059                  * strictly lower that max_rpcs_in_flight */
2060                 if (max < 2) {
2061                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2062                                "because it must be higher than "
2063                                "max_mod_rpcs_in_flight value",
2064                                cli->cl_import->imp_obd->obd_name);
2065                         return -ERANGE;
2066                 }
2067                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2068                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2069                         if (rc != 0)
2070                                 return rc;
2071                 }
2072         }
2073
2074         spin_lock(&cli->cl_loi_list_lock);
2075         old = cli->cl_max_rpcs_in_flight;
2076         cli->cl_max_rpcs_in_flight = max;
2077         diff = max - old;
2078
2079         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2080         for (i = 0; i < diff; i++) {
2081                 if (list_empty(&cli->cl_loi_read_list))
2082                         break;
2083
2084                 orsw = list_entry(cli->cl_loi_read_list.next,
2085                                   struct obd_request_slot_waiter, orsw_entry);
2086                 list_del_init(&orsw->orsw_entry);
2087                 cli->cl_r_in_flight++;
2088                 wake_up(&orsw->orsw_waitq);
2089         }
2090         spin_unlock(&cli->cl_loi_list_lock);
2091
2092         return 0;
2093 }
2094 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2095
2096 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2097 {
2098         return cli->cl_max_mod_rpcs_in_flight;
2099 }
2100 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2101
2102 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2103 {
2104         struct obd_connect_data *ocd;
2105         __u16 maxmodrpcs;
2106         __u16 prev;
2107
2108         if (max > OBD_MAX_RIF_MAX || max < 1)
2109                 return -ERANGE;
2110
2111         /* cannot exceed or equal max_rpcs_in_flight */
2112         if (max >= cli->cl_max_rpcs_in_flight) {
2113                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2114                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2115                        cli->cl_import->imp_obd->obd_name,
2116                        max, cli->cl_max_rpcs_in_flight);
2117                 return -ERANGE;
2118         }
2119
2120         /* cannot exceed max modify RPCs in flight supported by the server */
2121         ocd = &cli->cl_import->imp_connect_data;
2122         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2123                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2124         else
2125                 maxmodrpcs = 1;
2126         if (max > maxmodrpcs) {
2127                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2128                        "higher than max_mod_rpcs_per_client value (%hu) "
2129                        "returned by the server at connection\n",
2130                        cli->cl_import->imp_obd->obd_name,
2131                        max, maxmodrpcs);
2132                 return -ERANGE;
2133         }
2134
2135         spin_lock(&cli->cl_mod_rpcs_lock);
2136
2137         prev = cli->cl_max_mod_rpcs_in_flight;
2138         cli->cl_max_mod_rpcs_in_flight = max;
2139
2140         /* wakeup waiters if limit has been increased */
2141         if (cli->cl_max_mod_rpcs_in_flight > prev)
2142                 wake_up(&cli->cl_mod_rpcs_waitq);
2143
2144         spin_unlock(&cli->cl_mod_rpcs_lock);
2145
2146         return 0;
2147 }
2148 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2149
2150
2151 #define pct(a, b) (b ? a * 100 / b : 0)
2152 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2153                                struct seq_file *seq)
2154 {
2155         unsigned long mod_tot = 0, mod_cum;
2156         struct timespec64 now;
2157         int i;
2158
2159         ktime_get_real_ts64(&now);
2160
2161         spin_lock(&cli->cl_mod_rpcs_lock);
2162
2163         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2164                    (s64)now.tv_sec, now.tv_nsec);
2165         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2166                    cli->cl_mod_rpcs_in_flight);
2167
2168         seq_printf(seq, "\n\t\t\tmodify\n");
2169         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2170
2171         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2172
2173         mod_cum = 0;
2174         for (i = 0; i < OBD_HIST_MAX; i++) {
2175                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2176                 mod_cum += mod;
2177                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2178                            i, mod, pct(mod, mod_tot),
2179                            pct(mod_cum, mod_tot));
2180                 if (mod_cum == mod_tot)
2181                         break;
2182         }
2183
2184         spin_unlock(&cli->cl_mod_rpcs_lock);
2185
2186         return 0;
2187 }
2188 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2189 #undef pct
2190
2191
2192 /* The number of modify RPCs sent in parallel is limited
2193  * because the server has a finite number of slots per client to
2194  * store request result and ensure reply reconstruction when needed.
2195  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2196  * that takes into account server limit and cl_max_rpcs_in_flight
2197  * value.
2198  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2199  * one close request is allowed above the maximum.
2200  */
2201 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2202                                                  bool close_req)
2203 {
2204         bool avail;
2205
2206         /* A slot is available if
2207          * - number of modify RPCs in flight is less than the max
2208          * - it's a close RPC and no other close request is in flight
2209          */
2210         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2211                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2212
2213         return avail;
2214 }
2215
2216 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2217                                          bool close_req)
2218 {
2219         bool avail;
2220
2221         spin_lock(&cli->cl_mod_rpcs_lock);
2222         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2223         spin_unlock(&cli->cl_mod_rpcs_lock);
2224         return avail;
2225 }
2226
2227 /* Get a modify RPC slot from the obd client @cli according
2228  * to the kind of operation @opc that is going to be sent
2229  * and the intent @it of the operation if it applies.
2230  * If the maximum number of modify RPCs in flight is reached
2231  * the thread is put to sleep.
2232  * Returns the tag to be set in the request message. Tag 0
2233  * is reserved for non-modifying requests.
2234  */
2235 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2236                            struct lookup_intent *it)
2237 {
2238         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2239         bool                    close_req = false;
2240         __u16                   i, max;
2241
2242         /* read-only metadata RPCs don't consume a slot on MDT
2243          * for reply reconstruction
2244          */
2245         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2246                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2247                 return 0;
2248
2249         if (opc == MDS_CLOSE)
2250                 close_req = true;
2251
2252         do {
2253                 spin_lock(&cli->cl_mod_rpcs_lock);
2254                 max = cli->cl_max_mod_rpcs_in_flight;
2255                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2256                         /* there is a slot available */
2257                         cli->cl_mod_rpcs_in_flight++;
2258                         if (close_req)
2259                                 cli->cl_close_rpcs_in_flight++;
2260                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2261                                          cli->cl_mod_rpcs_in_flight);
2262                         /* find a free tag */
2263                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2264                                                 max + 1);
2265                         LASSERT(i < OBD_MAX_RIF_MAX);
2266                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2267                         spin_unlock(&cli->cl_mod_rpcs_lock);
2268                         /* tag 0 is reserved for non-modify RPCs */
2269                         return i + 1;
2270                 }
2271                 spin_unlock(&cli->cl_mod_rpcs_lock);
2272
2273                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2274                        "opc %u, max %hu\n",
2275                        cli->cl_import->imp_obd->obd_name, opc, max);
2276
2277                 l_wait_event(cli->cl_mod_rpcs_waitq,
2278                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2279         } while (true);
2280 }
2281 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2282
2283 /* Put a modify RPC slot from the obd client @cli according
2284  * to the kind of operation @opc that has been sent and the
2285  * intent @it of the operation if it applies.
2286  */
2287 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2288                           struct lookup_intent *it, __u16 tag)
2289 {
2290         bool                    close_req = false;
2291
2292         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2293                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2294                 return;
2295
2296         if (opc == MDS_CLOSE)
2297                 close_req = true;
2298
2299         spin_lock(&cli->cl_mod_rpcs_lock);
2300         cli->cl_mod_rpcs_in_flight--;
2301         if (close_req)
2302                 cli->cl_close_rpcs_in_flight--;
2303         /* release the tag in the bitmap */
2304         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2305         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2306         spin_unlock(&cli->cl_mod_rpcs_lock);
2307         wake_up(&cli->cl_mod_rpcs_waitq);
2308 }
2309 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2310