Whamcloud - gitweb
d5a880c137184a2a6999f993e1a52fca5376611d
[fs/lustre-release.git] / lustre / ofd / ofd_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ofd/ofd_dev.c
33  *
34  * This file contains OSD API methods for OBD Filter Device (OFD),
35  * request handlers and supplemental functions to set OFD up and clean it up.
36  *
37  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
38  * Author: Mike Pershin <mike.pershin@intel.com>
39  * Author: Johann Lombardi <johann.lombardi@intel.com>
40  */
41 /*
42  * The OBD Filter Device (OFD) module belongs to the Object Storage
43  * Server stack and connects the RPC oriented Unified Target (TGT)
44  * layer (see lustre/include/lu_target.h) to the storage oriented OSD
45  * layer (see Documentation/osd-api.txt).
46  *
47  *     TGT
48  *      |      DT and OBD APIs
49  *     OFD
50  *      |      DT API
51  *     OSD
52  *
53  * OFD implements the LU and OBD device APIs and is responsible for:
54  *
55  * - Handling client requests (create, destroy, bulk IO, setattr,
56  *   get_info, set_info, statfs) for the objects belonging to the OST
57  *   (together with TGT).
58  *
59  * - Providing grant space management which allows clients to reserve
60  *   disk space for data writeback. OFD tracks grants on global and
61  *   per client levels.
62  *
63  * - Handling object precreation requests from MDTs.
64  *
65  * - Operating the LDLM service that allows clients to maintain object
66  *   data cache coherence.
67  */
68
69 #define DEBUG_SUBSYSTEM S_FILTER
70
71 #include <obd_class.h>
72 #include <lustre_param.h>
73 #include <lustre_fid.h>
74 #include <lustre_lfsck.h>
75 #include <lustre/lustre_idl.h>
76 #include <lustre_dlm.h>
77 #include <lustre_quota.h>
78 #include <lustre_nodemap.h>
79
80 #include "ofd_internal.h"
81
82 /* Slab for OFD object allocation */
83 static struct kmem_cache *ofd_object_kmem;
84
85 static struct lu_kmem_descr ofd_caches[] = {
86         {
87                 .ckd_cache = &ofd_object_kmem,
88                 .ckd_name  = "ofd_obj",
89                 .ckd_size  = sizeof(struct ofd_object)
90         },
91         {
92                 .ckd_cache = NULL
93         }
94 };
95
96 /**
97  * Connect OFD to the next device in the stack.
98  *
99  * This function is used for device stack configuration and links OFD
100  * device with bottom OSD device.
101  *
102  * \param[in]  env      execution environment
103  * \param[in]  m        OFD device
104  * \param[in]  next     name of next device in the stack
105  * \param[out] exp      export to return
106  *
107  * \retval              0 and export in \a exp if successful
108  * \retval              negative value on error
109  */
110 static int ofd_connect_to_next(const struct lu_env *env, struct ofd_device *m,
111                                const char *next, struct obd_export **exp)
112 {
113         struct obd_connect_data *data = NULL;
114         struct obd_device       *obd;
115         int                      rc;
116         ENTRY;
117
118         OBD_ALLOC_PTR(data);
119         if (data == NULL)
120                 GOTO(out, rc = -ENOMEM);
121
122         obd = class_name2obd(next);
123         if (obd == NULL) {
124                 CERROR("%s: can't locate next device: %s\n",
125                        ofd_name(m), next);
126                 GOTO(out, rc = -ENOTCONN);
127         }
128
129         data->ocd_connect_flags = OBD_CONNECT_VERSION;
130         data->ocd_version = LUSTRE_VERSION_CODE;
131
132         rc = obd_connect(NULL, exp, obd, &obd->obd_uuid, data, NULL);
133         if (rc) {
134                 CERROR("%s: cannot connect to next dev %s: rc = %d\n",
135                        ofd_name(m), next, rc);
136                 GOTO(out, rc);
137         }
138
139         m->ofd_dt_dev.dd_lu_dev.ld_site =
140                 m->ofd_osd_exp->exp_obd->obd_lu_dev->ld_site;
141         LASSERT(m->ofd_dt_dev.dd_lu_dev.ld_site);
142         m->ofd_osd = lu2dt_dev(m->ofd_osd_exp->exp_obd->obd_lu_dev);
143         m->ofd_dt_dev.dd_lu_dev.ld_site->ls_top_dev = &m->ofd_dt_dev.dd_lu_dev;
144
145 out:
146         if (data)
147                 OBD_FREE_PTR(data);
148         RETURN(rc);
149 }
150
151 /**
152  * Initialize stack of devices.
153  *
154  * This function initializes OFD-OSD device stack to serve OST requests
155  *
156  * \param[in] env       execution environment
157  * \param[in] m         OFD device
158  * \param[in] cfg       Lustre config for this server
159  *
160  * \retval              0 if successful
161  * \retval              negative value on error
162  */
163 static int ofd_stack_init(const struct lu_env *env,
164                           struct ofd_device *m, struct lustre_cfg *cfg)
165 {
166         const char              *dev = lustre_cfg_string(cfg, 0);
167         struct lu_device        *d;
168         struct ofd_thread_info  *info = ofd_info(env);
169         struct lustre_mount_info *lmi;
170         struct lustre_mount_data *lmd;
171         int                      rc;
172         char                    *osdname;
173
174         ENTRY;
175
176         lmi = server_get_mount(dev);
177         if (lmi == NULL) {
178                 CERROR("Cannot get mount info for %s!\n", dev);
179                 RETURN(-ENODEV);
180         }
181
182         lmd = s2lsi(lmi->lmi_sb)->lsi_lmd;
183         if (lmd != NULL && lmd->lmd_flags & LMD_FLG_SKIP_LFSCK)
184                 m->ofd_skip_lfsck = 1;
185
186         /* find bottom osd */
187         OBD_ALLOC(osdname, MTI_NAME_MAXLEN);
188         if (osdname == NULL)
189                 RETURN(-ENOMEM);
190
191         snprintf(osdname, MTI_NAME_MAXLEN, "%s-osd", dev);
192         rc = ofd_connect_to_next(env, m, osdname, &m->ofd_osd_exp);
193         OBD_FREE(osdname, MTI_NAME_MAXLEN);
194         if (rc)
195                 RETURN(rc);
196
197         d = m->ofd_osd_exp->exp_obd->obd_lu_dev;
198         LASSERT(d);
199         m->ofd_osd = lu2dt_dev(d);
200
201         snprintf(info->fti_u.name, sizeof(info->fti_u.name),
202                  "%s-osd", lustre_cfg_string(cfg, 0));
203
204         RETURN(rc);
205 }
206
207 /**
208  * Finalize the device stack OFD-OSD.
209  *
210  * This function cleans OFD-OSD device stack and
211  * disconnects OFD from the OSD.
212  *
213  * \param[in] env       execution environment
214  * \param[in] m         OFD device
215  * \param[in] top       top device of stack
216  *
217  * \retval              0 if successful
218  * \retval              negative value on error
219  */
220 static void ofd_stack_fini(const struct lu_env *env, struct ofd_device *m,
221                            struct lu_device *top)
222 {
223         struct obd_device       *obd = ofd_obd(m);
224         struct lustre_cfg_bufs   bufs;
225         struct lustre_cfg       *lcfg;
226         char                     flags[3] = "";
227
228         ENTRY;
229
230         lu_site_purge(env, top->ld_site, ~0);
231         /* process cleanup, pass mdt obd name to get obd umount flags */
232         lustre_cfg_bufs_reset(&bufs, obd->obd_name);
233         if (obd->obd_force)
234                 strcat(flags, "F");
235         if (obd->obd_fail)
236                 strcat(flags, "A");
237         lustre_cfg_bufs_set_string(&bufs, 1, flags);
238         lcfg = lustre_cfg_new(LCFG_CLEANUP, &bufs);
239         if (lcfg == NULL)
240                 RETURN_EXIT;
241
242         LASSERT(top);
243         top->ld_ops->ldo_process_config(env, top, lcfg);
244         lustre_cfg_free(lcfg);
245
246         lu_site_purge(env, top->ld_site, ~0);
247         if (!cfs_hash_is_empty(top->ld_site->ls_obj_hash)) {
248                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
249                 lu_site_print(env, top->ld_site, &msgdata, lu_cdebug_printer);
250         }
251
252         LASSERT(m->ofd_osd_exp);
253         obd_disconnect(m->ofd_osd_exp);
254
255         EXIT;
256 }
257
258 /* For interoperability, see mdt_interop_param[]. */
259 static struct cfg_interop_param ofd_interop_param[] = {
260         { "ost.quota_type",     NULL },
261         { NULL }
262 };
263
264 /**
265  * Check if parameters are symlinks to the OSD.
266  *
267  * Some parameters were moved from ofd to osd and only their
268  * symlinks were kept in ofd by LU-3106. They are:
269  * -writehthrough_cache_enable
270  * -readcache_max_filesize
271  * -read_cache_enable
272  * -brw_stats
273  *
274  * Since they are not included by the static lprocfs var list, a pre-check
275  * is added for them to avoid "unknown param" errors. If they are matched
276  * in this check, they will be passed to the OSD directly.
277  *
278  * \param[in] param     parameters to check
279  *
280  * \retval              true if param is symlink to OSD param
281  *                      false otherwise
282  */
283 static bool match_symlink_param(char *param)
284 {
285         char *sval;
286         int paramlen;
287
288         if (class_match_param(param, PARAM_OST, &param) == 0) {
289                 sval = strchr(param, '=');
290                 if (sval != NULL) {
291                         paramlen = sval - param;
292                         if (strncmp(param, "writethrough_cache_enable",
293                                     paramlen) == 0 ||
294                             strncmp(param, "readcache_max_filesize",
295                                     paramlen) == 0 ||
296                             strncmp(param, "read_cache_enable",
297                                     paramlen) == 0 ||
298                             strncmp(param, "brw_stats", paramlen) == 0)
299                                 return true;
300                 }
301         }
302
303         return false;
304 }
305
306 /**
307  * Process various configuration parameters.
308  *
309  * This function is used by MGS to process specific configurations and
310  * pass them through to the next device in server stack, i.e. the OSD.
311  *
312  * \param[in] env       execution environment
313  * \param[in] d         LU device of OFD
314  * \param[in] cfg       parameters to process
315  *
316  * \retval              0 if successful
317  * \retval              negative value on error
318  */
319 static int ofd_process_config(const struct lu_env *env, struct lu_device *d,
320                               struct lustre_cfg *cfg)
321 {
322         struct ofd_device       *m = ofd_dev(d);
323         struct dt_device        *dt_next = m->ofd_osd;
324         struct lu_device        *next = &dt_next->dd_lu_dev;
325         int                      rc;
326
327         ENTRY;
328
329         switch (cfg->lcfg_command) {
330         case LCFG_PARAM: {
331                 struct obd_device       *obd = ofd_obd(m);
332                 /* For interoperability */
333                 struct cfg_interop_param   *ptr = NULL;
334                 struct lustre_cfg          *old_cfg = NULL;
335                 char                       *param = NULL;
336
337                 param = lustre_cfg_string(cfg, 1);
338                 if (param == NULL) {
339                         CERROR("param is empty\n");
340                         rc = -EINVAL;
341                         break;
342                 }
343
344                 ptr = class_find_old_param(param, ofd_interop_param);
345                 if (ptr != NULL) {
346                         if (ptr->new_param == NULL) {
347                                 rc = 0;
348                                 CWARN("For interoperability, skip this %s."
349                                       " It is obsolete.\n", ptr->old_param);
350                                 break;
351                         }
352
353                         CWARN("Found old param %s, changed it to %s.\n",
354                               ptr->old_param, ptr->new_param);
355
356                         old_cfg = cfg;
357                         cfg = lustre_cfg_rename(old_cfg, ptr->new_param);
358                         if (IS_ERR(cfg)) {
359                                 rc = PTR_ERR(cfg);
360                                 break;
361                         }
362                 }
363
364                 if (match_symlink_param(param)) {
365                         rc = next->ld_ops->ldo_process_config(env, next, cfg);
366                         break;
367                 }
368
369                 rc = class_process_proc_param(PARAM_OST, obd->obd_vars, cfg,
370                                               d->ld_obd);
371                 if (rc > 0 || rc == -ENOSYS) {
372                         CDEBUG(D_CONFIG, "pass param %s down the stack.\n",
373                                param);
374                         /* we don't understand; pass it on */
375                         rc = next->ld_ops->ldo_process_config(env, next, cfg);
376                 }
377                 break;
378         }
379         case LCFG_SPTLRPC_CONF: {
380                 rc = -ENOTSUPP;
381                 break;
382         }
383         default:
384                 /* others are passed further */
385                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
386                 break;
387         }
388         RETURN(rc);
389 }
390
391 /**
392  * Implementation of lu_object_operations::loo_object_init for OFD
393  *
394  * Allocate just the next object (OSD) in stack.
395  *
396  * \param[in] env       execution environment
397  * \param[in] o         lu_object of OFD object
398  * \param[in] conf      additional configuration parameters, not used here
399  *
400  * \retval              0 if successful
401  * \retval              negative value on error
402  */
403 static int ofd_object_init(const struct lu_env *env, struct lu_object *o,
404                            const struct lu_object_conf *conf)
405 {
406         struct ofd_device       *d = ofd_dev(o->lo_dev);
407         struct lu_device        *under;
408         struct lu_object        *below;
409         int                      rc = 0;
410
411         ENTRY;
412
413         CDEBUG(D_INFO, "object init, fid = "DFID"\n",
414                PFID(lu_object_fid(o)));
415
416         under = &d->ofd_osd->dd_lu_dev;
417         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
418         if (below != NULL)
419                 lu_object_add(o, below);
420         else
421                 rc = -ENOMEM;
422
423         RETURN(rc);
424 }
425
426 /**
427  * Implementation of lu_object_operations::loo_object_free.
428  *
429  * Finish OFD object lifecycle and free its memory.
430  *
431  * \param[in] env       execution environment
432  * \param[in] o         LU object of OFD object
433  */
434 static void ofd_object_free(const struct lu_env *env, struct lu_object *o)
435 {
436         struct ofd_object       *of = ofd_obj(o);
437         struct lu_object_header *h;
438
439         ENTRY;
440
441         h = o->lo_header;
442         CDEBUG(D_INFO, "object free, fid = "DFID"\n",
443                PFID(lu_object_fid(o)));
444
445         lu_object_fini(o);
446         lu_object_header_fini(h);
447         OBD_SLAB_FREE_PTR(of, ofd_object_kmem);
448         EXIT;
449 }
450
451 /**
452  * Implementation of lu_object_operations::loo_object_print.
453  *
454  * Print OFD part of compound OFD-OSD object. See lu_object_print() and
455  * LU_OBJECT_DEBUG() for more details about the compound object printing.
456  *
457  * \param[in] env       execution environment
458  * \param[in] cookie    opaque data passed to the printer function
459  * \param[in] p         printer function to use
460  * \param[in] o         LU object of OFD object
461  *
462  * \retval              0 if successful
463  * \retval              negative value on error
464  */
465 static int ofd_object_print(const struct lu_env *env, void *cookie,
466                             lu_printer_t p, const struct lu_object *o)
467 {
468         return (*p)(env, cookie, LUSTRE_OST_NAME"-object@%p", o);
469 }
470
471 static struct lu_object_operations ofd_obj_ops = {
472         .loo_object_init        = ofd_object_init,
473         .loo_object_free        = ofd_object_free,
474         .loo_object_print       = ofd_object_print
475 };
476
477 /**
478  * Implementation of lu_device_operations::lod_object_alloc.
479  *
480  * This function allocates OFD part of compound OFD-OSD object and
481  * initializes its header, because OFD is the top device in stack
482  *
483  * \param[in] env       execution environment
484  * \param[in] hdr       object header, NULL for OFD
485  * \param[in] d         lu_device
486  *
487  * \retval              allocated object if successful
488  * \retval              NULL value on failed allocation
489  */
490 static struct lu_object *ofd_object_alloc(const struct lu_env *env,
491                                           const struct lu_object_header *hdr,
492                                           struct lu_device *d)
493 {
494         struct ofd_object *of;
495
496         ENTRY;
497
498         OBD_SLAB_ALLOC_PTR_GFP(of, ofd_object_kmem, GFP_NOFS);
499         if (of != NULL) {
500                 struct lu_object        *o;
501                 struct lu_object_header *h;
502
503                 o = &of->ofo_obj.do_lu;
504                 h = &of->ofo_header;
505                 lu_object_header_init(h);
506                 lu_object_init(o, h, d);
507                 lu_object_add_top(h, o);
508                 o->lo_ops = &ofd_obj_ops;
509                 RETURN(o);
510         } else {
511                 RETURN(NULL);
512         }
513 }
514
515 /**
516  * Return the result of LFSCK run to the OFD.
517  *
518  * Notify OFD about result of LFSCK run. That may block the new object
519  * creation until problem is fixed by LFSCK.
520  *
521  * \param[in] env       execution environment
522  * \param[in] data      pointer to the OFD device
523  * \param[in] event     LFSCK event type
524  *
525  * \retval              0 if successful
526  * \retval              negative value on unknown event
527  */
528 static int ofd_lfsck_out_notify(const struct lu_env *env, void *data,
529                                 enum lfsck_events event)
530 {
531         struct ofd_device *ofd = data;
532         struct obd_device *obd = ofd_obd(ofd);
533
534         switch (event) {
535         case LE_LASTID_REBUILDING:
536                 CWARN("%s: Found crashed LAST_ID, deny creating new OST-object "
537                       "on the device until the LAST_ID rebuilt successfully.\n",
538                       obd->obd_name);
539                 down_write(&ofd->ofd_lastid_rwsem);
540                 ofd->ofd_lastid_rebuilding = 1;
541                 up_write(&ofd->ofd_lastid_rwsem);
542                 break;
543         case LE_LASTID_REBUILT: {
544                 down_write(&ofd->ofd_lastid_rwsem);
545                 ofd_seqs_free(env, ofd);
546                 ofd->ofd_lastid_rebuilding = 0;
547                 ofd->ofd_lastid_gen++;
548                 up_write(&ofd->ofd_lastid_rwsem);
549                 CWARN("%s: Rebuilt crashed LAST_ID files successfully.\n",
550                       obd->obd_name);
551                 break;
552         }
553         default:
554                 CERROR("%s: unknown lfsck event: rc = %d\n",
555                        ofd_name(ofd), event);
556                 return -EINVAL;
557         }
558
559         return 0;
560 }
561
562 /**
563  * Implementation of lu_device_operations::ldo_prepare.
564  *
565  * This method is called after layer has been initialized and before it starts
566  * serving user requests. In OFD it starts lfsk check routines and initializes
567  * recovery.
568  *
569  * \param[in] env       execution environment
570  * \param[in] pdev      higher device in stack, NULL for OFD
571  * \param[in] dev       lu_device of OFD device
572  *
573  * \retval              0 if successful
574  * \retval              negative value on error
575  */
576 static int ofd_prepare(const struct lu_env *env, struct lu_device *pdev,
577                        struct lu_device *dev)
578 {
579         struct ofd_thread_info          *info;
580         struct ofd_device               *ofd = ofd_dev(dev);
581         struct obd_device               *obd = ofd_obd(ofd);
582         struct lu_device                *next = &ofd->ofd_osd->dd_lu_dev;
583         int                              rc;
584
585         ENTRY;
586
587         info = ofd_info_init(env, NULL);
588         if (info == NULL)
589                 RETURN(-EFAULT);
590
591         /* initialize lower device */
592         rc = next->ld_ops->ldo_prepare(env, dev, next);
593         if (rc != 0)
594                 RETURN(rc);
595
596         rc = lfsck_register(env, ofd->ofd_osd, ofd->ofd_osd, obd,
597                             ofd_lfsck_out_notify, ofd, false);
598         if (rc != 0) {
599                 CERROR("%s: failed to initialize lfsck: rc = %d\n",
600                        obd->obd_name, rc);
601                 RETURN(rc);
602         }
603
604         rc = lfsck_register_namespace(env, ofd->ofd_osd, ofd->ofd_namespace);
605         /* The LFSCK instance is registered just now, so it must be there when
606          * register the namespace to such instance. */
607         LASSERTF(rc == 0, "register namespace failed: rc = %d\n", rc);
608
609         target_recovery_init(&ofd->ofd_lut, tgt_request_handle);
610         LASSERT(obd->obd_no_conn);
611         spin_lock(&obd->obd_dev_lock);
612         obd->obd_no_conn = 0;
613         spin_unlock(&obd->obd_dev_lock);
614
615         if (obd->obd_recovering == 0)
616                 ofd_postrecov(env, ofd);
617
618         RETURN(rc);
619 }
620
621 /**
622  * Implementation of lu_device_operations::ldo_recovery_complete.
623  *
624  * This method notifies all layers about 'recovery complete' event. That means
625  * device is in full state and consistent. An OFD calculates available grant
626  * space upon this event.
627  *
628  * \param[in] env       execution environment
629  * \param[in] dev       lu_device of OFD device
630  *
631  * \retval              0 if successful
632  * \retval              negative value on error
633  */
634 static int ofd_recovery_complete(const struct lu_env *env,
635                                  struct lu_device *dev)
636 {
637         struct ofd_device       *ofd = ofd_dev(dev);
638         struct lu_device        *next = &ofd->ofd_osd->dd_lu_dev;
639         int                      rc = 0, max_precreate;
640
641         ENTRY;
642
643         /*
644          * Grant space for object precreation on the self export.
645          * This initial reserved space (i.e. 10MB for zfs and 280KB for ldiskfs)
646          * is enough to create 10k objects. More space is then acquired for
647          * precreation in ofd_grant_create().
648          */
649         max_precreate = OST_MAX_PRECREATE * ofd->ofd_dt_conf.ddp_inodespace / 2;
650         ofd_grant_connect(env, dev->ld_obd->obd_self_export, max_precreate,
651                           false);
652         rc = next->ld_ops->ldo_recovery_complete(env, next);
653         RETURN(rc);
654 }
655
656 /**
657  * lu_device_operations matrix for OFD device.
658  */
659 static struct lu_device_operations ofd_lu_ops = {
660         .ldo_object_alloc       = ofd_object_alloc,
661         .ldo_process_config     = ofd_process_config,
662         .ldo_recovery_complete  = ofd_recovery_complete,
663         .ldo_prepare            = ofd_prepare,
664 };
665
666 LPROC_SEQ_FOPS(lprocfs_nid_stats_clear);
667
668 /**
669  * Initialize all needed procfs entries for OFD device.
670  *
671  * \param[in] ofd       OFD device
672  *
673  * \retval              0 if successful
674  * \retval              negative value on error
675  */
676 static int ofd_procfs_init(struct ofd_device *ofd)
677 {
678         struct obd_device               *obd = ofd_obd(ofd);
679         struct proc_dir_entry           *entry;
680         int                              rc = 0;
681
682         ENTRY;
683
684         /* lprocfs must be setup before the ofd so state can be safely added
685          * to /proc incrementally as the ofd is setup */
686         obd->obd_vars = lprocfs_ofd_obd_vars;
687         rc = lprocfs_obd_setup(obd);
688         if (rc) {
689                 CERROR("%s: lprocfs_obd_setup failed: %d.\n",
690                        obd->obd_name, rc);
691                 RETURN(rc);
692         }
693
694         rc = lprocfs_alloc_obd_stats(obd, LPROC_OFD_STATS_LAST);
695         if (rc) {
696                 CERROR("%s: lprocfs_alloc_obd_stats failed: %d.\n",
697                        obd->obd_name, rc);
698                 GOTO(obd_cleanup, rc);
699         }
700
701         obd->obd_uses_nid_stats = 1;
702
703         entry = lprocfs_register("exports", obd->obd_proc_entry, NULL, NULL);
704         if (IS_ERR(entry)) {
705                 rc = PTR_ERR(entry);
706                 CERROR("%s: error %d setting up lprocfs for %s\n",
707                        obd->obd_name, rc, "exports");
708                 GOTO(obd_cleanup, rc);
709         }
710         obd->obd_proc_exports_entry = entry;
711
712         entry = lprocfs_add_simple(obd->obd_proc_exports_entry, "clear",
713                                    obd, &lprocfs_nid_stats_clear_fops);
714         if (IS_ERR(entry)) {
715                 rc = PTR_ERR(entry);
716                 CERROR("%s: add proc entry 'clear' failed: %d.\n",
717                        obd->obd_name, rc);
718                 GOTO(obd_cleanup, rc);
719         }
720
721         ofd_stats_counter_init(obd->obd_stats);
722
723         rc = lprocfs_job_stats_init(obd, LPROC_OFD_STATS_LAST,
724                                     ofd_stats_counter_init);
725         if (rc)
726                 GOTO(obd_cleanup, rc);
727         RETURN(0);
728 obd_cleanup:
729         lprocfs_obd_cleanup(obd);
730         lprocfs_free_obd_stats(obd);
731
732         return rc;
733 }
734
735 /**
736  * Expose OSD statistics to OFD layer.
737  *
738  * The osd interfaces to the backend file system exposes useful data
739  * such as brw_stats and read or write cache states. This same data
740  * needs to be exposed into the obdfilter (ofd) layer to maintain
741  * backwards compatibility. This function creates the symlinks in the
742  * proc layer to enable this.
743  *
744  * \param[in] ofd       OFD device
745  */
746 static void ofd_procfs_add_brw_stats_symlink(struct ofd_device *ofd)
747 {
748         struct obd_device       *obd = ofd_obd(ofd);
749         struct obd_device       *osd_obd = ofd->ofd_osd_exp->exp_obd;
750
751         if (obd->obd_proc_entry == NULL)
752                 return;
753
754         lprocfs_add_symlink("brw_stats", obd->obd_proc_entry,
755                             "../../%s/%s/brw_stats",
756                             osd_obd->obd_type->typ_name, obd->obd_name);
757
758         lprocfs_add_symlink("read_cache_enable", obd->obd_proc_entry,
759                             "../../%s/%s/read_cache_enable",
760                             osd_obd->obd_type->typ_name, obd->obd_name);
761
762         lprocfs_add_symlink("readcache_max_filesize",
763                             obd->obd_proc_entry,
764                             "../../%s/%s/readcache_max_filesize",
765                             osd_obd->obd_type->typ_name, obd->obd_name);
766
767         lprocfs_add_symlink("writethrough_cache_enable",
768                             obd->obd_proc_entry,
769                             "../../%s/%s/writethrough_cache_enable",
770                             osd_obd->obd_type->typ_name, obd->obd_name);
771 }
772
773 /**
774  * Cleanup all procfs entries in OFD.
775  *
776  * \param[in] ofd       OFD device
777  */
778 static void ofd_procfs_fini(struct ofd_device *ofd)
779 {
780         struct obd_device *obd = ofd_obd(ofd);
781
782         lprocfs_free_per_client_stats(obd);
783         lprocfs_obd_cleanup(obd);
784         lprocfs_free_obd_stats(obd);
785         lprocfs_job_stats_fini(obd);
786 }
787
788 /**
789  * Stop SEQ/FID server on OFD.
790  *
791  * \param[in] env       execution environment
792  * \param[in] ofd       OFD device
793  *
794  * \retval              0 if successful
795  * \retval              negative value on error
796  */
797 int ofd_fid_fini(const struct lu_env *env, struct ofd_device *ofd)
798 {
799         return seq_site_fini(env, &ofd->ofd_seq_site);
800 }
801
802 /**
803  * Start SEQ/FID server on OFD.
804  *
805  * The SEQ/FID server on OFD is needed to allocate FIDs for new objects.
806  * It also connects to the master server to get own FID sequence (SEQ) range
807  * to this particular OFD. Typically that happens when the OST is first
808  * formatted or in the rare case that it exhausts the local sequence range.
809  *
810  * The sequence range is allocated out to the MDTs for OST object allocations,
811  * and not directly to the clients.
812  *
813  * \param[in] env       execution environment
814  * \param[in] ofd       OFD device
815  *
816  * \retval              0 if successful
817  * \retval              negative value on error
818  */
819 int ofd_fid_init(const struct lu_env *env, struct ofd_device *ofd)
820 {
821         struct seq_server_site  *ss = &ofd->ofd_seq_site;
822         struct lu_device        *lu = &ofd->ofd_dt_dev.dd_lu_dev;
823         char                    *obd_name = ofd_name(ofd);
824         char                    *name = NULL;
825         int                     rc = 0;
826
827         ss = &ofd->ofd_seq_site;
828         lu->ld_site->ld_seq_site = ss;
829         ss->ss_lu = lu->ld_site;
830         ss->ss_node_id = ofd->ofd_lut.lut_lsd.lsd_osd_index;
831
832         OBD_ALLOC_PTR(ss->ss_server_seq);
833         if (ss->ss_server_seq == NULL)
834                 GOTO(out_free, rc = -ENOMEM);
835
836         OBD_ALLOC(name, strlen(obd_name) + 10);
837         if (!name) {
838                 OBD_FREE_PTR(ss->ss_server_seq);
839                 ss->ss_server_seq = NULL;
840                 GOTO(out_free, rc = -ENOMEM);
841         }
842
843         rc = seq_server_init(env, ss->ss_server_seq, ofd->ofd_osd, obd_name,
844                              LUSTRE_SEQ_SERVER, ss);
845         if (rc) {
846                 CERROR("%s : seq server init error %d\n", obd_name, rc);
847                 GOTO(out_free, rc);
848         }
849         ss->ss_server_seq->lss_space.lsr_index = ss->ss_node_id;
850
851         OBD_ALLOC_PTR(ss->ss_client_seq);
852         if (ss->ss_client_seq == NULL)
853                 GOTO(out_free, rc = -ENOMEM);
854
855         snprintf(name, strlen(obd_name) + 6, "%p-super", obd_name);
856         rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_DATA,
857                              name, NULL);
858         if (rc) {
859                 CERROR("%s : seq client init error %d\n", obd_name, rc);
860                 GOTO(out_free, rc);
861         }
862         OBD_FREE(name, strlen(obd_name) + 10);
863         name = NULL;
864
865         rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq);
866
867 out_free:
868         if (rc) {
869                 if (ss->ss_server_seq) {
870                         seq_server_fini(ss->ss_server_seq, env);
871                         OBD_FREE_PTR(ss->ss_server_seq);
872                         ss->ss_server_seq = NULL;
873                 }
874
875                 if (ss->ss_client_seq) {
876                         seq_client_fini(ss->ss_client_seq);
877                         OBD_FREE_PTR(ss->ss_client_seq);
878                         ss->ss_client_seq = NULL;
879                 }
880
881                 if (name) {
882                         OBD_FREE(name, strlen(obd_name) + 10);
883                         name = NULL;
884                 }
885         }
886
887         return rc;
888 }
889
890 /**
891  * OFD request handler for OST_SET_INFO RPC.
892  *
893  * This is OFD-specific part of request handling
894  *
895  * \param[in] tsi       target session environment for this request
896  *
897  * \retval              0 if successful
898  * \retval              negative value on error
899  */
900 static int ofd_set_info_hdl(struct tgt_session_info *tsi)
901 {
902         struct ptlrpc_request   *req = tgt_ses_req(tsi);
903         struct ost_body         *body = NULL, *repbody;
904         void                    *key, *val = NULL;
905         int                      keylen, vallen, rc = 0;
906         bool                     is_grant_shrink;
907
908         ENTRY;
909
910         key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
911         if (key == NULL) {
912                 DEBUG_REQ(D_HA, req, "no set_info key");
913                 RETURN(err_serious(-EFAULT));
914         }
915         keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
916                                       RCL_CLIENT);
917
918         val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
919         if (val == NULL) {
920                 DEBUG_REQ(D_HA, req, "no set_info val");
921                 RETURN(err_serious(-EFAULT));
922         }
923         vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
924                                       RCL_CLIENT);
925
926         is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK);
927         if (is_grant_shrink)
928                 /* In this case the value is actually an RMF_OST_BODY, so we
929                  * transmutate the type of this PTLRPC */
930                 req_capsule_extend(tsi->tsi_pill, &RQF_OST_SET_GRANT_INFO);
931
932         rc = req_capsule_server_pack(tsi->tsi_pill);
933         if (rc < 0)
934                 RETURN(rc);
935
936         if (is_grant_shrink) {
937                 body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY);
938
939                 repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
940                 *repbody = *body;
941
942                 /** handle grant shrink, similar to a read request */
943                 ofd_grant_prepare_read(tsi->tsi_env, tsi->tsi_exp,
944                                        &repbody->oa);
945         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
946                 if (vallen > 0)
947                         obd_export_evict_by_nid(tsi->tsi_exp->exp_obd, val);
948                 rc = 0;
949         } else if (KEY_IS(KEY_SPTLRPC_CONF)) {
950                 rc = tgt_adapt_sptlrpc_conf(tsi->tsi_tgt, 0);
951         } else {
952                 CERROR("%s: Unsupported key %s\n",
953                        tgt_name(tsi->tsi_tgt), (char *)key);
954                 rc = -EOPNOTSUPP;
955         }
956         ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SET_INFO,
957                          tsi->tsi_jobid, 1);
958
959         RETURN(rc);
960 }
961
962 /**
963  * Get FIEMAP (FIle Extent MAPping) for object with the given FID.
964  *
965  * This function returns a list of extents which describes how a file's
966  * blocks are laid out on the disk.
967  *
968  * \param[in] env       execution environment
969  * \param[in] ofd       OFD device
970  * \param[in] fid       FID of object
971  * \param[in] fiemap    fiemap structure to fill with data
972  *
973  * \retval              0 if \a fiemap is filled with data successfully
974  * \retval              negative value on error
975  */
976 int ofd_fiemap_get(const struct lu_env *env, struct ofd_device *ofd,
977                    struct lu_fid *fid, struct fiemap *fiemap)
978 {
979         struct ofd_object       *fo;
980         int                      rc;
981
982         fo = ofd_object_find(env, ofd, fid);
983         if (IS_ERR(fo)) {
984                 CERROR("%s: error finding object "DFID"\n",
985                        ofd_name(ofd), PFID(fid));
986                 return PTR_ERR(fo);
987         }
988
989         ofd_read_lock(env, fo);
990         if (ofd_object_exists(fo))
991                 rc = dt_fiemap_get(env, ofd_object_child(fo), fiemap);
992         else
993                 rc = -ENOENT;
994         ofd_read_unlock(env, fo);
995         ofd_object_put(env, fo);
996         return rc;
997 }
998
999 struct locked_region {
1000         struct list_head        list;
1001         struct lustre_handle    lh;
1002 };
1003
1004 /**
1005  * Lock single extent and save lock handle in the list.
1006  *
1007  * This is supplemental function for lock_zero_regions(). It allocates
1008  * new locked_region structure and locks it with extent lock, then adds
1009  * it to the list of all such regions.
1010  *
1011  * \param[in] ns        LDLM namespace
1012  * \param[in] res_id    resource ID
1013  * \param[in] begin     start of region
1014  * \param[in] end       end of region
1015  * \param[in] locked    list head of regions list
1016  *
1017  * \retval              0 if successful locking
1018  * \retval              negative value on error
1019  */
1020 static int lock_region(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
1021                        unsigned long long begin, unsigned long long end,
1022                        struct list_head *locked)
1023 {
1024         struct locked_region    *region = NULL;
1025         __u64                    flags = 0;
1026         int                      rc;
1027
1028         LASSERT(begin <= end);
1029         OBD_ALLOC_PTR(region);
1030         if (region == NULL)
1031                 return -ENOMEM;
1032
1033         rc = tgt_extent_lock(ns, res_id, begin, end, &region->lh,
1034                              LCK_PR, &flags);
1035         if (rc != 0)
1036                 return rc;
1037
1038         CDEBUG(D_OTHER, "ost lock [%llu,%llu], lh=%p\n", begin, end,
1039                &region->lh);
1040         list_add(&region->list, locked);
1041
1042         return 0;
1043 }
1044
1045 /**
1046  * Lock the sparse areas of given resource.
1047  *
1048  * The locking of sparse areas will cause dirty data to be flushed back from
1049  * clients. This is used when getting the FIEMAP of an object to make sure
1050  * there is no unaccounted cached data on clients.
1051  *
1052  * This function goes through \a fiemap list of extents and locks only sparse
1053  * areas between extents.
1054  *
1055  * \param[in] ns        LDLM namespace
1056  * \param[in] res_id    resource ID
1057  * \param[in] fiemap    file extents mapping on disk
1058  * \param[in] locked    list head of regions list
1059  *
1060  * \retval              0 if successful
1061  * \retval              negative value on error
1062  */
1063 static int lock_zero_regions(struct ldlm_namespace *ns,
1064                              struct ldlm_res_id *res_id,
1065                              struct fiemap *fiemap,
1066                              struct list_head *locked)
1067 {
1068         __u64 begin = fiemap->fm_start;
1069         unsigned int i;
1070         int rc = 0;
1071         struct fiemap_extent *fiemap_start = fiemap->fm_extents;
1072
1073         ENTRY;
1074
1075         CDEBUG(D_OTHER, "extents count %u\n", fiemap->fm_mapped_extents);
1076         for (i = 0; i < fiemap->fm_mapped_extents; i++) {
1077                 if (fiemap_start[i].fe_logical > begin) {
1078                         CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
1079                                begin, fiemap_start[i].fe_logical);
1080                         rc = lock_region(ns, res_id, begin,
1081                                          fiemap_start[i].fe_logical, locked);
1082                         if (rc)
1083                                 RETURN(rc);
1084                 }
1085
1086                 begin = fiemap_start[i].fe_logical + fiemap_start[i].fe_length;
1087         }
1088
1089         if (begin < (fiemap->fm_start + fiemap->fm_length)) {
1090                 CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
1091                        begin, fiemap->fm_start + fiemap->fm_length);
1092                 rc = lock_region(ns, res_id, begin,
1093                                  fiemap->fm_start + fiemap->fm_length, locked);
1094         }
1095
1096         RETURN(rc);
1097 }
1098
1099 /**
1100  * Unlock all previously locked sparse areas for given resource.
1101  *
1102  * This function goes through list of locked regions, unlocking and freeing
1103  * them one-by-one.
1104  *
1105  * \param[in] ns        LDLM namespace
1106  * \param[in] locked    list head of regions list
1107  */
1108 static void
1109 unlock_zero_regions(struct ldlm_namespace *ns, struct list_head *locked)
1110 {
1111         struct locked_region *entry, *temp;
1112
1113         list_for_each_entry_safe(entry, temp, locked, list) {
1114                 CDEBUG(D_OTHER, "ost unlock lh=%p\n", &entry->lh);
1115                 tgt_extent_unlock(&entry->lh, LCK_PR);
1116                 list_del(&entry->list);
1117                 OBD_FREE_PTR(entry);
1118         }
1119 }
1120
1121 /**
1122  * OFD request handler for OST_GET_INFO RPC.
1123  *
1124  * This is OFD-specific part of request handling. The OFD-specific keys are:
1125  * - KEY_LAST_ID (obsolete)
1126  * - KEY_FIEMAP
1127  * - KEY_LAST_FID
1128  *
1129  * This function reads needed data from storage and fills reply with it.
1130  *
1131  * Note: the KEY_LAST_ID is obsolete, replaced by KEY_LAST_FID on newer MDTs,
1132  * and is kept for compatibility.
1133  *
1134  * \param[in] tsi       target session environment for this request
1135  *
1136  * \retval              0 if successful
1137  * \retval              negative value on error
1138  */
1139 static int ofd_get_info_hdl(struct tgt_session_info *tsi)
1140 {
1141         struct obd_export               *exp = tsi->tsi_exp;
1142         struct ofd_device               *ofd = ofd_exp(exp);
1143         struct ofd_thread_info          *fti = tsi2ofd_info(tsi);
1144         void                            *key;
1145         int                              keylen;
1146         int                              replylen, rc = 0;
1147
1148         ENTRY;
1149
1150         /* this common part for get_info rpc */
1151         key = req_capsule_client_get(tsi->tsi_pill, &RMF_GETINFO_KEY);
1152         if (key == NULL) {
1153                 DEBUG_REQ(D_HA, tgt_ses_req(tsi), "no get_info key");
1154                 RETURN(err_serious(-EPROTO));
1155         }
1156         keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_GETINFO_KEY,
1157                                       RCL_CLIENT);
1158
1159         if (KEY_IS(KEY_LAST_ID)) {
1160                 u64             *last_id;
1161                 struct ofd_seq  *oseq;
1162
1163                 req_capsule_extend(tsi->tsi_pill, &RQF_OST_GET_INFO_LAST_ID);
1164                 rc = req_capsule_server_pack(tsi->tsi_pill);
1165                 if (rc)
1166                         RETURN(err_serious(rc));
1167
1168                 last_id = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_ID);
1169
1170                 oseq = ofd_seq_load(tsi->tsi_env, ofd,
1171                                     (u64)exp->exp_filter_data.fed_group);
1172                 if (IS_ERR(oseq))
1173                         rc = -EFAULT;
1174                 else
1175                         *last_id = ofd_seq_last_oid(oseq);
1176                 ofd_seq_put(tsi->tsi_env, oseq);
1177         } else if (KEY_IS(KEY_FIEMAP)) {
1178                 struct ll_fiemap_info_key       *fm_key;
1179                 struct fiemap                   *fiemap;
1180                 struct lu_fid                   *fid;
1181
1182                 req_capsule_extend(tsi->tsi_pill, &RQF_OST_GET_INFO_FIEMAP);
1183
1184                 fm_key = req_capsule_client_get(tsi->tsi_pill, &RMF_FIEMAP_KEY);
1185                 rc = tgt_validate_obdo(tsi, &fm_key->lfik_oa);
1186                 if (rc)
1187                         RETURN(err_serious(rc));
1188
1189                 fid = &fm_key->lfik_oa.o_oi.oi_fid;
1190
1191                 CDEBUG(D_INODE, "get FIEMAP of object "DFID"\n", PFID(fid));
1192
1193                 replylen = fiemap_count_to_size(
1194                                         fm_key->lfik_fiemap.fm_extent_count);
1195                 req_capsule_set_size(tsi->tsi_pill, &RMF_FIEMAP_VAL,
1196                                      RCL_SERVER, replylen);
1197
1198                 rc = req_capsule_server_pack(tsi->tsi_pill);
1199                 if (rc)
1200                         RETURN(err_serious(rc));
1201
1202                 fiemap = req_capsule_server_get(tsi->tsi_pill, &RMF_FIEMAP_VAL);
1203                 if (fiemap == NULL)
1204                         RETURN(-ENOMEM);
1205
1206                 *fiemap = fm_key->lfik_fiemap;
1207                 rc = ofd_fiemap_get(tsi->tsi_env, ofd, fid, fiemap);
1208
1209                 /* LU-3219: Lock the sparse areas to make sure dirty
1210                  * flushed back from client, then call fiemap again. */
1211                 if (fm_key->lfik_oa.o_valid & OBD_MD_FLFLAGS &&
1212                     fm_key->lfik_oa.o_flags & OBD_FL_SRVLOCK) {
1213                         struct list_head locked;
1214
1215                         INIT_LIST_HEAD(&locked);
1216                         ost_fid_build_resid(fid, &fti->fti_resid);
1217                         rc = lock_zero_regions(ofd->ofd_namespace,
1218                                                &fti->fti_resid, fiemap,
1219                                                &locked);
1220                         if (rc == 0 && !list_empty(&locked)) {
1221                                 rc = ofd_fiemap_get(tsi->tsi_env, ofd, fid,
1222                                                     fiemap);
1223                                 unlock_zero_regions(ofd->ofd_namespace,
1224                                                     &locked);
1225                         }
1226                 }
1227         } else if (KEY_IS(KEY_LAST_FID)) {
1228                 struct ofd_device       *ofd = ofd_exp(exp);
1229                 struct ofd_seq          *oseq;
1230                 struct lu_fid           *fid;
1231                 int                      rc;
1232
1233                 req_capsule_extend(tsi->tsi_pill, &RQF_OST_GET_INFO_LAST_FID);
1234                 rc = req_capsule_server_pack(tsi->tsi_pill);
1235                 if (rc)
1236                         RETURN(err_serious(rc));
1237
1238                 fid = req_capsule_client_get(tsi->tsi_pill, &RMF_FID);
1239                 if (fid == NULL)
1240                         RETURN(err_serious(-EPROTO));
1241
1242                 fid_le_to_cpu(&fti->fti_ostid.oi_fid, fid);
1243
1244                 fid = req_capsule_server_get(tsi->tsi_pill, &RMF_FID);
1245                 if (fid == NULL)
1246                         RETURN(-ENOMEM);
1247
1248                 oseq = ofd_seq_load(tsi->tsi_env, ofd,
1249                                     ostid_seq(&fti->fti_ostid));
1250                 if (IS_ERR(oseq))
1251                         RETURN(PTR_ERR(oseq));
1252
1253                 rc = ostid_to_fid(fid, &oseq->os_oi,
1254                                   ofd->ofd_lut.lut_lsd.lsd_osd_index);
1255                 if (rc != 0)
1256                         GOTO(out_put, rc);
1257
1258                 CDEBUG(D_HA, "%s: LAST FID is "DFID"\n", ofd_name(ofd),
1259                        PFID(fid));
1260 out_put:
1261                 ofd_seq_put(tsi->tsi_env, oseq);
1262         } else {
1263                 CERROR("%s: not supported key %s\n", tgt_name(tsi->tsi_tgt),
1264                        (char *)key);
1265                 rc = -EOPNOTSUPP;
1266         }
1267         ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_GET_INFO,
1268                          tsi->tsi_jobid, 1);
1269
1270         RETURN(rc);
1271 }
1272
1273 /**
1274  * OFD request handler for OST_GETATTR RPC.
1275  *
1276  * This is OFD-specific part of request handling. It finds the OFD object
1277  * by its FID, gets attributes from storage and packs result to the reply.
1278  *
1279  * \param[in] tsi       target session environment for this request
1280  *
1281  * \retval              0 if successful
1282  * \retval              negative value on error
1283  */
1284 static int ofd_getattr_hdl(struct tgt_session_info *tsi)
1285 {
1286         struct ofd_thread_info  *fti = tsi2ofd_info(tsi);
1287         struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
1288         struct ost_body         *repbody;
1289         struct lustre_handle     lh = { 0 };
1290         struct ofd_object       *fo;
1291         __u64                    flags = 0;
1292         enum ldlm_mode           lock_mode = LCK_PR;
1293         bool                     srvlock;
1294         int                      rc;
1295         ENTRY;
1296
1297         LASSERT(tsi->tsi_ost_body != NULL);
1298
1299         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
1300         if (repbody == NULL)
1301                 RETURN(-ENOMEM);
1302
1303         repbody->oa.o_oi = tsi->tsi_ost_body->oa.o_oi;
1304         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1305
1306         srvlock = tsi->tsi_ost_body->oa.o_valid & OBD_MD_FLFLAGS &&
1307                   tsi->tsi_ost_body->oa.o_flags & OBD_FL_SRVLOCK;
1308
1309         if (srvlock) {
1310                 if (unlikely(tsi->tsi_ost_body->oa.o_flags & OBD_FL_FLUSH))
1311                         lock_mode = LCK_PW;
1312
1313                 rc = tgt_extent_lock(tsi->tsi_tgt->lut_obd->obd_namespace,
1314                                      &tsi->tsi_resid, 0, OBD_OBJECT_EOF, &lh,
1315                                      lock_mode, &flags);
1316                 if (rc != 0)
1317                         RETURN(rc);
1318         }
1319
1320         fo = ofd_object_find_exists(tsi->tsi_env, ofd, &tsi->tsi_fid);
1321         if (IS_ERR(fo))
1322                 GOTO(out, rc = PTR_ERR(fo));
1323
1324         rc = ofd_attr_get(tsi->tsi_env, fo, &fti->fti_attr);
1325         if (rc == 0) {
1326                 __u64    curr_version;
1327
1328                 obdo_from_la(&repbody->oa, &fti->fti_attr,
1329                              OFD_VALID_FLAGS | LA_UID | LA_GID);
1330                 tgt_drop_id(tsi->tsi_exp, &repbody->oa);
1331
1332                 /* Store object version in reply */
1333                 curr_version = dt_version_get(tsi->tsi_env,
1334                                               ofd_object_child(fo));
1335                 if ((__s64)curr_version != -EOPNOTSUPP) {
1336                         repbody->oa.o_valid |= OBD_MD_FLDATAVERSION;
1337                         repbody->oa.o_data_version = curr_version;
1338                 }
1339         }
1340
1341         ofd_object_put(tsi->tsi_env, fo);
1342 out:
1343         if (srvlock)
1344                 tgt_extent_unlock(&lh, lock_mode);
1345
1346         ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_GETATTR,
1347                          tsi->tsi_jobid, 1);
1348
1349         repbody->oa.o_valid |= OBD_MD_FLFLAGS;
1350         repbody->oa.o_flags = OBD_FL_FLUSH;
1351
1352         RETURN(rc);
1353 }
1354
1355 /**
1356  * OFD request handler for OST_SETATTR RPC.
1357  *
1358  * This is OFD-specific part of request handling. It finds the OFD object
1359  * by its FID, sets attributes from request and packs result to the reply.
1360  *
1361  * \param[in] tsi       target session environment for this request
1362  *
1363  * \retval              0 if successful
1364  * \retval              negative value on error
1365  */
1366 static int ofd_setattr_hdl(struct tgt_session_info *tsi)
1367 {
1368         struct ofd_thread_info  *fti = tsi2ofd_info(tsi);
1369         struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
1370         struct ost_body         *body = tsi->tsi_ost_body;
1371         struct ost_body         *repbody;
1372         struct ldlm_resource    *res;
1373         struct ofd_object       *fo;
1374         struct filter_fid       *ff = NULL;
1375         int                      rc = 0;
1376
1377         ENTRY;
1378
1379         LASSERT(body != NULL);
1380
1381         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
1382         if (repbody == NULL)
1383                 RETURN(-ENOMEM);
1384
1385         repbody->oa.o_oi = body->oa.o_oi;
1386         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1387
1388         /* This would be very bad - accidentally truncating a file when
1389          * changing the time or similar - bug 12203. */
1390         if (body->oa.o_valid & OBD_MD_FLSIZE &&
1391             body->oa.o_size != OBD_OBJECT_EOF) {
1392                 static char mdsinum[48];
1393
1394                 if (body->oa.o_valid & OBD_MD_FLFID)
1395                         snprintf(mdsinum, sizeof(mdsinum) - 1,
1396                                  "of parent "DFID, body->oa.o_parent_seq,
1397                                  body->oa.o_parent_oid, 0);
1398                 else
1399                         mdsinum[0] = '\0';
1400
1401                 CERROR("%s: setattr from %s is trying to truncate object "DFID
1402                        " %s\n", ofd_name(ofd), obd_export_nid2str(tsi->tsi_exp),
1403                        PFID(&tsi->tsi_fid), mdsinum);
1404                 RETURN(-EPERM);
1405         }
1406
1407         fo = ofd_object_find_exists(tsi->tsi_env, ofd, &tsi->tsi_fid);
1408         if (IS_ERR(fo))
1409                 GOTO(out, rc = PTR_ERR(fo));
1410
1411         la_from_obdo(&fti->fti_attr, &body->oa, body->oa.o_valid);
1412         fti->fti_attr.la_valid &= ~LA_TYPE;
1413
1414         if (body->oa.o_valid & OBD_MD_FLFID) {
1415                 ff = &fti->fti_mds_fid;
1416                 ofd_prepare_fidea(ff, &body->oa);
1417         }
1418
1419         /* setting objects attributes (including owner/group) */
1420         rc = ofd_attr_set(tsi->tsi_env, fo, &fti->fti_attr, ff);
1421         if (rc != 0)
1422                 GOTO(out_put, rc);
1423
1424         obdo_from_la(&repbody->oa, &fti->fti_attr,
1425                      OFD_VALID_FLAGS | LA_UID | LA_GID);
1426         tgt_drop_id(tsi->tsi_exp, &repbody->oa);
1427
1428         ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SETATTR,
1429                          tsi->tsi_jobid, 1);
1430         EXIT;
1431 out_put:
1432         ofd_object_put(tsi->tsi_env, fo);
1433 out:
1434         if (rc == 0) {
1435                 /* we do not call this before to avoid lu_object_find() in
1436                  *  ->lvbo_update() holding another reference on the object.
1437                  * otherwise concurrent destroy can make the object unavailable
1438                  * for 2nd lu_object_find() waiting for the first reference
1439                  * to go... deadlock! */
1440                 res = ldlm_resource_get(ofd->ofd_namespace, NULL,
1441                                         &tsi->tsi_resid, LDLM_EXTENT, 0);
1442                 if (!IS_ERR(res)) {
1443                         ldlm_res_lvbo_update(res, NULL, 0);
1444                         ldlm_resource_putref(res);
1445                 }
1446         }
1447         return rc;
1448 }
1449
1450 /**
1451  * Destroy OST orphans.
1452  *
1453  * This is part of OST_CREATE RPC handling. If there is flag OBD_FL_DELORPHAN
1454  * set then we must destroy possible orphaned objects.
1455  *
1456  * \param[in] env       execution environment
1457  * \param[in] exp       OBD export
1458  * \param[in] ofd       OFD device
1459  * \param[in] oa        obdo structure for reply
1460  *
1461  * \retval              0 if successful
1462  * \retval              negative value on error
1463  */
1464 static int ofd_orphans_destroy(const struct lu_env *env,
1465                                struct obd_export *exp,
1466                                struct ofd_device *ofd, struct obdo *oa)
1467 {
1468         struct ofd_thread_info  *info   = ofd_info(env);
1469         struct lu_fid           *fid    = &info->fti_fid;
1470         struct ost_id           *oi     = &oa->o_oi;
1471         struct ofd_seq          *oseq;
1472         u64                      seq    = ostid_seq(oi);
1473         u64                      end_id = ostid_id(oi);
1474         u64                      last;
1475         u64                      oid;
1476         int                      skip_orphan;
1477         int                      rc     = 0;
1478
1479         ENTRY;
1480
1481         oseq = ofd_seq_get(ofd, seq);
1482         if (oseq == NULL) {
1483                 CERROR("%s: Can not find seq for "DOSTID"\n",
1484                        ofd_name(ofd), POSTID(oi));
1485                 RETURN(-EINVAL);
1486         }
1487
1488         *fid = oi->oi_fid;
1489         last = ofd_seq_last_oid(oseq);
1490         oid = last;
1491
1492         LASSERT(exp != NULL);
1493         skip_orphan = !!(exp_connect_flags(exp) & OBD_CONNECT_SKIP_ORPHAN);
1494
1495         if (OBD_FAIL_CHECK(OBD_FAIL_OST_NODESTROY))
1496                 goto done;
1497
1498         LCONSOLE(D_INFO, "%s: deleting orphan objects from "DOSTID
1499                  " to "DOSTID"\n", ofd_name(ofd), seq, end_id + 1, seq, last);
1500
1501         while (oid > end_id) {
1502                 rc = fid_set_id(fid, oid);
1503                 if (unlikely(rc != 0))
1504                         GOTO(out_put, rc);
1505
1506                 rc = ofd_destroy_by_fid(env, ofd, fid, 1);
1507                 if (rc != 0 && rc != -ENOENT && rc != -ESTALE &&
1508                     likely(rc != -EREMCHG && rc != -EINPROGRESS))
1509                         /* this is pretty fatal... */
1510                         CEMERG("%s: error destroying precreated id "
1511                                DFID": rc = %d\n",
1512                                ofd_name(ofd), PFID(fid), rc);
1513
1514                 oid--;
1515                 if (!skip_orphan) {
1516                         ofd_seq_last_oid_set(oseq, oid);
1517                         /* update last_id on disk periodically so that if we
1518                          * restart * we don't need to re-scan all of the just
1519                          * deleted objects. */
1520                         if ((oid & 511) == 0)
1521                                 ofd_seq_last_oid_write(env, ofd, oseq);
1522                 }
1523         }
1524
1525         CDEBUG(D_HA, "%s: after destroy: set last_id to "DOSTID"\n",
1526                ofd_name(ofd), seq, oid);
1527
1528 done:
1529         if (!skip_orphan) {
1530                 ofd_seq_last_oid_set(oseq, oid);
1531                 rc = ofd_seq_last_oid_write(env, ofd, oseq);
1532         } else {
1533                 /* don't reuse orphan object, return last used objid */
1534                 ostid_set_id(oi, last);
1535                 rc = 0;
1536         }
1537
1538         GOTO(out_put, rc);
1539
1540 out_put:
1541         ofd_seq_put(env, oseq);
1542         return rc;
1543 }
1544
1545 /**
1546  * OFD request handler for OST_CREATE RPC.
1547  *
1548  * This is OFD-specific part of request handling. Its main purpose is to
1549  * create new data objects on OST, but it also used to destroy orphans.
1550  *
1551  * \param[in] tsi       target session environment for this request
1552  *
1553  * \retval              0 if successful
1554  * \retval              negative value on error
1555  */
1556 static int ofd_create_hdl(struct tgt_session_info *tsi)
1557 {
1558         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1559         struct ost_body         *repbody;
1560         const struct obdo       *oa = &tsi->tsi_ost_body->oa;
1561         struct obdo             *rep_oa;
1562         struct obd_export       *exp = tsi->tsi_exp;
1563         struct ofd_device       *ofd = ofd_exp(exp);
1564         u64                      seq = ostid_seq(&oa->o_oi);
1565         u64                      oid = ostid_id(&oa->o_oi);
1566         struct ofd_seq          *oseq;
1567         int                      rc = 0, diff;
1568         int                      sync_trans = 0;
1569         long                     granted = 0;
1570
1571         ENTRY;
1572
1573         if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
1574                 RETURN(-EROFS);
1575
1576         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
1577         if (repbody == NULL)
1578                 RETURN(-ENOMEM);
1579
1580         down_read(&ofd->ofd_lastid_rwsem);
1581         /* Currently, for safe, we do not distinguish which LAST_ID is broken,
1582          * we may do that in the future.
1583          * Return -ENOSPC until the LAST_ID rebuilt. */
1584         if (unlikely(ofd->ofd_lastid_rebuilding))
1585                 GOTO(out_sem, rc = -ENOSPC);
1586
1587         rep_oa = &repbody->oa;
1588         rep_oa->o_oi = oa->o_oi;
1589
1590         LASSERT(seq >= FID_SEQ_OST_MDT0);
1591         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1592
1593         CDEBUG(D_INFO, "ofd_create("DOSTID")\n", POSTID(&oa->o_oi));
1594
1595         oseq = ofd_seq_load(tsi->tsi_env, ofd, seq);
1596         if (IS_ERR(oseq)) {
1597                 CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n",
1598                        ofd_name(ofd), seq, PTR_ERR(oseq));
1599                 GOTO(out_sem, rc = -EINVAL);
1600         }
1601
1602         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
1603             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
1604                 if (!ofd_obd(ofd)->obd_recovering ||
1605                     oid > ofd_seq_last_oid(oseq)) {
1606                         CERROR("%s: recreate objid "DOSTID" > last id "LPU64
1607                                "\n", ofd_name(ofd), POSTID(&oa->o_oi),
1608                                ofd_seq_last_oid(oseq));
1609                         GOTO(out_nolock, rc = -EINVAL);
1610                 }
1611                 /* Do nothing here, we re-create objects during recovery
1612                  * upon write replay, see ofd_preprw_write() */
1613                 GOTO(out_nolock, rc = 0);
1614         }
1615         /* former ofd_handle_precreate */
1616         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
1617             (oa->o_flags & OBD_FL_DELORPHAN)) {
1618                 exp->exp_filter_data.fed_lastid_gen = ofd->ofd_lastid_gen;
1619
1620                 /* destroy orphans */
1621                 if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
1622                     exp->exp_conn_cnt) {
1623                         CERROR("%s: dropping old orphan cleanup request\n",
1624                                ofd_name(ofd));
1625                         GOTO(out_nolock, rc = 0);
1626                 }
1627                 /* This causes inflight precreates to abort and drop lock */
1628                 oseq->os_destroys_in_progress = 1;
1629                 mutex_lock(&oseq->os_create_lock);
1630                 if (!oseq->os_destroys_in_progress) {
1631                         CERROR("%s:["LPU64"] destroys_in_progress already"
1632                                " cleared\n", ofd_name(ofd), seq);
1633                         ostid_set_id(&rep_oa->o_oi, ofd_seq_last_oid(oseq));
1634                         GOTO(out, rc = 0);
1635                 }
1636                 diff = oid - ofd_seq_last_oid(oseq);
1637                 CDEBUG(D_HA, "ofd_last_id() = "LPU64" -> diff = %d\n",
1638                         ofd_seq_last_oid(oseq), diff);
1639                 if (-diff > OST_MAX_PRECREATE) {
1640                         /* Let MDS know that we are so far ahead. */
1641                         ostid_set_id(&rep_oa->o_oi, ofd_seq_last_oid(oseq) + 1);
1642                         rc = 0;
1643                 } else if (diff < 0) {
1644                         rc = ofd_orphans_destroy(tsi->tsi_env, exp,
1645                                                  ofd, rep_oa);
1646                         oseq->os_destroys_in_progress = 0;
1647                 } else {
1648                         /* XXX: Used by MDS for the first time! */
1649                         oseq->os_destroys_in_progress = 0;
1650                 }
1651         } else {
1652                 if (unlikely(exp->exp_filter_data.fed_lastid_gen !=
1653                              ofd->ofd_lastid_gen)) {
1654                         /* Keep the export ref so we can send the reply. */
1655                         ofd_obd_disconnect(class_export_get(exp));
1656                         GOTO(out_nolock, rc = -ENOTCONN);
1657                 }
1658
1659                 mutex_lock(&oseq->os_create_lock);
1660                 if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
1661                     exp->exp_conn_cnt) {
1662                         CERROR("%s: dropping old precreate request\n",
1663                                ofd_name(ofd));
1664                         GOTO(out, rc = 0);
1665                 }
1666                 /* only precreate if seq is 0, IDIF or normal and also o_id
1667                  * must be specfied */
1668                 if ((!fid_seq_is_mdt(seq) && !fid_seq_is_norm(seq) &&
1669                      !fid_seq_is_idif(seq)) || oid == 0) {
1670                         diff = 1; /* shouldn't we create this right now? */
1671                 } else {
1672                         diff = oid - ofd_seq_last_oid(oseq);
1673                         /* Do sync create if the seq is about to used up */
1674                         if (fid_seq_is_idif(seq) || fid_seq_is_mdt0(seq)) {
1675                                 if (unlikely(oid >= IDIF_MAX_OID - 1))
1676                                         sync_trans = 1;
1677                         } else if (fid_seq_is_norm(seq)) {
1678                                 if (unlikely(oid >=
1679                                              LUSTRE_DATA_SEQ_MAX_WIDTH - 1))
1680                                         sync_trans = 1;
1681                         } else {
1682                                 CERROR("%s : invalid o_seq "DOSTID"\n",
1683                                        ofd_name(ofd), POSTID(&oa->o_oi));
1684                                 GOTO(out, rc = -EINVAL);
1685                         }
1686
1687                         if (diff < 0) {
1688                                 /* LU-5648 */
1689                                 CERROR("%s: invalid precreate request for "
1690                                        DOSTID", last_id " LPU64 ". "
1691                                        "Likely MDS last_id corruption\n",
1692                                        ofd_name(ofd), POSTID(&oa->o_oi),
1693                                        ofd_seq_last_oid(oseq));
1694                                 GOTO(out, rc = -EINVAL);
1695                         }
1696                 }
1697         }
1698         if (diff > 0) {
1699                 cfs_time_t       enough_time = cfs_time_shift(DISK_TIMEOUT);
1700                 u64              next_id;
1701                 int              created = 0;
1702                 int              count;
1703
1704                 if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
1705                     !(oa->o_flags & OBD_FL_DELORPHAN)) {
1706                         /* don't enforce grant during orphan recovery */
1707                         granted = ofd_grant_create(tsi->tsi_env,
1708                                                   ofd_obd(ofd)->obd_self_export,
1709                                                    &diff);
1710                         if (granted < 0) {
1711                                 rc = granted;
1712                                 granted = 0;
1713                                 CDEBUG(D_HA, "%s: failed to acquire grant "
1714                                        "space for precreate (%d): rc = %d\n",
1715                                        ofd_name(ofd), diff, rc);
1716                                 diff = 0;
1717                         }
1718                 }
1719
1720                 /* This can happen if a new OST is formatted and installed
1721                  * in place of an old one at the same index.  Instead of
1722                  * precreating potentially millions of deleted old objects
1723                  * (possibly filling the OST), only precreate the last batch.
1724                  * LFSCK will eventually clean up any orphans. LU-14 */
1725                 if (diff > 5 * OST_MAX_PRECREATE) {
1726                         diff = OST_MAX_PRECREATE / 2;
1727                         LCONSOLE_WARN("%s: Too many FIDs to precreate "
1728                                       "OST replaced or reformatted: "
1729                                       "LFSCK will clean up",
1730                                       ofd_name(ofd));
1731
1732                         CDEBUG(D_HA, "%s: precreate FID "DOSTID" is over "
1733                                "%u larger than the LAST_ID "DOSTID", only "
1734                                "precreating the last %u objects.\n",
1735                                ofd_name(ofd), POSTID(&oa->o_oi),
1736                                5 * OST_MAX_PRECREATE,
1737                                POSTID(&oseq->os_oi), diff);
1738                         ofd_seq_last_oid_set(oseq, ostid_id(&oa->o_oi) - diff);
1739                 }
1740
1741                 while (diff > 0) {
1742                         next_id = ofd_seq_last_oid(oseq) + 1;
1743                         count = ofd_precreate_batch(ofd, diff);
1744
1745                         CDEBUG(D_HA, "%s: reserve %d objects in group "LPX64
1746                                " at "LPU64"\n", ofd_name(ofd),
1747                                count, seq, next_id);
1748
1749                         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1750                             && cfs_time_after(jiffies, enough_time)) {
1751                                 CDEBUG(D_HA, "%s: Slow creates, %d/%d objects"
1752                                       " created at a rate of %d/s\n",
1753                                       ofd_name(ofd), created, diff + created,
1754                                       created / DISK_TIMEOUT);
1755                                 break;
1756                         }
1757
1758                         rc = ofd_precreate_objects(tsi->tsi_env, ofd, next_id,
1759                                                    oseq, count, sync_trans);
1760                         if (rc > 0) {
1761                                 created += rc;
1762                                 diff -= rc;
1763                         } else if (rc < 0) {
1764                                 break;
1765                         }
1766                 }
1767
1768                 if (diff > 0 &&
1769                     lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1770                         LCONSOLE_WARN("%s: can't create the same count of"
1771                                       " objects when replaying the request"
1772                                       " (diff is %d). see LU-4621\n",
1773                                       ofd_name(ofd), diff);
1774
1775                 if (created > 0)
1776                         /* some objects got created, we can return
1777                          * them, even if last creation failed */
1778                         rc = 0;
1779                 else
1780                         CERROR("%s: unable to precreate: rc = %d\n",
1781                                ofd_name(ofd), rc);
1782
1783                 if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
1784                     !(oa->o_flags & OBD_FL_DELORPHAN)) {
1785                         ofd_grant_commit(ofd_obd(ofd)->obd_self_export, granted,
1786                                          rc);
1787                         granted = 0;
1788                 }
1789
1790                 ostid_set_id(&rep_oa->o_oi, ofd_seq_last_oid(oseq));
1791         }
1792         EXIT;
1793         ofd_counter_incr(exp, LPROC_OFD_STATS_CREATE,
1794                          tsi->tsi_jobid, 1);
1795 out:
1796         mutex_unlock(&oseq->os_create_lock);
1797 out_nolock:
1798         if (rc == 0) {
1799 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
1800                 struct ofd_thread_info  *info = ofd_info(tsi->tsi_env);
1801                 struct lu_fid           *fid = &info->fti_fid;
1802
1803                 /* For compatible purpose, it needs to convert back to
1804                  * OST ID before put it on wire. */
1805                 *fid = rep_oa->o_oi.oi_fid;
1806                 fid_to_ostid(fid, &rep_oa->o_oi);
1807 #endif
1808                 rep_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
1809         }
1810         ofd_seq_put(tsi->tsi_env, oseq);
1811
1812 out_sem:
1813         up_read(&ofd->ofd_lastid_rwsem);
1814         return rc;
1815 }
1816
1817 /**
1818  * OFD request handler for OST_DESTROY RPC.
1819  *
1820  * This is OFD-specific part of request handling. It destroys data objects
1821  * related to destroyed object on MDT.
1822  *
1823  * \param[in] tsi       target session environment for this request
1824  *
1825  * \retval              0 if successful
1826  * \retval              negative value on error
1827  */
1828 static int ofd_destroy_hdl(struct tgt_session_info *tsi)
1829 {
1830         const struct ost_body   *body = tsi->tsi_ost_body;
1831         struct ost_body         *repbody;
1832         struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
1833         struct ofd_thread_info  *fti = tsi2ofd_info(tsi);
1834         struct lu_fid           *fid = &fti->fti_fid;
1835         u64                      oid;
1836         u32                      count;
1837         int                      rc = 0;
1838
1839         ENTRY;
1840
1841         if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
1842                 RETURN(-EROFS);
1843
1844         /* This is old case for clients before Lustre 2.4 */
1845         /* If there's a DLM request, cancel the locks mentioned in it */
1846         if (req_capsule_field_present(tsi->tsi_pill, &RMF_DLM_REQ,
1847                                       RCL_CLIENT)) {
1848                 struct ldlm_request *dlm;
1849
1850                 dlm = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
1851                 if (dlm == NULL)
1852                         RETURN(-EFAULT);
1853                 ldlm_request_cancel(tgt_ses_req(tsi), dlm, 0, LATF_SKIP);
1854         }
1855
1856         *fid = body->oa.o_oi.oi_fid;
1857         oid = ostid_id(&body->oa.o_oi);
1858         LASSERT(oid != 0);
1859
1860         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
1861
1862         /* check that o_misc makes sense */
1863         if (body->oa.o_valid & OBD_MD_FLOBJCOUNT)
1864                 count = body->oa.o_misc;
1865         else
1866                 count = 1; /* default case - single destroy */
1867
1868         CDEBUG(D_HA, "%s: Destroy object "DOSTID" count %d\n", ofd_name(ofd),
1869                POSTID(&body->oa.o_oi), count);
1870
1871         while (count > 0) {
1872                 int lrc;
1873
1874                 lrc = ofd_destroy_by_fid(tsi->tsi_env, ofd, fid, 0);
1875                 if (lrc == -ENOENT) {
1876                         CDEBUG(D_INODE,
1877                                "%s: destroying non-existent object "DFID"\n",
1878                                ofd_name(ofd), PFID(fid));
1879                         /* rewrite rc with -ENOENT only if it is 0 */
1880                         if (rc == 0)
1881                                 rc = lrc;
1882                 } else if (lrc != 0) {
1883                         CERROR("%s: error destroying object "DFID": %d\n",
1884                                ofd_name(ofd), PFID(fid), lrc);
1885                         rc = lrc;
1886                 }
1887
1888                 count--;
1889                 oid++;
1890                 lrc = fid_set_id(fid, oid);
1891                 if (unlikely(lrc != 0 && count > 0))
1892                         GOTO(out, rc = lrc);
1893         }
1894
1895         ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_DESTROY,
1896                          tsi->tsi_jobid, 1);
1897
1898         GOTO(out, rc);
1899
1900 out:
1901         fid_to_ostid(fid, &repbody->oa.o_oi);
1902         return rc;
1903 }
1904
1905 /**
1906  * OFD request handler for OST_STATFS RPC.
1907  *
1908  * This function gets statfs data from storage as part of request
1909  * processing.
1910  *
1911  * \param[in] tsi       target session environment for this request
1912  *
1913  * \retval              0 if successful
1914  * \retval              negative value on error
1915  */
1916 static int ofd_statfs_hdl(struct tgt_session_info *tsi)
1917 {
1918         struct obd_statfs       *osfs;
1919         int                      rc;
1920
1921         ENTRY;
1922
1923         osfs = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_STATFS);
1924
1925         rc = ofd_statfs(tsi->tsi_env, tsi->tsi_exp, osfs,
1926                         cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS), 0);
1927         if (rc != 0)
1928                 CERROR("%s: statfs failed: rc = %d\n",
1929                        tgt_name(tsi->tsi_tgt), rc);
1930
1931         if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS))
1932                 rc = -EINPROGRESS;
1933
1934         ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_STATFS,
1935                          tsi->tsi_jobid, 1);
1936
1937         RETURN(rc);
1938 }
1939
1940 /**
1941  * OFD request handler for OST_SYNC RPC.
1942  *
1943  * Sync object data or all filesystem data to the disk and pack the
1944  * result in reply.
1945  *
1946  * \param[in] tsi       target session environment for this request
1947  *
1948  * \retval              0 if successful
1949  * \retval              negative value on error
1950  */
1951 static int ofd_sync_hdl(struct tgt_session_info *tsi)
1952 {
1953         struct ost_body         *body = tsi->tsi_ost_body;
1954         struct ost_body         *repbody;
1955         struct ofd_thread_info  *fti = tsi2ofd_info(tsi);
1956         struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
1957         struct ofd_object       *fo = NULL;
1958         int                      rc = 0;
1959
1960         ENTRY;
1961
1962         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
1963
1964         /* if no objid is specified, it means "sync whole filesystem" */
1965         if (!fid_is_zero(&tsi->tsi_fid)) {
1966                 fo = ofd_object_find_exists(tsi->tsi_env, ofd, &tsi->tsi_fid);
1967                 if (IS_ERR(fo))
1968                         RETURN(PTR_ERR(fo));
1969         }
1970
1971         rc = tgt_sync(tsi->tsi_env, tsi->tsi_tgt,
1972                       fo != NULL ? ofd_object_child(fo) : NULL,
1973                       repbody->oa.o_size, repbody->oa.o_blocks);
1974         if (rc)
1975                 GOTO(put, rc);
1976
1977         ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SYNC,
1978                          tsi->tsi_jobid, 1);
1979         if (fo == NULL)
1980                 RETURN(0);
1981
1982         repbody->oa.o_oi = body->oa.o_oi;
1983         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1984
1985         rc = ofd_attr_get(tsi->tsi_env, fo, &fti->fti_attr);
1986         if (rc == 0)
1987                 obdo_from_la(&repbody->oa, &fti->fti_attr,
1988                              OFD_VALID_FLAGS);
1989         else
1990                 /* don't return rc from getattr */
1991                 rc = 0;
1992         EXIT;
1993 put:
1994         if (fo != NULL)
1995                 ofd_object_put(tsi->tsi_env, fo);
1996         return rc;
1997 }
1998
1999 /**
2000  * OFD request handler for OST_PUNCH RPC.
2001  *
2002  * This is part of request processing. Validate request fields,
2003  * punch (truncate) the given OFD object and pack reply.
2004  *
2005  * \param[in] tsi       target session environment for this request
2006  *
2007  * \retval              0 if successful
2008  * \retval              negative value on error
2009  */
2010 static int ofd_punch_hdl(struct tgt_session_info *tsi)
2011 {
2012         const struct obdo       *oa = &tsi->tsi_ost_body->oa;
2013         struct ost_body         *repbody;
2014         struct ofd_thread_info  *info = tsi2ofd_info(tsi);
2015         struct ldlm_namespace   *ns = tsi->tsi_tgt->lut_obd->obd_namespace;
2016         struct ldlm_resource    *res;
2017         struct ofd_object       *fo;
2018         struct filter_fid       *ff = NULL;
2019         __u64                    flags = 0;
2020         struct lustre_handle     lh = { 0, };
2021         int                      rc;
2022         __u64                    start, end;
2023         bool                     srvlock;
2024
2025         ENTRY;
2026
2027         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
2028         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
2029
2030         if ((oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
2031             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
2032                 RETURN(err_serious(-EPROTO));
2033
2034         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
2035         if (repbody == NULL)
2036                 RETURN(err_serious(-ENOMEM));
2037
2038         /* punch start,end are passed in o_size,o_blocks throught wire */
2039         start = oa->o_size;
2040         end = oa->o_blocks;
2041
2042         if (end != OBD_OBJECT_EOF) /* Only truncate is supported */
2043                 RETURN(-EPROTO);
2044
2045         /* standard truncate optimization: if file body is completely
2046          * destroyed, don't send data back to the server. */
2047         if (start == 0)
2048                 flags |= LDLM_FL_AST_DISCARD_DATA;
2049
2050         repbody->oa.o_oi = oa->o_oi;
2051         repbody->oa.o_valid = OBD_MD_FLID;
2052
2053         srvlock = oa->o_valid & OBD_MD_FLFLAGS &&
2054                   oa->o_flags & OBD_FL_SRVLOCK;
2055
2056         if (srvlock) {
2057                 rc = tgt_extent_lock(ns, &tsi->tsi_resid, start, end, &lh,
2058                                      LCK_PW, &flags);
2059                 if (rc != 0)
2060                         RETURN(rc);
2061         }
2062
2063         CDEBUG(D_INODE, "calling punch for object "DFID", valid = "LPX64
2064                ", start = "LPD64", end = "LPD64"\n", PFID(&tsi->tsi_fid),
2065                oa->o_valid, start, end);
2066
2067         fo = ofd_object_find_exists(tsi->tsi_env, ofd_exp(tsi->tsi_exp),
2068                                     &tsi->tsi_fid);
2069         if (IS_ERR(fo))
2070                 GOTO(out, rc = PTR_ERR(fo));
2071
2072         la_from_obdo(&info->fti_attr, oa,
2073                      OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME);
2074         info->fti_attr.la_size = start;
2075         info->fti_attr.la_valid |= LA_SIZE;
2076
2077         if (oa->o_valid & OBD_MD_FLFID) {
2078                 ff = &info->fti_mds_fid;
2079                 ofd_prepare_fidea(ff, oa);
2080         }
2081
2082         rc = ofd_object_punch(tsi->tsi_env, fo, start, end, &info->fti_attr,
2083                               ff, (struct obdo *)oa);
2084         if (rc)
2085                 GOTO(out_put, rc);
2086
2087         ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_PUNCH,
2088                          tsi->tsi_jobid, 1);
2089         EXIT;
2090 out_put:
2091         ofd_object_put(tsi->tsi_env, fo);
2092 out:
2093         if (srvlock)
2094                 tgt_extent_unlock(&lh, LCK_PW);
2095         if (rc == 0) {
2096                 /* we do not call this before to avoid lu_object_find() in
2097                  *  ->lvbo_update() holding another reference on the object.
2098                  * otherwise concurrent destroy can make the object unavailable
2099                  * for 2nd lu_object_find() waiting for the first reference
2100                  * to go... deadlock! */
2101                 res = ldlm_resource_get(ns, NULL, &tsi->tsi_resid,
2102                                         LDLM_EXTENT, 0);
2103                 if (!IS_ERR(res)) {
2104                         ldlm_res_lvbo_update(res, NULL, 0);
2105                         ldlm_resource_putref(res);
2106                 }
2107         }
2108         return rc;
2109 }
2110
2111 /**
2112  * OFD request handler for OST_QUOTACTL RPC.
2113  *
2114  * This is part of request processing to validate incoming request fields,
2115  * get the requested data from OSD and pack reply.
2116  *
2117  * \param[in] tsi       target session environment for this request
2118  *
2119  * \retval              0 if successful
2120  * \retval              negative value on error
2121  */
2122 static int ofd_quotactl(struct tgt_session_info *tsi)
2123 {
2124         struct obd_quotactl     *oqctl, *repoqc;
2125         struct lu_nodemap       *nodemap =
2126                 tsi->tsi_exp->exp_target_data.ted_nodemap;
2127         int                      id;
2128         int                      rc;
2129
2130         ENTRY;
2131
2132         oqctl = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_QUOTACTL);
2133         if (oqctl == NULL)
2134                 RETURN(err_serious(-EPROTO));
2135
2136         repoqc = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_QUOTACTL);
2137         if (repoqc == NULL)
2138                 RETURN(err_serious(-ENOMEM));
2139
2140         *repoqc = *oqctl;
2141
2142         id = repoqc->qc_id;
2143         if (oqctl->qc_type == USRQUOTA)
2144                 id = nodemap_map_id(nodemap, NODEMAP_UID,
2145                                     NODEMAP_CLIENT_TO_FS,
2146                                     repoqc->qc_id);
2147         else if (oqctl->qc_type == GRPQUOTA)
2148                 id = nodemap_map_id(nodemap, NODEMAP_GID,
2149                                     NODEMAP_CLIENT_TO_FS,
2150                                     repoqc->qc_id);
2151
2152         if (repoqc->qc_id != id)
2153                 swap(repoqc->qc_id, id);
2154
2155         rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, repoqc);
2156
2157         ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_QUOTACTL,
2158                          tsi->tsi_jobid, 1);
2159
2160         if (repoqc->qc_id != id)
2161                 swap(repoqc->qc_id, id);
2162
2163         RETURN(rc);
2164 }
2165
2166 /**
2167  * Calculate the amount of time for lock prolongation.
2168  *
2169  * This is helper for ofd_prolong_extent_locks() function to get
2170  * the timeout extra time.
2171  *
2172  * \param[in] req       current request
2173  *
2174  * \retval              amount of time to extend the timeout with
2175  */
2176 static inline int prolong_timeout(struct ptlrpc_request *req,
2177                                   struct ldlm_lock *lock)
2178 {
2179         struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
2180
2181         if (AT_OFF)
2182                 return obd_timeout / 2;
2183
2184         /* We are in the middle of the process - BL AST is sent, CANCEL
2185           is ahead. Take half of AT + IO process time. */
2186         return at_est2timeout(at_get(&svcpt->scp_at_estimate)) +
2187                 (ldlm_bl_timeout(lock) >> 1);
2188 }
2189
2190 /**
2191  * Prolong single lock timeout.
2192  *
2193  * This is supplemental function to the ofd_prolong_locks(). It prolongs
2194  * a single lock.
2195  *
2196  * \param[in] tsi       target session environment for this request
2197  * \param[in] lock      LDLM lock to prolong
2198  * \param[in] extent    related extent
2199  * \param[in] timeout   timeout value to add
2200  *
2201  * \retval              0 if lock is not suitable for prolongation
2202  * \retval              1 if lock was prolonged successfully
2203  */
2204 static int ofd_prolong_one_lock(struct tgt_session_info *tsi,
2205                                 struct ldlm_lock *lock,
2206                                 struct ldlm_extent *extent)
2207 {
2208         int timeout = prolong_timeout(tgt_ses_req(tsi), lock);
2209
2210         if (lock->l_flags & LDLM_FL_DESTROYED) /* lock already cancelled */
2211                 return 0;
2212
2213         /* XXX: never try to grab resource lock here because we're inside
2214          * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
2215          * res lock and then exp_bl_list_lock. */
2216
2217         if (!(lock->l_flags & LDLM_FL_AST_SENT))
2218                 /* ignore locks not being cancelled */
2219                 return 0;
2220
2221         LDLM_DEBUG(lock, "refreshed for req x"LPU64" ext("LPU64"->"LPU64") "
2222                          "to %ds.\n", tgt_ses_req(tsi)->rq_xid, extent->start,
2223                          extent->end, timeout);
2224
2225         /* OK. this is a possible lock the user holds doing I/O
2226          * let's refresh eviction timer for it */
2227         ldlm_refresh_waiting_lock(lock, timeout);
2228         return 1;
2229 }
2230
2231 /**
2232  * Prolong lock timeout for the given extent.
2233  *
2234  * This function finds all locks related with incoming request and
2235  * prolongs their timeout.
2236  *
2237  * If a client is holding a lock for a long time while it sends
2238  * read or write RPCs to the OST for the object under this lock,
2239  * then we don't want the OST to evict the client. Otherwise,
2240  * if the network or disk is very busy then the client may not
2241  * be able to make any progress to clear out dirty pages under
2242  * the lock and the application will fail.
2243  *
2244  * Every time a Bulk Read/Write (BRW) request arrives for the object
2245  * covered by the lock, extend the timeout on that lock. The RPC should
2246  * contain a lock handle for the lock it is using, but this
2247  * isn't handled correctly by all client versions, and the
2248  * request may cover multiple locks.
2249  *
2250  * \param[in] tsi       target session environment for this request
2251  * \param[in] start     start of extent
2252  * \param[in] end       end of extent
2253  *
2254  * \retval              number of prolonged locks
2255  */
2256 static int ofd_prolong_extent_locks(struct tgt_session_info *tsi,
2257                                     __u64 start, __u64 end)
2258 {
2259         struct obd_export       *exp = tsi->tsi_exp;
2260         struct obdo             *oa  = &tsi->tsi_ost_body->oa;
2261         struct ldlm_extent       extent = {
2262                 .start = start,
2263                 .end = end
2264         };
2265         struct ldlm_lock        *lock;
2266         int                      lock_count = 0;
2267
2268         ENTRY;
2269
2270         if (oa->o_valid & OBD_MD_FLHANDLE) {
2271                 /* mostly a request should be covered by only one lock, try
2272                  * fast path. */
2273                 lock = ldlm_handle2lock(&oa->o_handle);
2274                 if (lock != NULL) {
2275                         /* Fast path to check if the lock covers the whole IO
2276                          * region exclusively. */
2277                         if (lock->l_granted_mode == LCK_PW &&
2278                             ldlm_extent_contain(&lock->l_policy_data.l_extent,
2279                                                 &extent)) {
2280                                 /* bingo */
2281                                 LASSERT(lock->l_export == exp);
2282                                 lock_count = ofd_prolong_one_lock(tsi, lock,
2283                                                                   &extent);
2284                                 LDLM_LOCK_PUT(lock);
2285                                 RETURN(lock_count);
2286                         }
2287                         lock->l_last_used = cfs_time_current();
2288                         LDLM_LOCK_PUT(lock);
2289                 }
2290         }
2291
2292         spin_lock_bh(&exp->exp_bl_list_lock);
2293         list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
2294                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
2295                 LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
2296
2297                 /* ignore waiting locks, no more granted locks in the list */
2298                 if (lock->l_granted_mode != lock->l_req_mode)
2299                         break;
2300
2301                 if (!ldlm_res_eq(&tsi->tsi_resid, &lock->l_resource->lr_name))
2302                         continue;
2303
2304                 if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
2305                                          &extent))
2306                         continue;
2307
2308                 lock_count += ofd_prolong_one_lock(tsi, lock, &extent);
2309         }
2310         spin_unlock_bh(&exp->exp_bl_list_lock);
2311
2312         RETURN(lock_count);
2313 }
2314
2315 /**
2316  * Implementation of ptlrpc_hpreq_ops::hpreq_lock_match for OFD RW requests.
2317  *
2318  * Determine if \a lock and the lock from request \a req are equivalent
2319  * by comparing their resource names, modes, and extents.
2320  *
2321  * It is used to give priority to read and write RPCs being done
2322  * under this lock so that the client can drop the contended
2323  * lock more quickly and let other clients use it. This improves
2324  * overall performance in the case where the first client gets a
2325  * very large lock extent that prevents other clients from
2326  * submitting their writes.
2327  *
2328  * \param[in] req       ptlrpc_request being processed
2329  * \param[in] lock      contended lock to match
2330  *
2331  * \retval              1 if lock is matched
2332  * \retval              0 otherwise
2333  */
2334 static int ofd_rw_hpreq_lock_match(struct ptlrpc_request *req,
2335                                    struct ldlm_lock *lock)
2336 {
2337         struct niobuf_remote *rnb;
2338         struct obd_ioobj *ioo;
2339         enum ldlm_mode  mode;
2340         struct ldlm_extent ext;
2341         __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
2342
2343         ENTRY;
2344
2345         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
2346         LASSERT(ioo != NULL);
2347
2348         rnb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
2349         LASSERT(rnb != NULL);
2350
2351         ext.start = rnb->rnb_offset;
2352         rnb += ioo->ioo_bufcnt - 1;
2353         ext.end = rnb->rnb_offset + rnb->rnb_len - 1;
2354
2355         LASSERT(lock->l_resource != NULL);
2356         if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
2357                 RETURN(0);
2358
2359         /* a bulk write can only hold a reference on a PW extent lock */
2360         mode = LCK_PW;
2361         if (opc == OST_READ)
2362                 /* whereas a bulk read can be protected by either a PR or PW
2363                  * extent lock */
2364                 mode |= LCK_PR;
2365
2366         if (!(lock->l_granted_mode & mode))
2367                 RETURN(0);
2368
2369         RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
2370 }
2371
2372 /**
2373  * Implementation of ptlrpc_hpreq_ops::hpreq_lock_check for OFD RW requests.
2374  *
2375  * Check for whether the given PTLRPC request (\a req) is blocking
2376  * an LDLM lock cancel.
2377  *
2378  * \param[in] req       the incoming request
2379  *
2380  * \retval              1 if \a req is blocking an LDLM lock cancel
2381  * \retval              0 if it is not
2382  */
2383 static int ofd_rw_hpreq_check(struct ptlrpc_request *req)
2384 {
2385         struct tgt_session_info *tsi;
2386         struct obd_ioobj        *ioo;
2387         struct niobuf_remote    *rnb;
2388         __u64                    start, end;
2389         int                      lock_count;
2390
2391         ENTRY;
2392
2393         /* Don't use tgt_ses_info() to get session info, because lock_match()
2394          * can be called while request has no processing thread yet. */
2395         tsi = lu_context_key_get(&req->rq_session, &tgt_session_key);
2396
2397         /*
2398          * Use LASSERT below because malformed RPCs should have
2399          * been filtered out in tgt_hpreq_handler().
2400          */
2401         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
2402         LASSERT(ioo != NULL);
2403
2404         rnb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
2405         LASSERT(rnb != NULL);
2406         LASSERT(!(rnb->rnb_flags & OBD_BRW_SRVLOCK));
2407
2408         start = rnb->rnb_offset;
2409         rnb += ioo->ioo_bufcnt - 1;
2410         end = rnb->rnb_offset + rnb->rnb_len - 1;
2411
2412         DEBUG_REQ(D_RPCTRACE, req, "%s %s: refresh rw locks: "DFID
2413                                    " ("LPU64"->"LPU64")\n",
2414                   tgt_name(tsi->tsi_tgt), current->comm,
2415                   PFID(&tsi->tsi_fid), start, end);
2416
2417         lock_count = ofd_prolong_extent_locks(tsi, start, end);
2418
2419         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
2420                tgt_name(tsi->tsi_tgt), lock_count, req);
2421
2422         RETURN(lock_count > 0);
2423 }
2424
2425 /**
2426  * Implementation of ptlrpc_hpreq_ops::hpreq_lock_fini for OFD RW requests.
2427  *
2428  * Called after the request has been handled. It refreshes lock timeout again
2429  * so that client has more time to send lock cancel RPC.
2430  *
2431  * \param[in] req       request which is being processed.
2432  */
2433 static void ofd_rw_hpreq_fini(struct ptlrpc_request *req)
2434 {
2435         ofd_rw_hpreq_check(req);
2436 }
2437
2438 /**
2439  * Implementation of ptlrpc_hpreq_ops::hpreq_lock_match for OST_PUNCH request.
2440  *
2441  * This function checks if the given lock is the same by its resname, mode
2442  * and extent as one taken from the request.
2443  * It is used to give priority to punch/truncate RPCs that might lead to
2444  * the fastest release of that lock when a lock is contended.
2445  *
2446  * \param[in] req       ptlrpc_request being processed
2447  * \param[in] lock      contended lock to match
2448  *
2449  * \retval              1 if lock is matched
2450  * \retval              0 otherwise
2451  */
2452 static int ofd_punch_hpreq_lock_match(struct ptlrpc_request *req,
2453                                       struct ldlm_lock *lock)
2454 {
2455         struct tgt_session_info *tsi;
2456
2457         /* Don't use tgt_ses_info() to get session info, because lock_match()
2458          * can be called while request has no processing thread yet. */
2459         tsi = lu_context_key_get(&req->rq_session, &tgt_session_key);
2460
2461         /*
2462          * Use LASSERT below because malformed RPCs should have
2463          * been filtered out in tgt_hpreq_handler().
2464          */
2465         LASSERT(tsi->tsi_ost_body != NULL);
2466         if (tsi->tsi_ost_body->oa.o_valid & OBD_MD_FLHANDLE &&
2467             tsi->tsi_ost_body->oa.o_handle.cookie == lock->l_handle.h_cookie)
2468                 return 1;
2469
2470         return 0;
2471 }
2472
2473 /**
2474  * Implementation of ptlrpc_hpreq_ops::hpreq_lock_check for OST_PUNCH request.
2475  *
2476  * High-priority queue request check for whether the given punch request
2477  * (\a req) is blocking an LDLM lock cancel.
2478  *
2479  * \param[in] req       the incoming request
2480  *
2481  * \retval              1 if \a req is blocking an LDLM lock cancel
2482  * \retval              0 if it is not
2483  */
2484 static int ofd_punch_hpreq_check(struct ptlrpc_request *req)
2485 {
2486         struct tgt_session_info *tsi;
2487         struct obdo             *oa;
2488         int                      lock_count;
2489
2490         ENTRY;
2491
2492         /* Don't use tgt_ses_info() to get session info, because lock_match()
2493          * can be called while request has no processing thread yet. */
2494         tsi = lu_context_key_get(&req->rq_session, &tgt_session_key);
2495         LASSERT(tsi != NULL);
2496         oa = &tsi->tsi_ost_body->oa;
2497
2498         LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS &&
2499                   oa->o_flags & OBD_FL_SRVLOCK));
2500
2501         CDEBUG(D_DLMTRACE,
2502                "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
2503                tgt_name(tsi->tsi_tgt), tsi->tsi_resid.name[0],
2504                tsi->tsi_resid.name[1], oa->o_size, oa->o_blocks);
2505
2506         lock_count = ofd_prolong_extent_locks(tsi, oa->o_size, oa->o_blocks);
2507
2508         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
2509                tgt_name(tsi->tsi_tgt), lock_count, req);
2510
2511         RETURN(lock_count > 0);
2512 }
2513
2514 /**
2515  * Implementation of ptlrpc_hpreq_ops::hpreq_lock_fini for OST_PUNCH request.
2516  *
2517  * Called after the request has been handled. It refreshes lock timeout again
2518  * so that client has more time to send lock cancel RPC.
2519  *
2520  * \param[in] req       request which is being processed.
2521  */
2522 static void ofd_punch_hpreq_fini(struct ptlrpc_request *req)
2523 {
2524         ofd_punch_hpreq_check(req);
2525 }
2526
2527 static struct ptlrpc_hpreq_ops ofd_hpreq_rw = {
2528         .hpreq_lock_match       = ofd_rw_hpreq_lock_match,
2529         .hpreq_check            = ofd_rw_hpreq_check,
2530         .hpreq_fini             = ofd_rw_hpreq_fini
2531 };
2532
2533 static struct ptlrpc_hpreq_ops ofd_hpreq_punch = {
2534         .hpreq_lock_match       = ofd_punch_hpreq_lock_match,
2535         .hpreq_check            = ofd_punch_hpreq_check,
2536         .hpreq_fini             = ofd_punch_hpreq_fini
2537 };
2538
2539 /**
2540  * Assign high priority operations to an IO request.
2541  *
2542  * Check if the incoming request is a candidate for
2543  * high-priority processing. If it is, assign it a high
2544  * priority operations table.
2545  *
2546  * \param[in] tsi       target session environment for this request
2547  */
2548 static void ofd_hp_brw(struct tgt_session_info *tsi)
2549 {
2550         struct niobuf_remote    *rnb;
2551         struct obd_ioobj        *ioo;
2552
2553         ENTRY;
2554
2555         ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
2556         LASSERT(ioo != NULL); /* must exist after request preprocessing */
2557         if (ioo->ioo_bufcnt > 0) {
2558                 rnb = req_capsule_client_get(tsi->tsi_pill, &RMF_NIOBUF_REMOTE);
2559                 LASSERT(rnb != NULL); /* must exist after request preprocessing */
2560
2561                 /* no high priority if server lock is needed */
2562                 if (rnb->rnb_flags & OBD_BRW_SRVLOCK)
2563                         return;
2564         }
2565         tgt_ses_req(tsi)->rq_ops = &ofd_hpreq_rw;
2566 }
2567
2568 /**
2569  * Assign high priority operations to an punch request.
2570  *
2571  * Check if the incoming request is a candidate for
2572  * high-priority processing. If it is, assign it a high
2573  * priority operations table.
2574  *
2575  * \param[in] tsi       target session environment for this request
2576  */
2577 static void ofd_hp_punch(struct tgt_session_info *tsi)
2578 {
2579         LASSERT(tsi->tsi_ost_body != NULL); /* must exists if we are here */
2580         /* no high-priority if server lock is needed */
2581         if (tsi->tsi_ost_body->oa.o_valid & OBD_MD_FLFLAGS &&
2582             tsi->tsi_ost_body->oa.o_flags & OBD_FL_SRVLOCK)
2583                 return;
2584         tgt_ses_req(tsi)->rq_ops = &ofd_hpreq_punch;
2585 }
2586
2587 #define OBD_FAIL_OST_READ_NET   OBD_FAIL_OST_BRW_NET
2588 #define OBD_FAIL_OST_WRITE_NET  OBD_FAIL_OST_BRW_NET
2589 #define OST_BRW_READ    OST_READ
2590 #define OST_BRW_WRITE   OST_WRITE
2591
2592 /**
2593  * Table of OFD-specific request handlers
2594  *
2595  * This table contains all opcodes accepted by OFD and
2596  * specifies handlers for them. The tgt_request_handler()
2597  * uses such table from each target to process incoming
2598  * requests.
2599  */
2600 static struct tgt_handler ofd_tgt_handlers[] = {
2601 TGT_RPC_HANDLER(OST_FIRST_OPC,
2602                 0,                      OST_CONNECT,    tgt_connect,
2603                 &RQF_CONNECT, LUSTRE_OBD_VERSION),
2604 TGT_RPC_HANDLER(OST_FIRST_OPC,
2605                 0,                      OST_DISCONNECT, tgt_disconnect,
2606                 &RQF_OST_DISCONNECT, LUSTRE_OBD_VERSION),
2607 TGT_RPC_HANDLER(OST_FIRST_OPC,
2608                 0,                      OST_SET_INFO,   ofd_set_info_hdl,
2609                 &RQF_OBD_SET_INFO, LUSTRE_OST_VERSION),
2610 TGT_OST_HDL(0,                          OST_GET_INFO,   ofd_get_info_hdl),
2611 TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO, OST_GETATTR,    ofd_getattr_hdl),
2612 TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR,
2613                                         OST_SETATTR,    ofd_setattr_hdl),
2614 TGT_OST_HDL(0           | HABEO_REFERO | MUTABOR,
2615                                         OST_CREATE,     ofd_create_hdl),
2616 TGT_OST_HDL(0           | HABEO_REFERO | MUTABOR,
2617                                         OST_DESTROY,    ofd_destroy_hdl),
2618 TGT_OST_HDL(0           | HABEO_REFERO, OST_STATFS,     ofd_statfs_hdl),
2619 TGT_OST_HDL_HP(HABEO_CORPUS| HABEO_REFERO,
2620                                         OST_BRW_READ,   tgt_brw_read,
2621                                                         ofd_hp_brw),
2622 /* don't set CORPUS flag for brw_write because -ENOENT may be valid case */
2623 TGT_OST_HDL_HP(HABEO_CORPUS| MUTABOR,   OST_BRW_WRITE,  tgt_brw_write,
2624                                                         ofd_hp_brw),
2625 TGT_OST_HDL_HP(HABEO_CORPUS| HABEO_REFERO | MUTABOR,
2626                                         OST_PUNCH,      ofd_punch_hdl,
2627                                                         ofd_hp_punch),
2628 TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO, OST_SYNC,       ofd_sync_hdl),
2629 TGT_OST_HDL(0           | HABEO_REFERO, OST_QUOTACTL,   ofd_quotactl),
2630 };
2631
2632 static struct tgt_opc_slice ofd_common_slice[] = {
2633         {
2634                 .tos_opc_start  = OST_FIRST_OPC,
2635                 .tos_opc_end    = OST_LAST_OPC,
2636                 .tos_hs         = ofd_tgt_handlers
2637         },
2638         {
2639                 .tos_opc_start  = OBD_FIRST_OPC,
2640                 .tos_opc_end    = OBD_LAST_OPC,
2641                 .tos_hs         = tgt_obd_handlers
2642         },
2643         {
2644                 .tos_opc_start  = LDLM_FIRST_OPC,
2645                 .tos_opc_end    = LDLM_LAST_OPC,
2646                 .tos_hs         = tgt_dlm_handlers
2647         },
2648         {
2649                 .tos_opc_start  = OUT_UPDATE_FIRST_OPC,
2650                 .tos_opc_end    = OUT_UPDATE_LAST_OPC,
2651                 .tos_hs         = tgt_out_handlers
2652         },
2653         {
2654                 .tos_opc_start  = SEQ_FIRST_OPC,
2655                 .tos_opc_end    = SEQ_LAST_OPC,
2656                 .tos_hs         = seq_handlers
2657         },
2658         {
2659                 .tos_opc_start  = LFSCK_FIRST_OPC,
2660                 .tos_opc_end    = LFSCK_LAST_OPC,
2661                 .tos_hs         = tgt_lfsck_handlers
2662         },
2663         {
2664                 .tos_opc_start  = SEC_FIRST_OPC,
2665                 .tos_opc_end    = SEC_LAST_OPC,
2666                 .tos_hs         = tgt_sec_ctx_handlers
2667         },
2668         {
2669                 .tos_hs         = NULL
2670         }
2671 };
2672
2673 /* context key constructor/destructor: ofd_key_init(), ofd_key_fini() */
2674 LU_KEY_INIT_FINI(ofd, struct ofd_thread_info);
2675
2676 /**
2677  * Implementation of lu_context_key::lct_key_exit.
2678  *
2679  * Optional method called on lu_context_exit() for all allocated
2680  * keys.
2681  * It is used in OFD to sanitize context values which may be re-used
2682  * during another request processing by the same thread.
2683  *
2684  * \param[in] ctx       execution context
2685  * \param[in] key       context key
2686  * \param[in] data      ofd_thread_info
2687  */
2688 static void ofd_key_exit(const struct lu_context *ctx,
2689                          struct lu_context_key *key, void *data)
2690 {
2691         struct ofd_thread_info *info = data;
2692
2693         info->fti_env = NULL;
2694         info->fti_exp = NULL;
2695
2696         info->fti_xid = 0;
2697         info->fti_pre_version = 0;
2698         info->fti_used = 0;
2699
2700         memset(&info->fti_attr, 0, sizeof info->fti_attr);
2701 }
2702
2703 struct lu_context_key ofd_thread_key = {
2704         .lct_tags = LCT_DT_THREAD,
2705         .lct_init = ofd_key_init,
2706         .lct_fini = ofd_key_fini,
2707         .lct_exit = ofd_key_exit
2708 };
2709
2710 /**
2711  * Initialize OFD device according to parameters in the config log \a cfg.
2712  *
2713  * This is the main starting point of OFD initialization. It fills all OFD
2714  * parameters with their initial values and calls other initializing functions
2715  * to set up all OFD subsystems.
2716  *
2717  * \param[in] env       execution environment
2718  * \param[in] m         OFD device
2719  * \param[in] ldt       LU device type of OFD
2720  * \param[in] cfg       configuration log
2721  *
2722  * \retval              0 if successful
2723  * \retval              negative value on error
2724  */
2725 static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
2726                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
2727 {
2728         const char              *dev = lustre_cfg_string(cfg, 0);
2729         struct ofd_thread_info  *info = NULL;
2730         struct obd_device       *obd;
2731         struct obd_statfs       *osfs;
2732         int                      rc;
2733
2734         ENTRY;
2735
2736         obd = class_name2obd(dev);
2737         if (obd == NULL) {
2738                 CERROR("Cannot find obd with name %s\n", dev);
2739                 RETURN(-ENODEV);
2740         }
2741
2742         rc = lu_env_refill((struct lu_env *)env);
2743         if (rc != 0)
2744                 RETURN(rc);
2745
2746         obd->u.obt.obt_magic = OBT_MAGIC;
2747
2748         m->ofd_fmd_max_num = OFD_FMD_MAX_NUM_DEFAULT;
2749         m->ofd_fmd_max_age = OFD_FMD_MAX_AGE_DEFAULT;
2750
2751         spin_lock_init(&m->ofd_flags_lock);
2752         m->ofd_raid_degraded = 0;
2753         m->ofd_syncjournal = 0;
2754         ofd_slc_set(m);
2755         m->ofd_grant_compat_disable = 0;
2756         m->ofd_soft_sync_limit = OFD_SOFT_SYNC_LIMIT_DEFAULT;
2757
2758         /* statfs data */
2759         spin_lock_init(&m->ofd_osfs_lock);
2760         m->ofd_osfs_age = cfs_time_shift_64(-1000);
2761         m->ofd_osfs_unstable = 0;
2762         m->ofd_statfs_inflight = 0;
2763         m->ofd_osfs_inflight = 0;
2764
2765         /* grant data */
2766         spin_lock_init(&m->ofd_grant_lock);
2767         m->ofd_tot_dirty = 0;
2768         m->ofd_tot_granted = 0;
2769         m->ofd_tot_pending = 0;
2770         m->ofd_seq_count = 0;
2771         init_waitqueue_head(&m->ofd_inconsistency_thread.t_ctl_waitq);
2772         INIT_LIST_HEAD(&m->ofd_inconsistency_list);
2773         spin_lock_init(&m->ofd_inconsistency_lock);
2774
2775         spin_lock_init(&m->ofd_batch_lock);
2776         init_rwsem(&m->ofd_lastid_rwsem);
2777
2778         m->ofd_dt_dev.dd_lu_dev.ld_ops = &ofd_lu_ops;
2779         m->ofd_dt_dev.dd_lu_dev.ld_obd = obd;
2780         /* set this lu_device to obd, because error handling need it */
2781         obd->obd_lu_dev = &m->ofd_dt_dev.dd_lu_dev;
2782
2783         rc = ofd_procfs_init(m);
2784         if (rc) {
2785                 CERROR("Can't init ofd lprocfs, rc %d\n", rc);
2786                 RETURN(rc);
2787         }
2788
2789         /* No connection accepted until configurations will finish */
2790         spin_lock(&obd->obd_dev_lock);
2791         obd->obd_no_conn = 1;
2792         spin_unlock(&obd->obd_dev_lock);
2793         obd->obd_replayable = 1;
2794         if (cfg->lcfg_bufcount > 4 && LUSTRE_CFG_BUFLEN(cfg, 4) > 0) {
2795                 char *str = lustre_cfg_string(cfg, 4);
2796
2797                 if (strchr(str, 'n')) {
2798                         CWARN("%s: recovery disabled\n", obd->obd_name);
2799                         obd->obd_replayable = 0;
2800                 }
2801         }
2802
2803         info = ofd_info_init(env, NULL);
2804         if (info == NULL)
2805                 GOTO(err_fini_proc, rc = -EFAULT);
2806
2807         rc = ofd_stack_init(env, m, cfg);
2808         if (rc) {
2809                 CERROR("Can't init device stack, rc %d\n", rc);
2810                 GOTO(err_fini_proc, rc);
2811         }
2812
2813         ofd_procfs_add_brw_stats_symlink(m);
2814
2815         /* populate cached statfs data */
2816         osfs = &ofd_info(env)->fti_u.osfs;
2817         rc = ofd_statfs_internal(env, m, osfs, 0, NULL);
2818         if (rc != 0) {
2819                 CERROR("%s: can't get statfs data, rc %d\n", obd->obd_name, rc);
2820                 GOTO(err_fini_stack, rc);
2821         }
2822         if (!IS_PO2(osfs->os_bsize)) {
2823                 CERROR("%s: blocksize (%d) is not a power of 2\n",
2824                                 obd->obd_name, osfs->os_bsize);
2825                 GOTO(err_fini_stack, rc = -EPROTO);
2826         }
2827         m->ofd_blockbits = fls(osfs->os_bsize) - 1;
2828
2829         m->ofd_precreate_batch = OFD_PRECREATE_BATCH_DEFAULT;
2830         if (osfs->os_bsize * osfs->os_blocks < OFD_PRECREATE_SMALL_FS)
2831                 m->ofd_precreate_batch = OFD_PRECREATE_BATCH_SMALL;
2832
2833         snprintf(info->fti_u.name, sizeof(info->fti_u.name), "%s-%s",
2834                  "filter"/*LUSTRE_OST_NAME*/, obd->obd_uuid.uuid);
2835         m->ofd_namespace = ldlm_namespace_new(obd, info->fti_u.name,
2836                                               LDLM_NAMESPACE_SERVER,
2837                                               LDLM_NAMESPACE_GREEDY,
2838                                               LDLM_NS_TYPE_OST);
2839         if (m->ofd_namespace == NULL)
2840                 GOTO(err_fini_stack, rc = -ENOMEM);
2841         /* set obd_namespace for compatibility with old code */
2842         obd->obd_namespace = m->ofd_namespace;
2843         ldlm_register_intent(m->ofd_namespace, ofd_intent_policy);
2844         m->ofd_namespace->ns_lvbo = &ofd_lvbo;
2845         m->ofd_namespace->ns_lvbp = m;
2846
2847         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2848                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
2849
2850         dt_conf_get(env, m->ofd_osd, &m->ofd_dt_conf);
2851
2852         /* Allow at most ddp_grant_reserved% of the available filesystem space
2853          * to be granted to clients, so that any errors in the grant overhead
2854          * calculations do not allow granting more space to clients than can be
2855          * written. Assumes that in aggregate the grant overhead calculations do
2856          * not have more than ddp_grant_reserved% estimation error in them. */
2857         m->ofd_grant_ratio =
2858                 ofd_grant_ratio_conv(m->ofd_dt_conf.ddp_grant_reserved);
2859
2860         rc = tgt_init(env, &m->ofd_lut, obd, m->ofd_osd, ofd_common_slice,
2861                       OBD_FAIL_OST_ALL_REQUEST_NET,
2862                       OBD_FAIL_OST_ALL_REPLY_NET);
2863         if (rc)
2864                 GOTO(err_free_ns, rc);
2865
2866         rc = ofd_fs_setup(env, m, obd);
2867         if (rc)
2868                 GOTO(err_fini_lut, rc);
2869
2870         rc = ofd_start_inconsistency_verification_thread(m);
2871         if (rc != 0)
2872                 GOTO(err_fini_fs, rc);
2873
2874         tgt_adapt_sptlrpc_conf(&m->ofd_lut, 1);
2875
2876         RETURN(0);
2877
2878 err_fini_fs:
2879         ofd_fs_cleanup(env, m);
2880 err_fini_lut:
2881         tgt_fini(env, &m->ofd_lut);
2882 err_free_ns:
2883         ldlm_namespace_free(m->ofd_namespace, NULL, obd->obd_force);
2884         obd->obd_namespace = m->ofd_namespace = NULL;
2885 err_fini_stack:
2886         ofd_stack_fini(env, m, &m->ofd_osd->dd_lu_dev);
2887 err_fini_proc:
2888         ofd_procfs_fini(m);
2889         return rc;
2890 }
2891
2892 /**
2893  * Stop the OFD device
2894  *
2895  * This function stops the OFD device and all its subsystems.
2896  * This is the end of OFD lifecycle.
2897  *
2898  * \param[in] env       execution environment
2899  * \param[in] m         OFD device
2900  */
2901 static void ofd_fini(const struct lu_env *env, struct ofd_device *m)
2902 {
2903         struct obd_device       *obd = ofd_obd(m);
2904         struct lu_device        *d   = &m->ofd_dt_dev.dd_lu_dev;
2905         struct lfsck_stop        stop;
2906
2907         stop.ls_status = LS_PAUSED;
2908         stop.ls_flags = 0;
2909         lfsck_stop(env, m->ofd_osd, &stop);
2910         target_recovery_fini(obd);
2911         if (m->ofd_namespace != NULL)
2912                 ldlm_namespace_free_prior(m->ofd_namespace, NULL,
2913                                           d->ld_obd->obd_force);
2914
2915         obd_exports_barrier(obd);
2916         obd_zombie_barrier();
2917
2918         tgt_fini(env, &m->ofd_lut);
2919         ofd_stop_inconsistency_verification_thread(m);
2920         lfsck_degister(env, m->ofd_osd);
2921         ofd_fs_cleanup(env, m);
2922
2923         if (m->ofd_namespace != NULL) {
2924                 ldlm_namespace_free_post(m->ofd_namespace);
2925                 d->ld_obd->obd_namespace = m->ofd_namespace = NULL;
2926         }
2927
2928         ofd_stack_fini(env, m, &m->ofd_dt_dev.dd_lu_dev);
2929         ofd_procfs_fini(m);
2930         LASSERT(atomic_read(&d->ld_ref) == 0);
2931         server_put_mount(obd->obd_name, true);
2932         EXIT;
2933 }
2934
2935 /**
2936  * Implementation of lu_device_type_operations::ldto_device_fini.
2937  *
2938  * Finalize device. Dual to ofd_device_init(). It is called from
2939  * obd_precleanup() and stops the current device.
2940  *
2941  * \param[in] env       execution environment
2942  * \param[in] d         LU device of OFD
2943  *
2944  * \retval              NULL
2945  */
2946 static struct lu_device *ofd_device_fini(const struct lu_env *env,
2947                                          struct lu_device *d)
2948 {
2949         ENTRY;
2950         ofd_fini(env, ofd_dev(d));
2951         RETURN(NULL);
2952 }
2953
2954 /**
2955  * Implementation of lu_device_type_operations::ldto_device_free.
2956  *
2957  * Free OFD device. Dual to ofd_device_alloc().
2958  *
2959  * \param[in] env       execution environment
2960  * \param[in] d         LU device of OFD
2961  *
2962  * \retval              NULL
2963  */
2964 static struct lu_device *ofd_device_free(const struct lu_env *env,
2965                                          struct lu_device *d)
2966 {
2967         struct ofd_device *m = ofd_dev(d);
2968
2969         dt_device_fini(&m->ofd_dt_dev);
2970         OBD_FREE_PTR(m);
2971         RETURN(NULL);
2972 }
2973
2974 /**
2975  * Implementation of lu_device_type_operations::ldto_device_alloc.
2976  *
2977  * This function allocates the new OFD device. It is called from
2978  * obd_setup() if OBD device had lu_device_type defined.
2979  *
2980  * \param[in] env       execution environment
2981  * \param[in] t         lu_device_type of OFD device
2982  * \param[in] cfg       configuration log
2983  *
2984  * \retval              pointer to the lu_device of just allocated OFD
2985  * \retval              ERR_PTR of return value on error
2986  */
2987 static struct lu_device *ofd_device_alloc(const struct lu_env *env,
2988                                           struct lu_device_type *t,
2989                                           struct lustre_cfg *cfg)
2990 {
2991         struct ofd_device *m;
2992         struct lu_device  *l;
2993         int                rc;
2994
2995         OBD_ALLOC_PTR(m);
2996         if (m == NULL)
2997                 return ERR_PTR(-ENOMEM);
2998
2999         l = &m->ofd_dt_dev.dd_lu_dev;
3000         dt_device_init(&m->ofd_dt_dev, t);
3001         rc = ofd_init0(env, m, t, cfg);
3002         if (rc != 0) {
3003                 ofd_device_free(env, l);
3004                 l = ERR_PTR(rc);
3005         }
3006
3007         return l;
3008 }
3009
3010 /* type constructor/destructor: ofd_type_init(), ofd_type_fini() */
3011 LU_TYPE_INIT_FINI(ofd, &ofd_thread_key);
3012
3013 static struct lu_device_type_operations ofd_device_type_ops = {
3014         .ldto_init              = ofd_type_init,
3015         .ldto_fini              = ofd_type_fini,
3016
3017         .ldto_start             = ofd_type_start,
3018         .ldto_stop              = ofd_type_stop,
3019
3020         .ldto_device_alloc      = ofd_device_alloc,
3021         .ldto_device_free       = ofd_device_free,
3022         .ldto_device_fini       = ofd_device_fini
3023 };
3024
3025 static struct lu_device_type ofd_device_type = {
3026         .ldt_tags       = LU_DEVICE_DT,
3027         .ldt_name       = LUSTRE_OST_NAME,
3028         .ldt_ops        = &ofd_device_type_ops,
3029         .ldt_ctx_tags   = LCT_DT_THREAD
3030 };
3031
3032 /**
3033  * Initialize OFD module.
3034  *
3035  * This function is called upon module loading. It registers OFD device type
3036  * and prepares all in-memory structures used by all OFD devices.
3037  *
3038  * \retval              0 if successful
3039  * \retval              negative value on error
3040  */
3041 static int __init ofd_init(void)
3042 {
3043         int                             rc;
3044
3045         rc = lu_kmem_init(ofd_caches);
3046         if (rc)
3047                 return rc;
3048
3049         rc = ofd_fmd_init();
3050         if (rc) {
3051                 lu_kmem_fini(ofd_caches);
3052                 return(rc);
3053         }
3054
3055         rc = class_register_type(&ofd_obd_ops, NULL, true, NULL,
3056                                  LUSTRE_OST_NAME, &ofd_device_type);
3057         return rc;
3058 }
3059
3060 /**
3061  * Stop OFD module.
3062  *
3063  * This function is called upon OFD module unloading.
3064  * It frees all related structures and unregisters OFD device type.
3065  */
3066 static void __exit ofd_exit(void)
3067 {
3068         ofd_fmd_exit();
3069         lu_kmem_fini(ofd_caches);
3070         class_unregister_type(LUSTRE_OST_NAME);
3071 }
3072
3073 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3074 MODULE_DESCRIPTION("Lustre Object Filtering Device");
3075 MODULE_VERSION(LUSTRE_VERSION_STRING);
3076 MODULE_LICENSE("GPL");
3077
3078 module_init(ofd_init);
3079 module_exit(ofd_exit);