Whamcloud - gitweb
LU-2446 build: Update Whamcloud copyright messages for Intel
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66                          struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         struct obd_import *imp = class_exp2cliimp(exp);
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (*lsmp == NULL)
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         }
146
147         if (lmm != NULL) {
148                 /* XXX zero *lsmp? */
149                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151                 LASSERT((*lsmp)->lsm_object_id);
152                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
153         }
154
155         if (imp != NULL &&
156             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
158         else
159                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264                        struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308                        struct obd_info *oinfo, struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
341
342         EXIT;
343 out:
344         ptlrpc_req_finished(req);
345         RETURN(rc);
346 }
347
348 static int osc_setattr_interpret(const struct lu_env *env,
349                                  struct ptlrpc_request *req,
350                                  struct osc_setattr_args *sa, int rc)
351 {
352         struct ost_body *body;
353         ENTRY;
354
355         if (rc != 0)
356                 GOTO(out, rc);
357
358         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359         if (body == NULL)
360                 GOTO(out, rc = -EPROTO);
361
362         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 out:
364         rc = sa->sa_upcall(sa->sa_cookie, rc);
365         RETURN(rc);
366 }
367
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369                            struct obd_trans_info *oti,
370                            obd_enqueue_update_f upcall, void *cookie,
371                            struct ptlrpc_request_set *rqset)
372 {
373         struct ptlrpc_request   *req;
374         struct osc_setattr_args *sa;
375         int                      rc;
376         ENTRY;
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
379         if (req == NULL)
380                 RETURN(-ENOMEM);
381
382         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 RETURN(rc);
387         }
388
389         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391
392         osc_pack_req_body(req, oinfo);
393
394         ptlrpc_request_set_replen(req);
395
396         /* do mds to ost setattr asynchronously */
397         if (!rqset) {
398                 /* Do not wait for response. */
399                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
400         } else {
401                 req->rq_interpret_reply =
402                         (ptlrpc_interpterer_t)osc_setattr_interpret;
403
404                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405                 sa = ptlrpc_req_async_args(req);
406                 sa->sa_oa = oinfo->oi_oa;
407                 sa->sa_upcall = upcall;
408                 sa->sa_cookie = cookie;
409
410                 if (rqset == PTLRPCD_SET)
411                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412                 else
413                         ptlrpc_set_add_req(rqset, req);
414         }
415
416         RETURN(0);
417 }
418
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420                              struct obd_trans_info *oti,
421                              struct ptlrpc_request_set *rqset)
422 {
423         return osc_setattr_async_base(exp, oinfo, oti,
424                                       oinfo->oi_cb_up, oinfo, rqset);
425 }
426
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 {
430         struct ptlrpc_request *req;
431         struct ost_body       *body;
432         struct lov_stripe_md  *lsm;
433         int                    rc;
434         ENTRY;
435
436         LASSERT(oa);
437         LASSERT(ea);
438
439         lsm = *ea;
440         if (!lsm) {
441                 rc = obd_alloc_memmd(exp, &lsm);
442                 if (rc < 0)
443                         RETURN(rc);
444         }
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447         if (req == NULL)
448                 GOTO(out, rc = -ENOMEM);
449
450         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 GOTO(out, rc);
454         }
455
456         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457         LASSERT(body);
458         lustre_set_wire_obdo(&body->oa, oa);
459
460         ptlrpc_request_set_replen(req);
461
462         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463             oa->o_flags == OBD_FL_DELORPHAN) {
464                 DEBUG_REQ(D_HA, req,
465                           "delorphan from OST integration");
466                 /* Don't resend the delorphan req */
467                 req->rq_no_resend = req->rq_no_delay = 1;
468         }
469
470         rc = ptlrpc_queue_wait(req);
471         if (rc)
472                 GOTO(out_req, rc);
473
474         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475         if (body == NULL)
476                 GOTO(out_req, rc = -EPROTO);
477
478         lustre_get_wire_obdo(oa, &body->oa);
479
480         /* This should really be sent by the OST */
481         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
482         oa->o_valid |= OBD_MD_FLBLKSZ;
483
484         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
485          * have valid lsm_oinfo data structs, so don't go touching that.
486          * This needs to be fixed in a big way.
487          */
488         lsm->lsm_object_id = oa->o_id;
489         lsm->lsm_object_seq = oa->o_seq;
490         *ea = lsm;
491
492         if (oti != NULL) {
493                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494
495                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496                         if (!oti->oti_logcookies)
497                                 oti_alloc_cookies(oti, 1);
498                         *oti->oti_logcookies = oa->o_lcookie;
499                 }
500         }
501
502         CDEBUG(D_HA, "transno: "LPD64"\n",
503                lustre_msg_get_transno(req->rq_repmsg));
504 out_req:
505         ptlrpc_req_finished(req);
506 out:
507         if (rc && !*ea)
508                 obd_free_memmd(exp, &lsm);
509         RETURN(rc);
510 }
511
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513                    obd_enqueue_update_f upcall, void *cookie,
514                    struct ptlrpc_request_set *rqset)
515 {
516         struct ptlrpc_request   *req;
517         struct osc_setattr_args *sa;
518         struct ost_body         *body;
519         int                      rc;
520         ENTRY;
521
522         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
523         if (req == NULL)
524                 RETURN(-ENOMEM);
525
526         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528         if (rc) {
529                 ptlrpc_request_free(req);
530                 RETURN(rc);
531         }
532         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533         ptlrpc_at_set_req_timeout(req);
534
535         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536         LASSERT(body);
537         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538         osc_pack_capa(req, body, oinfo->oi_capa);
539
540         ptlrpc_request_set_replen(req);
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557                      struct obd_info *oinfo, struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync_interpret(const struct lu_env *env,
568                               struct ptlrpc_request *req,
569                               void *arg, int rc)
570 {
571         struct osc_fsync_args *fa = arg;
572         struct ost_body *body;
573         ENTRY;
574
575         if (rc)
576                 GOTO(out, rc);
577
578         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579         if (body == NULL) {
580                 CERROR ("can't unpack ost_body\n");
581                 GOTO(out, rc = -EPROTO);
582         }
583
584         *fa->fa_oi->oi_oa = body->oa;
585 out:
586         rc = fa->fa_upcall(fa->fa_cookie, rc);
587         RETURN(rc);
588 }
589
590 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
591                   obd_enqueue_update_f upcall, void *cookie,
592                   struct ptlrpc_request_set *rqset)
593 {
594         struct ptlrpc_request *req;
595         struct ost_body       *body;
596         struct osc_fsync_args *fa;
597         int                    rc;
598         ENTRY;
599
600         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
601         if (req == NULL)
602                 RETURN(-ENOMEM);
603
604         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
605         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
606         if (rc) {
607                 ptlrpc_request_free(req);
608                 RETURN(rc);
609         }
610
611         /* overload the size and blocks fields in the oa with start/end */
612         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
613         LASSERT(body);
614         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
615         osc_pack_capa(req, body, oinfo->oi_capa);
616
617         ptlrpc_request_set_replen(req);
618         req->rq_interpret_reply = osc_sync_interpret;
619
620         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
621         fa = ptlrpc_req_async_args(req);
622         fa->fa_oi = oinfo;
623         fa->fa_upcall = upcall;
624         fa->fa_cookie = cookie;
625
626         if (rqset == PTLRPCD_SET)
627                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
628         else
629                 ptlrpc_set_add_req(rqset, req);
630
631         RETURN (0);
632 }
633
634 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
635                     struct obd_info *oinfo, obd_size start, obd_size end,
636                     struct ptlrpc_request_set *set)
637 {
638         ENTRY;
639
640         if (!oinfo->oi_oa) {
641                 CDEBUG(D_INFO, "oa NULL\n");
642                 RETURN(-EINVAL);
643         }
644
645         oinfo->oi_oa->o_size = start;
646         oinfo->oi_oa->o_blocks = end;
647         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
648
649         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
650 }
651
652 /* Find and cancel locally locks matched by @mode in the resource found by
653  * @objid. Found locks are added into @cancel list. Returns the amount of
654  * locks added to @cancels list. */
655 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
656                                    cfs_list_t *cancels,
657                                    ldlm_mode_t mode, int lock_flags)
658 {
659         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
660         struct ldlm_res_id res_id;
661         struct ldlm_resource *res;
662         int count;
663         ENTRY;
664
665         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
666          * export) but disabled through procfs (flag in NS).
667          *
668          * This distinguishes from a case when ELC is not supported originally,
669          * when we still want to cancel locks in advance and just cancel them
670          * locally, without sending any RPC. */
671         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
672                 RETURN(0);
673
674         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
675         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
676         if (res == NULL)
677                 RETURN(0);
678
679         LDLM_RESOURCE_ADDREF(res);
680         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
681                                            lock_flags, 0, NULL);
682         LDLM_RESOURCE_DELREF(res);
683         ldlm_resource_putref(res);
684         RETURN(count);
685 }
686
687 static int osc_destroy_interpret(const struct lu_env *env,
688                                  struct ptlrpc_request *req, void *data,
689                                  int rc)
690 {
691         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
692
693         cfs_atomic_dec(&cli->cl_destroy_in_flight);
694         cfs_waitq_signal(&cli->cl_destroy_waitq);
695         return 0;
696 }
697
698 static int osc_can_send_destroy(struct client_obd *cli)
699 {
700         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
701             cli->cl_max_rpcs_in_flight) {
702                 /* The destroy request can be sent */
703                 return 1;
704         }
705         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
706             cli->cl_max_rpcs_in_flight) {
707                 /*
708                  * The counter has been modified between the two atomic
709                  * operations.
710                  */
711                 cfs_waitq_signal(&cli->cl_destroy_waitq);
712         }
713         return 0;
714 }
715
716 int osc_create(const struct lu_env *env, struct obd_export *exp,
717                struct obdo *oa, struct lov_stripe_md **ea,
718                struct obd_trans_info *oti)
719 {
720         int rc = 0;
721         ENTRY;
722
723         LASSERT(oa);
724         LASSERT(ea);
725         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
726
727         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
728             oa->o_flags == OBD_FL_RECREATE_OBJS) {
729                 RETURN(osc_real_create(exp, oa, ea, oti));
730         }
731
732         if (!fid_seq_is_mdt(oa->o_seq))
733                 RETURN(osc_real_create(exp, oa, ea, oti));
734
735         /* we should not get here anymore */
736         LBUG();
737
738         RETURN(rc);
739 }
740
741 /* Destroy requests can be async always on the client, and we don't even really
742  * care about the return code since the client cannot do anything at all about
743  * a destroy failure.
744  * When the MDS is unlinking a filename, it saves the file objects into a
745  * recovery llog, and these object records are cancelled when the OST reports
746  * they were destroyed and sync'd to disk (i.e. transaction committed).
747  * If the client dies, or the OST is down when the object should be destroyed,
748  * the records are not cancelled, and when the OST reconnects to the MDS next,
749  * it will retrieve the llog unlink logs and then sends the log cancellation
750  * cookies to the MDS after committing destroy transactions. */
751 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
752                        struct obdo *oa, struct lov_stripe_md *ea,
753                        struct obd_trans_info *oti, struct obd_export *md_export,
754                        void *capa)
755 {
756         struct client_obd     *cli = &exp->exp_obd->u.cli;
757         struct ptlrpc_request *req;
758         struct ost_body       *body;
759         CFS_LIST_HEAD(cancels);
760         int rc, count;
761         ENTRY;
762
763         if (!oa) {
764                 CDEBUG(D_INFO, "oa NULL\n");
765                 RETURN(-EINVAL);
766         }
767
768         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
769                                         LDLM_FL_DISCARD_DATA);
770
771         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
772         if (req == NULL) {
773                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
774                 RETURN(-ENOMEM);
775         }
776
777         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
778         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
779                                0, &cancels, count);
780         if (rc) {
781                 ptlrpc_request_free(req);
782                 RETURN(rc);
783         }
784
785         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
786         ptlrpc_at_set_req_timeout(req);
787
788         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
789                 oa->o_lcookie = *oti->oti_logcookies;
790         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
791         LASSERT(body);
792         lustre_set_wire_obdo(&body->oa, oa);
793
794         osc_pack_capa(req, body, (struct obd_capa *)capa);
795         ptlrpc_request_set_replen(req);
796
797         /* If osc_destory is for destroying the unlink orphan,
798          * sent from MDT to OST, which should not be blocked here,
799          * because the process might be triggered by ptlrpcd, and
800          * it is not good to block ptlrpcd thread (b=16006)*/
801         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
802                 req->rq_interpret_reply = osc_destroy_interpret;
803                 if (!osc_can_send_destroy(cli)) {
804                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
805                                                           NULL);
806
807                         /*
808                          * Wait until the number of on-going destroy RPCs drops
809                          * under max_rpc_in_flight
810                          */
811                         l_wait_event_exclusive(cli->cl_destroy_waitq,
812                                                osc_can_send_destroy(cli), &lwi);
813                 }
814         }
815
816         /* Do not wait for response */
817         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
818         RETURN(0);
819 }
820
821 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
822                                 long writing_bytes)
823 {
824         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
825
826         LASSERT(!(oa->o_valid & bits));
827
828         oa->o_valid |= bits;
829         client_obd_list_lock(&cli->cl_loi_list_lock);
830         oa->o_dirty = cli->cl_dirty;
831         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
832                      cli->cl_dirty_max)) {
833                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
834                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
835                 oa->o_undirty = 0;
836         } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) -
837                             cfs_atomic_read(&obd_dirty_transit_pages) >
838                             (long)(obd_max_dirty_pages + 1))) {
839                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
840                  * not covered by a lock thus they may safely race and trip
841                  * this CERROR() unless we add in a small fudge factor (+1). */
842                 CERROR("dirty %d - %d > system dirty_max %d\n",
843                        cfs_atomic_read(&obd_dirty_pages),
844                        cfs_atomic_read(&obd_dirty_transit_pages),
845                        obd_max_dirty_pages);
846                 oa->o_undirty = 0;
847         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
848                 CERROR("dirty %lu - dirty_max %lu too big???\n",
849                        cli->cl_dirty, cli->cl_dirty_max);
850                 oa->o_undirty = 0;
851         } else {
852                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
853                                       CFS_PAGE_SHIFT)*
854                                      (cli->cl_max_rpcs_in_flight + 1);
855                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
856         }
857         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
858         oa->o_dropped = cli->cl_lost_grant;
859         cli->cl_lost_grant = 0;
860         client_obd_list_unlock(&cli->cl_loi_list_lock);
861         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
862                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
863
864 }
865
866 void osc_update_next_shrink(struct client_obd *cli)
867 {
868         cli->cl_next_shrink_grant =
869                 cfs_time_shift(cli->cl_grant_shrink_interval);
870         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
871                cli->cl_next_shrink_grant);
872 }
873
874 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
875 {
876         client_obd_list_lock(&cli->cl_loi_list_lock);
877         cli->cl_avail_grant += grant;
878         client_obd_list_unlock(&cli->cl_loi_list_lock);
879 }
880
881 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
882 {
883         if (body->oa.o_valid & OBD_MD_FLGRANT) {
884                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
885                 __osc_update_grant(cli, body->oa.o_grant);
886         }
887 }
888
889 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
890                               obd_count keylen, void *key, obd_count vallen,
891                               void *val, struct ptlrpc_request_set *set);
892
893 static int osc_shrink_grant_interpret(const struct lu_env *env,
894                                       struct ptlrpc_request *req,
895                                       void *aa, int rc)
896 {
897         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
898         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
899         struct ost_body *body;
900
901         if (rc != 0) {
902                 __osc_update_grant(cli, oa->o_grant);
903                 GOTO(out, rc);
904         }
905
906         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
907         LASSERT(body);
908         osc_update_grant(cli, body);
909 out:
910         OBDO_FREE(oa);
911         return rc;
912 }
913
914 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
915 {
916         client_obd_list_lock(&cli->cl_loi_list_lock);
917         oa->o_grant = cli->cl_avail_grant / 4;
918         cli->cl_avail_grant -= oa->o_grant;
919         client_obd_list_unlock(&cli->cl_loi_list_lock);
920         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
921                 oa->o_valid |= OBD_MD_FLFLAGS;
922                 oa->o_flags = 0;
923         }
924         oa->o_flags |= OBD_FL_SHRINK_GRANT;
925         osc_update_next_shrink(cli);
926 }
927
928 /* Shrink the current grant, either from some large amount to enough for a
929  * full set of in-flight RPCs, or if we have already shrunk to that limit
930  * then to enough for a single RPC.  This avoids keeping more grant than
931  * needed, and avoids shrinking the grant piecemeal. */
932 static int osc_shrink_grant(struct client_obd *cli)
933 {
934         long target = (cli->cl_max_rpcs_in_flight + 1) *
935                       cli->cl_max_pages_per_rpc;
936
937         client_obd_list_lock(&cli->cl_loi_list_lock);
938         if (cli->cl_avail_grant <= target)
939                 target = cli->cl_max_pages_per_rpc;
940         client_obd_list_unlock(&cli->cl_loi_list_lock);
941
942         return osc_shrink_grant_to_target(cli, target);
943 }
944
945 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
946 {
947         int    rc = 0;
948         struct ost_body     *body;
949         ENTRY;
950
951         client_obd_list_lock(&cli->cl_loi_list_lock);
952         /* Don't shrink if we are already above or below the desired limit
953          * We don't want to shrink below a single RPC, as that will negatively
954          * impact block allocation and long-term performance. */
955         if (target < cli->cl_max_pages_per_rpc)
956                 target = cli->cl_max_pages_per_rpc;
957
958         if (target >= cli->cl_avail_grant) {
959                 client_obd_list_unlock(&cli->cl_loi_list_lock);
960                 RETURN(0);
961         }
962         client_obd_list_unlock(&cli->cl_loi_list_lock);
963
964         OBD_ALLOC_PTR(body);
965         if (!body)
966                 RETURN(-ENOMEM);
967
968         osc_announce_cached(cli, &body->oa, 0);
969
970         client_obd_list_lock(&cli->cl_loi_list_lock);
971         body->oa.o_grant = cli->cl_avail_grant - target;
972         cli->cl_avail_grant = target;
973         client_obd_list_unlock(&cli->cl_loi_list_lock);
974         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
975                 body->oa.o_valid |= OBD_MD_FLFLAGS;
976                 body->oa.o_flags = 0;
977         }
978         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
979         osc_update_next_shrink(cli);
980
981         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
982                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
983                                 sizeof(*body), body, NULL);
984         if (rc != 0)
985                 __osc_update_grant(cli, body->oa.o_grant);
986         OBD_FREE_PTR(body);
987         RETURN(rc);
988 }
989
990 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
991 static int osc_should_shrink_grant(struct client_obd *client)
992 {
993         cfs_time_t time = cfs_time_current();
994         cfs_time_t next_shrink = client->cl_next_shrink_grant;
995
996         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
997              OBD_CONNECT_GRANT_SHRINK) == 0)
998                 return 0;
999
1000         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1001                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1002                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1003                         return 1;
1004                 else
1005                         osc_update_next_shrink(client);
1006         }
1007         return 0;
1008 }
1009
1010 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1011 {
1012         struct client_obd *client;
1013
1014         cfs_list_for_each_entry(client, &item->ti_obd_list,
1015                                 cl_grant_shrink_list) {
1016                 if (osc_should_shrink_grant(client))
1017                         osc_shrink_grant(client);
1018         }
1019         return 0;
1020 }
1021
1022 static int osc_add_shrink_grant(struct client_obd *client)
1023 {
1024         int rc;
1025
1026         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1027                                        TIMEOUT_GRANT,
1028                                        osc_grant_shrink_grant_cb, NULL,
1029                                        &client->cl_grant_shrink_list);
1030         if (rc) {
1031                 CERROR("add grant client %s error %d\n",
1032                         client->cl_import->imp_obd->obd_name, rc);
1033                 return rc;
1034         }
1035         CDEBUG(D_CACHE, "add grant client %s \n",
1036                client->cl_import->imp_obd->obd_name);
1037         osc_update_next_shrink(client);
1038         return 0;
1039 }
1040
1041 static int osc_del_shrink_grant(struct client_obd *client)
1042 {
1043         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1044                                          TIMEOUT_GRANT);
1045 }
1046
1047 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1048 {
1049         /*
1050          * ocd_grant is the total grant amount we're expect to hold: if we've
1051          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1052          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1053          *
1054          * race is tolerable here: if we're evicted, but imp_state already
1055          * left EVICTED state, then cl_dirty must be 0 already.
1056          */
1057         client_obd_list_lock(&cli->cl_loi_list_lock);
1058         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1059                 cli->cl_avail_grant = ocd->ocd_grant;
1060         else
1061                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1062
1063         if (cli->cl_avail_grant < 0) {
1064                 CWARN("%s: available grant < 0, the OSS is probably not running"
1065                       " with patch from bug20278 (%ld) \n",
1066                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1067                 /* workaround for 1.6 servers which do not have
1068                  * the patch from bug20278 */
1069                 cli->cl_avail_grant = ocd->ocd_grant;
1070         }
1071
1072         /* determine the appropriate chunk size used by osc_extent. */
1073         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1074         client_obd_list_unlock(&cli->cl_loi_list_lock);
1075
1076         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1077                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1078                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1079
1080         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1081             cfs_list_empty(&cli->cl_grant_shrink_list))
1082                 osc_add_shrink_grant(cli);
1083 }
1084
1085 /* We assume that the reason this OSC got a short read is because it read
1086  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1087  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1088  * this stripe never got written at or beyond this stripe offset yet. */
1089 static void handle_short_read(int nob_read, obd_count page_count,
1090                               struct brw_page **pga)
1091 {
1092         char *ptr;
1093         int i = 0;
1094
1095         /* skip bytes read OK */
1096         while (nob_read > 0) {
1097                 LASSERT (page_count > 0);
1098
1099                 if (pga[i]->count > nob_read) {
1100                         /* EOF inside this page */
1101                         ptr = cfs_kmap(pga[i]->pg) +
1102                                 (pga[i]->off & ~CFS_PAGE_MASK);
1103                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1104                         cfs_kunmap(pga[i]->pg);
1105                         page_count--;
1106                         i++;
1107                         break;
1108                 }
1109
1110                 nob_read -= pga[i]->count;
1111                 page_count--;
1112                 i++;
1113         }
1114
1115         /* zero remaining pages */
1116         while (page_count-- > 0) {
1117                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1118                 memset(ptr, 0, pga[i]->count);
1119                 cfs_kunmap(pga[i]->pg);
1120                 i++;
1121         }
1122 }
1123
1124 static int check_write_rcs(struct ptlrpc_request *req,
1125                            int requested_nob, int niocount,
1126                            obd_count page_count, struct brw_page **pga)
1127 {
1128         int     i;
1129         __u32   *remote_rcs;
1130
1131         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1132                                                   sizeof(*remote_rcs) *
1133                                                   niocount);
1134         if (remote_rcs == NULL) {
1135                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1136                 return(-EPROTO);
1137         }
1138
1139         /* return error if any niobuf was in error */
1140         for (i = 0; i < niocount; i++) {
1141                 if ((int)remote_rcs[i] < 0)
1142                         return(remote_rcs[i]);
1143
1144                 if (remote_rcs[i] != 0) {
1145                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1146                                 i, remote_rcs[i], req);
1147                         return(-EPROTO);
1148                 }
1149         }
1150
1151         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1152                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1153                        req->rq_bulk->bd_nob_transferred, requested_nob);
1154                 return(-EPROTO);
1155         }
1156
1157         return (0);
1158 }
1159
1160 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1161 {
1162         if (p1->flag != p2->flag) {
1163                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1164                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1165
1166                 /* warn if we try to combine flags that we don't know to be
1167                  * safe to combine */
1168                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1169                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1170                               "report this at http://bugs.whamcloud.com/\n",
1171                               p1->flag, p2->flag);
1172                 }
1173                 return 0;
1174         }
1175
1176         return (p1->off + p1->count == p2->off);
1177 }
1178
1179 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1180                                    struct brw_page **pga, int opc,
1181                                    cksum_type_t cksum_type)
1182 {
1183         __u32                           cksum;
1184         int                             i = 0;
1185         struct cfs_crypto_hash_desc     *hdesc;
1186         unsigned int                    bufsize;
1187         int                             err;
1188         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1189
1190         LASSERT(pg_count > 0);
1191
1192         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1193         if (IS_ERR(hdesc)) {
1194                 CERROR("Unable to initialize checksum hash %s\n",
1195                        cfs_crypto_hash_name(cfs_alg));
1196                 return PTR_ERR(hdesc);
1197         }
1198
1199         while (nob > 0 && pg_count > 0) {
1200                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1201
1202                 /* corrupt the data before we compute the checksum, to
1203                  * simulate an OST->client data error */
1204                 if (i == 0 && opc == OST_READ &&
1205                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1206                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1207                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1208                         memcpy(ptr + off, "bad1", min(4, nob));
1209                         cfs_kunmap(pga[i]->pg);
1210                 }
1211                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1212                                   pga[i]->off & ~CFS_PAGE_MASK,
1213                                   count);
1214                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1215                                (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
1216
1217                 nob -= pga[i]->count;
1218                 pg_count--;
1219                 i++;
1220         }
1221
1222         bufsize = 4;
1223         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1224
1225         if (err)
1226                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1227
1228         /* For sending we only compute the wrong checksum instead
1229          * of corrupting the data so it is still correct on a redo */
1230         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1231                 cksum++;
1232
1233         return cksum;
1234 }
1235
1236 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1237                                 struct lov_stripe_md *lsm, obd_count page_count,
1238                                 struct brw_page **pga,
1239                                 struct ptlrpc_request **reqp,
1240                                 struct obd_capa *ocapa, int reserve,
1241                                 int resend)
1242 {
1243         struct ptlrpc_request   *req;
1244         struct ptlrpc_bulk_desc *desc;
1245         struct ost_body         *body;
1246         struct obd_ioobj        *ioobj;
1247         struct niobuf_remote    *niobuf;
1248         int niocount, i, requested_nob, opc, rc;
1249         struct osc_brw_async_args *aa;
1250         struct req_capsule      *pill;
1251         struct brw_page *pg_prev;
1252
1253         ENTRY;
1254         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1255                 RETURN(-ENOMEM); /* Recoverable */
1256         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1257                 RETURN(-EINVAL); /* Fatal */
1258
1259         if ((cmd & OBD_BRW_WRITE) != 0) {
1260                 opc = OST_WRITE;
1261                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1262                                                 cli->cl_import->imp_rq_pool,
1263                                                 &RQF_OST_BRW_WRITE);
1264         } else {
1265                 opc = OST_READ;
1266                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1267         }
1268         if (req == NULL)
1269                 RETURN(-ENOMEM);
1270
1271         for (niocount = i = 1; i < page_count; i++) {
1272                 if (!can_merge_pages(pga[i - 1], pga[i]))
1273                         niocount++;
1274         }
1275
1276         pill = &req->rq_pill;
1277         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1278                              sizeof(*ioobj));
1279         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1280                              niocount * sizeof(*niobuf));
1281         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1282
1283         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1284         if (rc) {
1285                 ptlrpc_request_free(req);
1286                 RETURN(rc);
1287         }
1288         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1289         ptlrpc_at_set_req_timeout(req);
1290         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1291          * retry logic */
1292         req->rq_no_retry_einprogress = 1;
1293
1294         if (opc == OST_WRITE)
1295                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1296                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1297         else
1298                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1299                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1300
1301         if (desc == NULL)
1302                 GOTO(out, rc = -ENOMEM);
1303         /* NB request now owns desc and will free it when it gets freed */
1304
1305         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1306         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1307         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1308         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1309
1310         lustre_set_wire_obdo(&body->oa, oa);
1311
1312         obdo_to_ioobj(oa, ioobj);
1313         ioobj->ioo_bufcnt = niocount;
1314         osc_pack_capa(req, body, ocapa);
1315         LASSERT (page_count > 0);
1316         pg_prev = pga[0];
1317         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1318                 struct brw_page *pg = pga[i];
1319                 int poff = pg->off & ~CFS_PAGE_MASK;
1320
1321                 LASSERT(pg->count > 0);
1322                 /* make sure there is no gap in the middle of page array */
1323                 LASSERTF(page_count == 1 ||
1324                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1325                           ergo(i > 0 && i < page_count - 1,
1326                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1327                           ergo(i == page_count - 1, poff == 0)),
1328                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1329                          i, page_count, pg, pg->off, pg->count);
1330 #ifdef __linux__
1331                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1332                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1333                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1334                          i, page_count,
1335                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1336                          pg_prev->pg, page_private(pg_prev->pg),
1337                          pg_prev->pg->index, pg_prev->off);
1338 #else
1339                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1340                          "i %d p_c %u\n", i, page_count);
1341 #endif
1342                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1343                         (pg->flag & OBD_BRW_SRVLOCK));
1344
1345                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1346                 requested_nob += pg->count;
1347
1348                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1349                         niobuf--;
1350                         niobuf->len += pg->count;
1351                 } else {
1352                         niobuf->offset = pg->off;
1353                         niobuf->len    = pg->count;
1354                         niobuf->flags  = pg->flag;
1355                 }
1356                 pg_prev = pg;
1357         }
1358
1359         LASSERTF((void *)(niobuf - niocount) ==
1360                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1361                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1362                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1363
1364         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1365         if (resend) {
1366                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1367                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1368                         body->oa.o_flags = 0;
1369                 }
1370                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1371         }
1372
1373         if (osc_should_shrink_grant(cli))
1374                 osc_shrink_grant_local(cli, &body->oa);
1375
1376         /* size[REQ_REC_OFF] still sizeof (*body) */
1377         if (opc == OST_WRITE) {
1378                 if (cli->cl_checksum &&
1379                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1380                         /* store cl_cksum_type in a local variable since
1381                          * it can be changed via lprocfs */
1382                         cksum_type_t cksum_type = cli->cl_cksum_type;
1383
1384                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1385                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1386                                 body->oa.o_flags = 0;
1387                         }
1388                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1389                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1390                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1391                                                              page_count, pga,
1392                                                              OST_WRITE,
1393                                                              cksum_type);
1394                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1395                                body->oa.o_cksum);
1396                         /* save this in 'oa', too, for later checking */
1397                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1398                         oa->o_flags |= cksum_type_pack(cksum_type);
1399                 } else {
1400                         /* clear out the checksum flag, in case this is a
1401                          * resend but cl_checksum is no longer set. b=11238 */
1402                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1403                 }
1404                 oa->o_cksum = body->oa.o_cksum;
1405                 /* 1 RC per niobuf */
1406                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1407                                      sizeof(__u32) * niocount);
1408         } else {
1409                 if (cli->cl_checksum &&
1410                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1411                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1412                                 body->oa.o_flags = 0;
1413                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1414                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1415                 }
1416         }
1417         ptlrpc_request_set_replen(req);
1418
1419         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1420         aa = ptlrpc_req_async_args(req);
1421         aa->aa_oa = oa;
1422         aa->aa_requested_nob = requested_nob;
1423         aa->aa_nio_count = niocount;
1424         aa->aa_page_count = page_count;
1425         aa->aa_resends = 0;
1426         aa->aa_ppga = pga;
1427         aa->aa_cli = cli;
1428         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1429         if (ocapa && reserve)
1430                 aa->aa_ocapa = capa_get(ocapa);
1431
1432         *reqp = req;
1433         RETURN(0);
1434
1435  out:
1436         ptlrpc_req_finished(req);
1437         RETURN(rc);
1438 }
1439
1440 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1441                                 __u32 client_cksum, __u32 server_cksum, int nob,
1442                                 obd_count page_count, struct brw_page **pga,
1443                                 cksum_type_t client_cksum_type)
1444 {
1445         __u32 new_cksum;
1446         char *msg;
1447         cksum_type_t cksum_type;
1448
1449         if (server_cksum == client_cksum) {
1450                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1451                 return 0;
1452         }
1453
1454         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1455                                        oa->o_flags : 0);
1456         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1457                                       cksum_type);
1458
1459         if (cksum_type != client_cksum_type)
1460                 msg = "the server did not use the checksum type specified in "
1461                       "the original request - likely a protocol problem";
1462         else if (new_cksum == server_cksum)
1463                 msg = "changed on the client after we checksummed it - "
1464                       "likely false positive due to mmap IO (bug 11742)";
1465         else if (new_cksum == client_cksum)
1466                 msg = "changed in transit before arrival at OST";
1467         else
1468                 msg = "changed in transit AND doesn't match the original - "
1469                       "likely false positive due to mmap IO (bug 11742)";
1470
1471         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1472                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1473                            msg, libcfs_nid2str(peer->nid),
1474                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1475                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1476                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1477                            oa->o_id,
1478                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1479                            pga[0]->off,
1480                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1481         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1482                "client csum now %x\n", client_cksum, client_cksum_type,
1483                server_cksum, cksum_type, new_cksum);
1484         return 1;
1485 }
1486
1487 /* Note rc enters this function as number of bytes transferred */
1488 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1489 {
1490         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1491         const lnet_process_id_t *peer =
1492                         &req->rq_import->imp_connection->c_peer;
1493         struct client_obd *cli = aa->aa_cli;
1494         struct ost_body *body;
1495         __u32 client_cksum = 0;
1496         ENTRY;
1497
1498         if (rc < 0 && rc != -EDQUOT) {
1499                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1500                 RETURN(rc);
1501         }
1502
1503         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1504         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1505         if (body == NULL) {
1506                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1507                 RETURN(-EPROTO);
1508         }
1509
1510         /* set/clear over quota flag for a uid/gid */
1511         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1512             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1513                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1514
1515                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1516                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1517                        body->oa.o_flags);
1518                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1519         }
1520
1521         osc_update_grant(cli, body);
1522
1523         if (rc < 0)
1524                 RETURN(rc);
1525
1526         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1527                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1528
1529         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1530                 if (rc > 0) {
1531                         CERROR("Unexpected +ve rc %d\n", rc);
1532                         RETURN(-EPROTO);
1533                 }
1534                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1535
1536                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1537                         RETURN(-EAGAIN);
1538
1539                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1540                     check_write_checksum(&body->oa, peer, client_cksum,
1541                                          body->oa.o_cksum, aa->aa_requested_nob,
1542                                          aa->aa_page_count, aa->aa_ppga,
1543                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1544                         RETURN(-EAGAIN);
1545
1546                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1547                                      aa->aa_page_count, aa->aa_ppga);
1548                 GOTO(out, rc);
1549         }
1550
1551         /* The rest of this function executes only for OST_READs */
1552
1553         /* if unwrap_bulk failed, return -EAGAIN to retry */
1554         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1555         if (rc < 0)
1556                 GOTO(out, rc = -EAGAIN);
1557
1558         if (rc > aa->aa_requested_nob) {
1559                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1560                        aa->aa_requested_nob);
1561                 RETURN(-EPROTO);
1562         }
1563
1564         if (rc != req->rq_bulk->bd_nob_transferred) {
1565                 CERROR ("Unexpected rc %d (%d transferred)\n",
1566                         rc, req->rq_bulk->bd_nob_transferred);
1567                 return (-EPROTO);
1568         }
1569
1570         if (rc < aa->aa_requested_nob)
1571                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1572
1573         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1574                 static int cksum_counter;
1575                 __u32      server_cksum = body->oa.o_cksum;
1576                 char      *via;
1577                 char      *router;
1578                 cksum_type_t cksum_type;
1579
1580                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1581                                                body->oa.o_flags : 0);
1582                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1583                                                  aa->aa_ppga, OST_READ,
1584                                                  cksum_type);
1585
1586                 if (peer->nid == req->rq_bulk->bd_sender) {
1587                         via = router = "";
1588                 } else {
1589                         via = " via ";
1590                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1591                 }
1592
1593                 if (server_cksum == ~0 && rc > 0) {
1594                         CERROR("Protocol error: server %s set the 'checksum' "
1595                                "bit, but didn't send a checksum.  Not fatal, "
1596                                "but please notify on http://bugs.whamcloud.com/\n",
1597                                libcfs_nid2str(peer->nid));
1598                 } else if (server_cksum != client_cksum) {
1599                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1600                                            "%s%s%s inode "DFID" object "
1601                                            LPU64"/"LPU64" extent "
1602                                            "["LPU64"-"LPU64"]\n",
1603                                            req->rq_import->imp_obd->obd_name,
1604                                            libcfs_nid2str(peer->nid),
1605                                            via, router,
1606                                            body->oa.o_valid & OBD_MD_FLFID ?
1607                                                 body->oa.o_parent_seq : (__u64)0,
1608                                            body->oa.o_valid & OBD_MD_FLFID ?
1609                                                 body->oa.o_parent_oid : 0,
1610                                            body->oa.o_valid & OBD_MD_FLFID ?
1611                                                 body->oa.o_parent_ver : 0,
1612                                            body->oa.o_id,
1613                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1614                                                 body->oa.o_seq : (__u64)0,
1615                                            aa->aa_ppga[0]->off,
1616                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1617                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1618                                                                         1);
1619                         CERROR("client %x, server %x, cksum_type %x\n",
1620                                client_cksum, server_cksum, cksum_type);
1621                         cksum_counter = 0;
1622                         aa->aa_oa->o_cksum = client_cksum;
1623                         rc = -EAGAIN;
1624                 } else {
1625                         cksum_counter++;
1626                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1627                         rc = 0;
1628                 }
1629         } else if (unlikely(client_cksum)) {
1630                 static int cksum_missed;
1631
1632                 cksum_missed++;
1633                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1634                         CERROR("Checksum %u requested from %s but not sent\n",
1635                                cksum_missed, libcfs_nid2str(peer->nid));
1636         } else {
1637                 rc = 0;
1638         }
1639 out:
1640         if (rc >= 0)
1641                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1642
1643         RETURN(rc);
1644 }
1645
1646 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1647                             struct lov_stripe_md *lsm,
1648                             obd_count page_count, struct brw_page **pga,
1649                             struct obd_capa *ocapa)
1650 {
1651         struct ptlrpc_request *req;
1652         int                    rc;
1653         cfs_waitq_t            waitq;
1654         int                    generation, resends = 0;
1655         struct l_wait_info     lwi;
1656
1657         ENTRY;
1658
1659         cfs_waitq_init(&waitq);
1660         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1661
1662 restart_bulk:
1663         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1664                                   page_count, pga, &req, ocapa, 0, resends);
1665         if (rc != 0)
1666                 return (rc);
1667
1668         if (resends) {
1669                 req->rq_generation_set = 1;
1670                 req->rq_import_generation = generation;
1671                 req->rq_sent = cfs_time_current_sec() + resends;
1672         }
1673
1674         rc = ptlrpc_queue_wait(req);
1675
1676         if (rc == -ETIMEDOUT && req->rq_resend) {
1677                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1678                 ptlrpc_req_finished(req);
1679                 goto restart_bulk;
1680         }
1681
1682         rc = osc_brw_fini_request(req, rc);
1683
1684         ptlrpc_req_finished(req);
1685         /* When server return -EINPROGRESS, client should always retry
1686          * regardless of the number of times the bulk was resent already.*/
1687         if (osc_recoverable_error(rc)) {
1688                 resends++;
1689                 if (rc != -EINPROGRESS &&
1690                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1691                         CERROR("%s: too many resend retries for object: "
1692                                ""LPU64":"LPU64", rc = %d.\n",
1693                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1694                         goto out;
1695                 }
1696                 if (generation !=
1697                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1698                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1699                                ""LPU64":"LPU64", rc = %d.\n",
1700                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1701                         goto out;
1702                 }
1703
1704                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1705                                        NULL);
1706                 l_wait_event(waitq, 0, &lwi);
1707
1708                 goto restart_bulk;
1709         }
1710 out:
1711         if (rc == -EAGAIN || rc == -EINPROGRESS)
1712                 rc = -EIO;
1713         RETURN (rc);
1714 }
1715
1716 static int osc_brw_redo_request(struct ptlrpc_request *request,
1717                                 struct osc_brw_async_args *aa, int rc)
1718 {
1719         struct ptlrpc_request *new_req;
1720         struct osc_brw_async_args *new_aa;
1721         struct osc_async_page *oap;
1722         ENTRY;
1723
1724         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1725                   "redo for recoverable error %d", rc);
1726
1727         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1728                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1729                                   aa->aa_cli, aa->aa_oa,
1730                                   NULL /* lsm unused by osc currently */,
1731                                   aa->aa_page_count, aa->aa_ppga,
1732                                   &new_req, aa->aa_ocapa, 0, 1);
1733         if (rc)
1734                 RETURN(rc);
1735
1736         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1737                 if (oap->oap_request != NULL) {
1738                         LASSERTF(request == oap->oap_request,
1739                                  "request %p != oap_request %p\n",
1740                                  request, oap->oap_request);
1741                         if (oap->oap_interrupted) {
1742                                 ptlrpc_req_finished(new_req);
1743                                 RETURN(-EINTR);
1744                         }
1745                 }
1746         }
1747         /* New request takes over pga and oaps from old request.
1748          * Note that copying a list_head doesn't work, need to move it... */
1749         aa->aa_resends++;
1750         new_req->rq_interpret_reply = request->rq_interpret_reply;
1751         new_req->rq_async_args = request->rq_async_args;
1752         /* cap resend delay to the current request timeout, this is similar to
1753          * what ptlrpc does (see after_reply()) */
1754         if (aa->aa_resends > new_req->rq_timeout)
1755                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1756         else
1757                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1758         new_req->rq_generation_set = 1;
1759         new_req->rq_import_generation = request->rq_import_generation;
1760
1761         new_aa = ptlrpc_req_async_args(new_req);
1762
1763         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1764         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1765         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1766         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1767
1768         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1769                 if (oap->oap_request) {
1770                         ptlrpc_req_finished(oap->oap_request);
1771                         oap->oap_request = ptlrpc_request_addref(new_req);
1772                 }
1773         }
1774
1775         new_aa->aa_ocapa = aa->aa_ocapa;
1776         aa->aa_ocapa = NULL;
1777
1778         /* XXX: This code will run into problem if we're going to support
1779          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1780          * and wait for all of them to be finished. We should inherit request
1781          * set from old request. */
1782         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1783
1784         DEBUG_REQ(D_INFO, new_req, "new request");
1785         RETURN(0);
1786 }
1787
1788 /*
1789  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1790  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1791  * fine for our small page arrays and doesn't require allocation.  its an
1792  * insertion sort that swaps elements that are strides apart, shrinking the
1793  * stride down until its '1' and the array is sorted.
1794  */
1795 static void sort_brw_pages(struct brw_page **array, int num)
1796 {
1797         int stride, i, j;
1798         struct brw_page *tmp;
1799
1800         if (num == 1)
1801                 return;
1802         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1803                 ;
1804
1805         do {
1806                 stride /= 3;
1807                 for (i = stride ; i < num ; i++) {
1808                         tmp = array[i];
1809                         j = i;
1810                         while (j >= stride && array[j - stride]->off > tmp->off) {
1811                                 array[j] = array[j - stride];
1812                                 j -= stride;
1813                         }
1814                         array[j] = tmp;
1815                 }
1816         } while (stride > 1);
1817 }
1818
1819 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1820 {
1821         int count = 1;
1822         int offset;
1823         int i = 0;
1824
1825         LASSERT (pages > 0);
1826         offset = pg[i]->off & ~CFS_PAGE_MASK;
1827
1828         for (;;) {
1829                 pages--;
1830                 if (pages == 0)         /* that's all */
1831                         return count;
1832
1833                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1834                         return count;   /* doesn't end on page boundary */
1835
1836                 i++;
1837                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1838                 if (offset != 0)        /* doesn't start on page boundary */
1839                         return count;
1840
1841                 count++;
1842         }
1843 }
1844
1845 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1846 {
1847         struct brw_page **ppga;
1848         int i;
1849
1850         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1851         if (ppga == NULL)
1852                 return NULL;
1853
1854         for (i = 0; i < count; i++)
1855                 ppga[i] = pga + i;
1856         return ppga;
1857 }
1858
1859 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1860 {
1861         LASSERT(ppga != NULL);
1862         OBD_FREE(ppga, sizeof(*ppga) * count);
1863 }
1864
1865 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1866                    obd_count page_count, struct brw_page *pga,
1867                    struct obd_trans_info *oti)
1868 {
1869         struct obdo *saved_oa = NULL;
1870         struct brw_page **ppga, **orig;
1871         struct obd_import *imp = class_exp2cliimp(exp);
1872         struct client_obd *cli;
1873         int rc, page_count_orig;
1874         ENTRY;
1875
1876         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1877         cli = &imp->imp_obd->u.cli;
1878
1879         if (cmd & OBD_BRW_CHECK) {
1880                 /* The caller just wants to know if there's a chance that this
1881                  * I/O can succeed */
1882
1883                 if (imp->imp_invalid)
1884                         RETURN(-EIO);
1885                 RETURN(0);
1886         }
1887
1888         /* test_brw with a failed create can trip this, maybe others. */
1889         LASSERT(cli->cl_max_pages_per_rpc);
1890
1891         rc = 0;
1892
1893         orig = ppga = osc_build_ppga(pga, page_count);
1894         if (ppga == NULL)
1895                 RETURN(-ENOMEM);
1896         page_count_orig = page_count;
1897
1898         sort_brw_pages(ppga, page_count);
1899         while (page_count) {
1900                 obd_count pages_per_brw;
1901
1902                 if (page_count > cli->cl_max_pages_per_rpc)
1903                         pages_per_brw = cli->cl_max_pages_per_rpc;
1904                 else
1905                         pages_per_brw = page_count;
1906
1907                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1908
1909                 if (saved_oa != NULL) {
1910                         /* restore previously saved oa */
1911                         *oinfo->oi_oa = *saved_oa;
1912                 } else if (page_count > pages_per_brw) {
1913                         /* save a copy of oa (brw will clobber it) */
1914                         OBDO_ALLOC(saved_oa);
1915                         if (saved_oa == NULL)
1916                                 GOTO(out, rc = -ENOMEM);
1917                         *saved_oa = *oinfo->oi_oa;
1918                 }
1919
1920                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1921                                       pages_per_brw, ppga, oinfo->oi_capa);
1922
1923                 if (rc != 0)
1924                         break;
1925
1926                 page_count -= pages_per_brw;
1927                 ppga += pages_per_brw;
1928         }
1929
1930 out:
1931         osc_release_ppga(orig, page_count_orig);
1932
1933         if (saved_oa != NULL)
1934                 OBDO_FREE(saved_oa);
1935
1936         RETURN(rc);
1937 }
1938
1939 static int brw_interpret(const struct lu_env *env,
1940                          struct ptlrpc_request *req, void *data, int rc)
1941 {
1942         struct osc_brw_async_args *aa = data;
1943         struct osc_extent *ext;
1944         struct osc_extent *tmp;
1945         struct cl_object  *obj = NULL;
1946         struct client_obd *cli = aa->aa_cli;
1947         ENTRY;
1948
1949         rc = osc_brw_fini_request(req, rc);
1950         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1951         /* When server return -EINPROGRESS, client should always retry
1952          * regardless of the number of times the bulk was resent already. */
1953         if (osc_recoverable_error(rc)) {
1954                 if (req->rq_import_generation !=
1955                     req->rq_import->imp_generation) {
1956                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1957                                ""LPU64":"LPU64", rc = %d.\n",
1958                                req->rq_import->imp_obd->obd_name,
1959                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1960                 } else if (rc == -EINPROGRESS ||
1961                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1962                         rc = osc_brw_redo_request(req, aa, rc);
1963                 } else {
1964                         CERROR("%s: too many resent retries for object: "
1965                                ""LPU64":"LPU64", rc = %d.\n",
1966                                req->rq_import->imp_obd->obd_name,
1967                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1968                 }
1969
1970                 if (rc == 0)
1971                         RETURN(0);
1972                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1973                         rc = -EIO;
1974         }
1975
1976         if (aa->aa_ocapa) {
1977                 capa_put(aa->aa_ocapa);
1978                 aa->aa_ocapa = NULL;
1979         }
1980
1981         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1982                 if (obj == NULL && rc == 0) {
1983                         obj = osc2cl(ext->oe_obj);
1984                         cl_object_get(obj);
1985                 }
1986
1987                 cfs_list_del_init(&ext->oe_link);
1988                 osc_extent_finish(env, ext, 1, rc);
1989         }
1990         LASSERT(cfs_list_empty(&aa->aa_exts));
1991         LASSERT(cfs_list_empty(&aa->aa_oaps));
1992
1993         if (obj != NULL) {
1994                 struct obdo *oa = aa->aa_oa;
1995                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1996                 unsigned long valid = 0;
1997
1998                 LASSERT(rc == 0);
1999                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2000                         attr->cat_blocks = oa->o_blocks;
2001                         valid |= CAT_BLOCKS;
2002                 }
2003                 if (oa->o_valid & OBD_MD_FLMTIME) {
2004                         attr->cat_mtime = oa->o_mtime;
2005                         valid |= CAT_MTIME;
2006                 }
2007                 if (oa->o_valid & OBD_MD_FLATIME) {
2008                         attr->cat_atime = oa->o_atime;
2009                         valid |= CAT_ATIME;
2010                 }
2011                 if (oa->o_valid & OBD_MD_FLCTIME) {
2012                         attr->cat_ctime = oa->o_ctime;
2013                         valid |= CAT_CTIME;
2014                 }
2015                 if (valid != 0) {
2016                         cl_object_attr_lock(obj);
2017                         cl_object_attr_set(env, obj, attr, valid);
2018                         cl_object_attr_unlock(obj);
2019                 }
2020                 cl_object_put(env, obj);
2021         }
2022         OBDO_FREE(aa->aa_oa);
2023
2024         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2025                           req->rq_bulk->bd_nob_transferred);
2026         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2027         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2028
2029         client_obd_list_lock(&cli->cl_loi_list_lock);
2030         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2031          * is called so we know whether to go to sync BRWs or wait for more
2032          * RPCs to complete */
2033         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2034                 cli->cl_w_in_flight--;
2035         else
2036                 cli->cl_r_in_flight--;
2037         osc_wake_cache_waiters(cli);
2038         client_obd_list_unlock(&cli->cl_loi_list_lock);
2039
2040         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2041         RETURN(rc);
2042 }
2043
2044 /**
2045  * Build an RPC by the list of extent @ext_list. The caller must ensure
2046  * that the total pages in this list are NOT over max pages per RPC.
2047  * Extents in the list must be in OES_RPC state.
2048  */
2049 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2050                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2051 {
2052         struct ptlrpc_request *req = NULL;
2053         struct osc_extent *ext;
2054         CFS_LIST_HEAD(rpc_list);
2055         struct brw_page **pga = NULL;
2056         struct osc_brw_async_args *aa = NULL;
2057         struct obdo *oa = NULL;
2058         struct osc_async_page *oap;
2059         struct osc_async_page *tmp;
2060         struct cl_req *clerq = NULL;
2061         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2062         struct ldlm_lock *lock = NULL;
2063         struct cl_req_attr crattr;
2064         obd_off starting_offset = OBD_OBJECT_EOF;
2065         obd_off ending_offset = 0;
2066         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2067
2068         ENTRY;
2069         LASSERT(!cfs_list_empty(ext_list));
2070
2071         /* add pages into rpc_list to build BRW rpc */
2072         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2073                 LASSERT(ext->oe_state == OES_RPC);
2074                 mem_tight |= ext->oe_memalloc;
2075                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2076                         ++page_count;
2077                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2078                         if (starting_offset > oap->oap_obj_off)
2079                                 starting_offset = oap->oap_obj_off;
2080                         else
2081                                 LASSERT(oap->oap_page_off == 0);
2082                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2083                                 ending_offset = oap->oap_obj_off +
2084                                                 oap->oap_count;
2085                         else
2086                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2087                                         CFS_PAGE_SIZE);
2088                 }
2089         }
2090
2091         if (mem_tight)
2092                 mpflag = cfs_memory_pressure_get_and_set();
2093
2094         memset(&crattr, 0, sizeof crattr);
2095         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2096         if (pga == NULL)
2097                 GOTO(out, rc = -ENOMEM);
2098
2099         OBDO_ALLOC(oa);
2100         if (oa == NULL)
2101                 GOTO(out, rc = -ENOMEM);
2102
2103         i = 0;
2104         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2105                 struct cl_page *page = oap2cl_page(oap);
2106                 if (clerq == NULL) {
2107                         clerq = cl_req_alloc(env, page, crt,
2108                                              1 /* only 1-object rpcs for
2109                                                 * now */);
2110                         if (IS_ERR(clerq))
2111                                 GOTO(out, rc = PTR_ERR(clerq));
2112                         lock = oap->oap_ldlm_lock;
2113                 }
2114                 if (mem_tight)
2115                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2116                 pga[i] = &oap->oap_brw_page;
2117                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2118                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2119                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2120                 i++;
2121                 cl_req_page_add(env, clerq, page);
2122         }
2123
2124         /* always get the data for the obdo for the rpc */
2125         LASSERT(clerq != NULL);
2126         crattr.cra_oa = oa;
2127         crattr.cra_capa = NULL;
2128         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2129         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2130         if (lock) {
2131                 oa->o_handle = lock->l_remote_handle;
2132                 oa->o_valid |= OBD_MD_FLHANDLE;
2133         }
2134
2135         rc = cl_req_prep(env, clerq);
2136         if (rc != 0) {
2137                 CERROR("cl_req_prep failed: %d\n", rc);
2138                 GOTO(out, rc);
2139         }
2140
2141         sort_brw_pages(pga, page_count);
2142         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2143                         pga, &req, crattr.cra_capa, 1, 0);
2144         if (rc != 0) {
2145                 CERROR("prep_req failed: %d\n", rc);
2146                 GOTO(out, rc);
2147         }
2148
2149         req->rq_interpret_reply = brw_interpret;
2150         if (mem_tight != 0)
2151                 req->rq_memalloc = 1;
2152
2153         /* Need to update the timestamps after the request is built in case
2154          * we race with setattr (locally or in queue at OST).  If OST gets
2155          * later setattr before earlier BRW (as determined by the request xid),
2156          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2157          * way to do this in a single call.  bug 10150 */
2158         cl_req_attr_set(env, clerq, &crattr,
2159                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2160
2161         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2162
2163         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2164         aa = ptlrpc_req_async_args(req);
2165         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2166         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2167         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2168         cfs_list_splice_init(ext_list, &aa->aa_exts);
2169         aa->aa_clerq = clerq;
2170
2171         /* queued sync pages can be torn down while the pages
2172          * were between the pending list and the rpc */
2173         tmp = NULL;
2174         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2175                 /* only one oap gets a request reference */
2176                 if (tmp == NULL)
2177                         tmp = oap;
2178                 if (oap->oap_interrupted && !req->rq_intr) {
2179                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2180                                         oap, req);
2181                         ptlrpc_mark_interrupted(req);
2182                 }
2183         }
2184         if (tmp != NULL)
2185                 tmp->oap_request = ptlrpc_request_addref(req);
2186
2187         client_obd_list_lock(&cli->cl_loi_list_lock);
2188         starting_offset >>= CFS_PAGE_SHIFT;
2189         if (cmd == OBD_BRW_READ) {
2190                 cli->cl_r_in_flight++;
2191                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2192                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2193                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2194                                       starting_offset + 1);
2195         } else {
2196                 cli->cl_w_in_flight++;
2197                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2198                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2199                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2200                                       starting_offset + 1);
2201         }
2202         client_obd_list_unlock(&cli->cl_loi_list_lock);
2203
2204         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2205                   page_count, aa, cli->cl_r_in_flight,
2206                   cli->cl_w_in_flight);
2207
2208         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2209          * see which CPU/NUMA node the majority of pages were allocated
2210          * on, and try to assign the async RPC to the CPU core
2211          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2212          *
2213          * But on the other hand, we expect that multiple ptlrpcd
2214          * threads and the initial write sponsor can run in parallel,
2215          * especially when data checksum is enabled, which is CPU-bound
2216          * operation and single ptlrpcd thread cannot process in time.
2217          * So more ptlrpcd threads sharing BRW load
2218          * (with PDL_POLICY_ROUND) seems better.
2219          */
2220         ptlrpcd_add_req(req, pol, -1);
2221         rc = 0;
2222         EXIT;
2223
2224 out:
2225         if (mem_tight != 0)
2226                 cfs_memory_pressure_restore(mpflag);
2227
2228         capa_put(crattr.cra_capa);
2229         if (rc != 0) {
2230                 LASSERT(req == NULL);
2231
2232                 if (oa)
2233                         OBDO_FREE(oa);
2234                 if (pga)
2235                         OBD_FREE(pga, sizeof(*pga) * page_count);
2236                 /* this should happen rarely and is pretty bad, it makes the
2237                  * pending list not follow the dirty order */
2238                 while (!cfs_list_empty(ext_list)) {
2239                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2240                                              oe_link);
2241                         cfs_list_del_init(&ext->oe_link);
2242                         osc_extent_finish(env, ext, 0, rc);
2243                 }
2244                 if (clerq && !IS_ERR(clerq))
2245                         cl_req_completion(env, clerq, rc);
2246         }
2247         RETURN(rc);
2248 }
2249
2250 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2251                                         struct ldlm_enqueue_info *einfo)
2252 {
2253         void *data = einfo->ei_cbdata;
2254         int set = 0;
2255
2256         LASSERT(lock != NULL);
2257         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2258         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2259         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2260         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2261
2262         lock_res_and_lock(lock);
2263         spin_lock(&osc_ast_guard);
2264
2265         if (lock->l_ast_data == NULL)
2266                 lock->l_ast_data = data;
2267         if (lock->l_ast_data == data)
2268                 set = 1;
2269
2270         spin_unlock(&osc_ast_guard);
2271         unlock_res_and_lock(lock);
2272
2273         return set;
2274 }
2275
2276 static int osc_set_data_with_check(struct lustre_handle *lockh,
2277                                    struct ldlm_enqueue_info *einfo)
2278 {
2279         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2280         int set = 0;
2281
2282         if (lock != NULL) {
2283                 set = osc_set_lock_data_with_check(lock, einfo);
2284                 LDLM_LOCK_PUT(lock);
2285         } else
2286                 CERROR("lockh %p, data %p - client evicted?\n",
2287                        lockh, einfo->ei_cbdata);
2288         return set;
2289 }
2290
2291 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2292                              ldlm_iterator_t replace, void *data)
2293 {
2294         struct ldlm_res_id res_id;
2295         struct obd_device *obd = class_exp2obd(exp);
2296
2297         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2298         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2299         return 0;
2300 }
2301
2302 /* find any ldlm lock of the inode in osc
2303  * return 0    not find
2304  *        1    find one
2305  *      < 0    error */
2306 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2307                            ldlm_iterator_t replace, void *data)
2308 {
2309         struct ldlm_res_id res_id;
2310         struct obd_device *obd = class_exp2obd(exp);
2311         int rc = 0;
2312
2313         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2314         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2315         if (rc == LDLM_ITER_STOP)
2316                 return(1);
2317         if (rc == LDLM_ITER_CONTINUE)
2318                 return(0);
2319         return(rc);
2320 }
2321
2322 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2323                             obd_enqueue_update_f upcall, void *cookie,
2324                             __u64 *flags, int agl, int rc)
2325 {
2326         int intent = *flags & LDLM_FL_HAS_INTENT;
2327         ENTRY;
2328
2329         if (intent) {
2330                 /* The request was created before ldlm_cli_enqueue call. */
2331                 if (rc == ELDLM_LOCK_ABORTED) {
2332                         struct ldlm_reply *rep;
2333                         rep = req_capsule_server_get(&req->rq_pill,
2334                                                      &RMF_DLM_REP);
2335
2336                         LASSERT(rep != NULL);
2337                         if (rep->lock_policy_res1)
2338                                 rc = rep->lock_policy_res1;
2339                 }
2340         }
2341
2342         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2343             (rc == 0)) {
2344                 *flags |= LDLM_FL_LVB_READY;
2345                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2346                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2347         }
2348
2349         /* Call the update callback. */
2350         rc = (*upcall)(cookie, rc);
2351         RETURN(rc);
2352 }
2353
2354 static int osc_enqueue_interpret(const struct lu_env *env,
2355                                  struct ptlrpc_request *req,
2356                                  struct osc_enqueue_args *aa, int rc)
2357 {
2358         struct ldlm_lock *lock;
2359         struct lustre_handle handle;
2360         __u32 mode;
2361         struct ost_lvb *lvb;
2362         __u32 lvb_len;
2363         __u64 *flags = aa->oa_flags;
2364
2365         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2366          * might be freed anytime after lock upcall has been called. */
2367         lustre_handle_copy(&handle, aa->oa_lockh);
2368         mode = aa->oa_ei->ei_mode;
2369
2370         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2371          * be valid. */
2372         lock = ldlm_handle2lock(&handle);
2373
2374         /* Take an additional reference so that a blocking AST that
2375          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2376          * to arrive after an upcall has been executed by
2377          * osc_enqueue_fini(). */
2378         ldlm_lock_addref(&handle, mode);
2379
2380         /* Let CP AST to grant the lock first. */
2381         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2382
2383         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2384                 lvb = NULL;
2385                 lvb_len = 0;
2386         } else {
2387                 lvb = aa->oa_lvb;
2388                 lvb_len = sizeof(*aa->oa_lvb);
2389         }
2390
2391         /* Complete obtaining the lock procedure. */
2392         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2393                                    mode, flags, lvb, lvb_len, &handle, rc);
2394         /* Complete osc stuff. */
2395         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2396                               flags, aa->oa_agl, rc);
2397
2398         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2399
2400         /* Release the lock for async request. */
2401         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2402                 /*
2403                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2404                  * not already released by
2405                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2406                  */
2407                 ldlm_lock_decref(&handle, mode);
2408
2409         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2410                  aa->oa_lockh, req, aa);
2411         ldlm_lock_decref(&handle, mode);
2412         LDLM_LOCK_PUT(lock);
2413         return rc;
2414 }
2415
2416 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2417                         struct lov_oinfo *loi, int flags,
2418                         struct ost_lvb *lvb, __u32 mode, int rc)
2419 {
2420         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2421
2422         if (rc == ELDLM_OK) {
2423                 __u64 tmp;
2424
2425                 LASSERT(lock != NULL);
2426                 loi->loi_lvb = *lvb;
2427                 tmp = loi->loi_lvb.lvb_size;
2428                 /* Extend KMS up to the end of this lock and no further
2429                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2430                 if (tmp > lock->l_policy_data.l_extent.end)
2431                         tmp = lock->l_policy_data.l_extent.end + 1;
2432                 if (tmp >= loi->loi_kms) {
2433                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2434                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2435                         loi_kms_set(loi, tmp);
2436                 } else {
2437                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2438                                    LPU64"; leaving kms="LPU64", end="LPU64,
2439                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2440                                    lock->l_policy_data.l_extent.end);
2441                 }
2442                 ldlm_lock_allow_match(lock);
2443         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2444                 LASSERT(lock != NULL);
2445                 loi->loi_lvb = *lvb;
2446                 ldlm_lock_allow_match(lock);
2447                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2448                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2449                 rc = ELDLM_OK;
2450         }
2451
2452         if (lock != NULL) {
2453                 if (rc != ELDLM_OK)
2454                         ldlm_lock_fail_match(lock);
2455
2456                 LDLM_LOCK_PUT(lock);
2457         }
2458 }
2459 EXPORT_SYMBOL(osc_update_enqueue);
2460
2461 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2462
2463 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2464  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2465  * other synchronous requests, however keeping some locks and trying to obtain
2466  * others may take a considerable amount of time in a case of ost failure; and
2467  * when other sync requests do not get released lock from a client, the client
2468  * is excluded from the cluster -- such scenarious make the life difficult, so
2469  * release locks just after they are obtained. */
2470 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2471                      __u64 *flags, ldlm_policy_data_t *policy,
2472                      struct ost_lvb *lvb, int kms_valid,
2473                      obd_enqueue_update_f upcall, void *cookie,
2474                      struct ldlm_enqueue_info *einfo,
2475                      struct lustre_handle *lockh,
2476                      struct ptlrpc_request_set *rqset, int async, int agl)
2477 {
2478         struct obd_device *obd = exp->exp_obd;
2479         struct ptlrpc_request *req = NULL;
2480         int intent = *flags & LDLM_FL_HAS_INTENT;
2481         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2482         ldlm_mode_t mode;
2483         int rc;
2484         ENTRY;
2485
2486         /* Filesystem lock extents are extended to page boundaries so that
2487          * dealing with the page cache is a little smoother.  */
2488         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2489         policy->l_extent.end |= ~CFS_PAGE_MASK;
2490
2491         /*
2492          * kms is not valid when either object is completely fresh (so that no
2493          * locks are cached), or object was evicted. In the latter case cached
2494          * lock cannot be used, because it would prime inode state with
2495          * potentially stale LVB.
2496          */
2497         if (!kms_valid)
2498                 goto no_match;
2499
2500         /* Next, search for already existing extent locks that will cover us */
2501         /* If we're trying to read, we also search for an existing PW lock.  The
2502          * VFS and page cache already protect us locally, so lots of readers/
2503          * writers can share a single PW lock.
2504          *
2505          * There are problems with conversion deadlocks, so instead of
2506          * converting a read lock to a write lock, we'll just enqueue a new
2507          * one.
2508          *
2509          * At some point we should cancel the read lock instead of making them
2510          * send us a blocking callback, but there are problems with canceling
2511          * locks out from other users right now, too. */
2512         mode = einfo->ei_mode;
2513         if (einfo->ei_mode == LCK_PR)
2514                 mode |= LCK_PW;
2515         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2516                                einfo->ei_type, policy, mode, lockh, 0);
2517         if (mode) {
2518                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2519
2520                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2521                         /* For AGL, if enqueue RPC is sent but the lock is not
2522                          * granted, then skip to process this strpe.
2523                          * Return -ECANCELED to tell the caller. */
2524                         ldlm_lock_decref(lockh, mode);
2525                         LDLM_LOCK_PUT(matched);
2526                         RETURN(-ECANCELED);
2527                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2528                         *flags |= LDLM_FL_LVB_READY;
2529                         /* addref the lock only if not async requests and PW
2530                          * lock is matched whereas we asked for PR. */
2531                         if (!rqset && einfo->ei_mode != mode)
2532                                 ldlm_lock_addref(lockh, LCK_PR);
2533                         if (intent) {
2534                                 /* I would like to be able to ASSERT here that
2535                                  * rss <= kms, but I can't, for reasons which
2536                                  * are explained in lov_enqueue() */
2537                         }
2538
2539                         /* We already have a lock, and it's referenced.
2540                          *
2541                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2542                          * AGL upcall may change it to CLS_HELD directly. */
2543                         (*upcall)(cookie, ELDLM_OK);
2544
2545                         if (einfo->ei_mode != mode)
2546                                 ldlm_lock_decref(lockh, LCK_PW);
2547                         else if (rqset)
2548                                 /* For async requests, decref the lock. */
2549                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2550                         LDLM_LOCK_PUT(matched);
2551                         RETURN(ELDLM_OK);
2552                 } else {
2553                         ldlm_lock_decref(lockh, mode);
2554                         LDLM_LOCK_PUT(matched);
2555                 }
2556         }
2557
2558  no_match:
2559         if (intent) {
2560                 CFS_LIST_HEAD(cancels);
2561                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2562                                            &RQF_LDLM_ENQUEUE_LVB);
2563                 if (req == NULL)
2564                         RETURN(-ENOMEM);
2565
2566                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2567                 if (rc) {
2568                         ptlrpc_request_free(req);
2569                         RETURN(rc);
2570                 }
2571
2572                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2573                                      sizeof *lvb);
2574                 ptlrpc_request_set_replen(req);
2575         }
2576
2577         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2578         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2579
2580         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2581                               sizeof(*lvb), LVB_T_OST, lockh, async);
2582         if (rqset) {
2583                 if (!rc) {
2584                         struct osc_enqueue_args *aa;
2585                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2586                         aa = ptlrpc_req_async_args(req);
2587                         aa->oa_ei = einfo;
2588                         aa->oa_exp = exp;
2589                         aa->oa_flags  = flags;
2590                         aa->oa_upcall = upcall;
2591                         aa->oa_cookie = cookie;
2592                         aa->oa_lvb    = lvb;
2593                         aa->oa_lockh  = lockh;
2594                         aa->oa_agl    = !!agl;
2595
2596                         req->rq_interpret_reply =
2597                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2598                         if (rqset == PTLRPCD_SET)
2599                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2600                         else
2601                                 ptlrpc_set_add_req(rqset, req);
2602                 } else if (intent) {
2603                         ptlrpc_req_finished(req);
2604                 }
2605                 RETURN(rc);
2606         }
2607
2608         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2609         if (intent)
2610                 ptlrpc_req_finished(req);
2611
2612         RETURN(rc);
2613 }
2614
2615 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2616                        struct ldlm_enqueue_info *einfo,
2617                        struct ptlrpc_request_set *rqset)
2618 {
2619         struct ldlm_res_id res_id;
2620         int rc;
2621         ENTRY;
2622
2623         osc_build_res_name(oinfo->oi_md->lsm_object_id,
2624                            oinfo->oi_md->lsm_object_seq, &res_id);
2625
2626         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2627                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2628                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2629                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2630                               rqset, rqset != NULL, 0);
2631         RETURN(rc);
2632 }
2633
2634 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2635                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2636                    int *flags, void *data, struct lustre_handle *lockh,
2637                    int unref)
2638 {
2639         struct obd_device *obd = exp->exp_obd;
2640         int lflags = *flags;
2641         ldlm_mode_t rc;
2642         ENTRY;
2643
2644         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2645                 RETURN(-EIO);
2646
2647         /* Filesystem lock extents are extended to page boundaries so that
2648          * dealing with the page cache is a little smoother */
2649         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2650         policy->l_extent.end |= ~CFS_PAGE_MASK;
2651
2652         /* Next, search for already existing extent locks that will cover us */
2653         /* If we're trying to read, we also search for an existing PW lock.  The
2654          * VFS and page cache already protect us locally, so lots of readers/
2655          * writers can share a single PW lock. */
2656         rc = mode;
2657         if (mode == LCK_PR)
2658                 rc |= LCK_PW;
2659         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2660                              res_id, type, policy, rc, lockh, unref);
2661         if (rc) {
2662                 if (data != NULL) {
2663                         if (!osc_set_data_with_check(lockh, data)) {
2664                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2665                                         ldlm_lock_decref(lockh, rc);
2666                                 RETURN(0);
2667                         }
2668                 }
2669                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2670                         ldlm_lock_addref(lockh, LCK_PR);
2671                         ldlm_lock_decref(lockh, LCK_PW);
2672                 }
2673                 RETURN(rc);
2674         }
2675         RETURN(rc);
2676 }
2677
2678 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2679 {
2680         ENTRY;
2681
2682         if (unlikely(mode == LCK_GROUP))
2683                 ldlm_lock_decref_and_cancel(lockh, mode);
2684         else
2685                 ldlm_lock_decref(lockh, mode);
2686
2687         RETURN(0);
2688 }
2689
2690 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2691                       __u32 mode, struct lustre_handle *lockh)
2692 {
2693         ENTRY;
2694         RETURN(osc_cancel_base(lockh, mode));
2695 }
2696
2697 static int osc_cancel_unused(struct obd_export *exp,
2698                              struct lov_stripe_md *lsm,
2699                              ldlm_cancel_flags_t flags,
2700                              void *opaque)
2701 {
2702         struct obd_device *obd = class_exp2obd(exp);
2703         struct ldlm_res_id res_id, *resp = NULL;
2704
2705         if (lsm != NULL) {
2706                 resp = osc_build_res_name(lsm->lsm_object_id,
2707                                           lsm->lsm_object_seq, &res_id);
2708         }
2709
2710         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2711 }
2712
2713 static int osc_statfs_interpret(const struct lu_env *env,
2714                                 struct ptlrpc_request *req,
2715                                 struct osc_async_args *aa, int rc)
2716 {
2717         struct obd_statfs *msfs;
2718         ENTRY;
2719
2720         if (rc == -EBADR)
2721                 /* The request has in fact never been sent
2722                  * due to issues at a higher level (LOV).
2723                  * Exit immediately since the caller is
2724                  * aware of the problem and takes care
2725                  * of the clean up */
2726                  RETURN(rc);
2727
2728         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2729             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2730                 GOTO(out, rc = 0);
2731
2732         if (rc != 0)
2733                 GOTO(out, rc);
2734
2735         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2736         if (msfs == NULL) {
2737                 GOTO(out, rc = -EPROTO);
2738         }
2739
2740         *aa->aa_oi->oi_osfs = *msfs;
2741 out:
2742         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2743         RETURN(rc);
2744 }
2745
2746 static int osc_statfs_async(struct obd_export *exp,
2747                             struct obd_info *oinfo, __u64 max_age,
2748                             struct ptlrpc_request_set *rqset)
2749 {
2750         struct obd_device     *obd = class_exp2obd(exp);
2751         struct ptlrpc_request *req;
2752         struct osc_async_args *aa;
2753         int                    rc;
2754         ENTRY;
2755
2756         /* We could possibly pass max_age in the request (as an absolute
2757          * timestamp or a "seconds.usec ago") so the target can avoid doing
2758          * extra calls into the filesystem if that isn't necessary (e.g.
2759          * during mount that would help a bit).  Having relative timestamps
2760          * is not so great if request processing is slow, while absolute
2761          * timestamps are not ideal because they need time synchronization. */
2762         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2763         if (req == NULL)
2764                 RETURN(-ENOMEM);
2765
2766         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2767         if (rc) {
2768                 ptlrpc_request_free(req);
2769                 RETURN(rc);
2770         }
2771         ptlrpc_request_set_replen(req);
2772         req->rq_request_portal = OST_CREATE_PORTAL;
2773         ptlrpc_at_set_req_timeout(req);
2774
2775         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2776                 /* procfs requests not want stat in wait for avoid deadlock */
2777                 req->rq_no_resend = 1;
2778                 req->rq_no_delay = 1;
2779         }
2780
2781         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2782         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2783         aa = ptlrpc_req_async_args(req);
2784         aa->aa_oi = oinfo;
2785
2786         ptlrpc_set_add_req(rqset, req);
2787         RETURN(0);
2788 }
2789
2790 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2791                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2792 {
2793         struct obd_device     *obd = class_exp2obd(exp);
2794         struct obd_statfs     *msfs;
2795         struct ptlrpc_request *req;
2796         struct obd_import     *imp = NULL;
2797         int rc;
2798         ENTRY;
2799
2800         /*Since the request might also come from lprocfs, so we need
2801          *sync this with client_disconnect_export Bug15684*/
2802         down_read(&obd->u.cli.cl_sem);
2803         if (obd->u.cli.cl_import)
2804                 imp = class_import_get(obd->u.cli.cl_import);
2805         up_read(&obd->u.cli.cl_sem);
2806         if (!imp)
2807                 RETURN(-ENODEV);
2808
2809         /* We could possibly pass max_age in the request (as an absolute
2810          * timestamp or a "seconds.usec ago") so the target can avoid doing
2811          * extra calls into the filesystem if that isn't necessary (e.g.
2812          * during mount that would help a bit).  Having relative timestamps
2813          * is not so great if request processing is slow, while absolute
2814          * timestamps are not ideal because they need time synchronization. */
2815         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2816
2817         class_import_put(imp);
2818
2819         if (req == NULL)
2820                 RETURN(-ENOMEM);
2821
2822         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2823         if (rc) {
2824                 ptlrpc_request_free(req);
2825                 RETURN(rc);
2826         }
2827         ptlrpc_request_set_replen(req);
2828         req->rq_request_portal = OST_CREATE_PORTAL;
2829         ptlrpc_at_set_req_timeout(req);
2830
2831         if (flags & OBD_STATFS_NODELAY) {
2832                 /* procfs requests not want stat in wait for avoid deadlock */
2833                 req->rq_no_resend = 1;
2834                 req->rq_no_delay = 1;
2835         }
2836
2837         rc = ptlrpc_queue_wait(req);
2838         if (rc)
2839                 GOTO(out, rc);
2840
2841         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2842         if (msfs == NULL) {
2843                 GOTO(out, rc = -EPROTO);
2844         }
2845
2846         *osfs = *msfs;
2847
2848         EXIT;
2849  out:
2850         ptlrpc_req_finished(req);
2851         return rc;
2852 }
2853
2854 /* Retrieve object striping information.
2855  *
2856  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2857  * the maximum number of OST indices which will fit in the user buffer.
2858  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2859  */
2860 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2861 {
2862         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2863         struct lov_user_md_v3 lum, *lumk;
2864         struct lov_user_ost_data_v1 *lmm_objects;
2865         int rc = 0, lum_size;
2866         ENTRY;
2867
2868         if (!lsm)
2869                 RETURN(-ENODATA);
2870
2871         /* we only need the header part from user space to get lmm_magic and
2872          * lmm_stripe_count, (the header part is common to v1 and v3) */
2873         lum_size = sizeof(struct lov_user_md_v1);
2874         if (cfs_copy_from_user(&lum, lump, lum_size))
2875                 RETURN(-EFAULT);
2876
2877         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2878             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2879                 RETURN(-EINVAL);
2880
2881         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2882         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2883         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2884         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2885
2886         /* we can use lov_mds_md_size() to compute lum_size
2887          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2888         if (lum.lmm_stripe_count > 0) {
2889                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2890                 OBD_ALLOC(lumk, lum_size);
2891                 if (!lumk)
2892                         RETURN(-ENOMEM);
2893
2894                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2895                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2896                 else
2897                         lmm_objects = &(lumk->lmm_objects[0]);
2898                 lmm_objects->l_object_id = lsm->lsm_object_id;
2899         } else {
2900                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2901                 lumk = &lum;
2902         }
2903
2904         lumk->lmm_object_id = lsm->lsm_object_id;
2905         lumk->lmm_object_seq = lsm->lsm_object_seq;
2906         lumk->lmm_stripe_count = 1;
2907
2908         if (cfs_copy_to_user(lump, lumk, lum_size))
2909                 rc = -EFAULT;
2910
2911         if (lumk != &lum)
2912                 OBD_FREE(lumk, lum_size);
2913
2914         RETURN(rc);
2915 }
2916
2917
2918 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2919                          void *karg, void *uarg)
2920 {
2921         struct obd_device *obd = exp->exp_obd;
2922         struct obd_ioctl_data *data = karg;
2923         int err = 0;
2924         ENTRY;
2925
2926         if (!cfs_try_module_get(THIS_MODULE)) {
2927                 CERROR("Can't get module. Is it alive?");
2928                 return -EINVAL;
2929         }
2930         switch (cmd) {
2931         case OBD_IOC_LOV_GET_CONFIG: {
2932                 char *buf;
2933                 struct lov_desc *desc;
2934                 struct obd_uuid uuid;
2935
2936                 buf = NULL;
2937                 len = 0;
2938                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2939                         GOTO(out, err = -EINVAL);
2940
2941                 data = (struct obd_ioctl_data *)buf;
2942
2943                 if (sizeof(*desc) > data->ioc_inllen1) {
2944                         obd_ioctl_freedata(buf, len);
2945                         GOTO(out, err = -EINVAL);
2946                 }
2947
2948                 if (data->ioc_inllen2 < sizeof(uuid)) {
2949                         obd_ioctl_freedata(buf, len);
2950                         GOTO(out, err = -EINVAL);
2951                 }
2952
2953                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2954                 desc->ld_tgt_count = 1;
2955                 desc->ld_active_tgt_count = 1;
2956                 desc->ld_default_stripe_count = 1;
2957                 desc->ld_default_stripe_size = 0;
2958                 desc->ld_default_stripe_offset = 0;
2959                 desc->ld_pattern = 0;
2960                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2961
2962                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2963
2964                 err = cfs_copy_to_user((void *)uarg, buf, len);
2965                 if (err)
2966                         err = -EFAULT;
2967                 obd_ioctl_freedata(buf, len);
2968                 GOTO(out, err);
2969         }
2970         case LL_IOC_LOV_SETSTRIPE:
2971                 err = obd_alloc_memmd(exp, karg);
2972                 if (err > 0)
2973                         err = 0;
2974                 GOTO(out, err);
2975         case LL_IOC_LOV_GETSTRIPE:
2976                 err = osc_getstripe(karg, uarg);
2977                 GOTO(out, err);
2978         case OBD_IOC_CLIENT_RECOVER:
2979                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2980                                             data->ioc_inlbuf1, 0);
2981                 if (err > 0)
2982                         err = 0;
2983                 GOTO(out, err);
2984         case IOC_OSC_SET_ACTIVE:
2985                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2986                                                data->ioc_offset);
2987                 GOTO(out, err);
2988         case OBD_IOC_POLL_QUOTACHECK:
2989                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2990                 GOTO(out, err);
2991         case OBD_IOC_PING_TARGET:
2992                 err = ptlrpc_obd_ping(obd);
2993                 GOTO(out, err);
2994         default:
2995                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2996                        cmd, cfs_curproc_comm());
2997                 GOTO(out, err = -ENOTTY);
2998         }
2999 out:
3000         cfs_module_put(THIS_MODULE);
3001         return err;
3002 }
3003
3004 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3005                         obd_count keylen, void *key, __u32 *vallen, void *val,
3006                         struct lov_stripe_md *lsm)
3007 {
3008         ENTRY;
3009         if (!vallen || !val)
3010                 RETURN(-EFAULT);
3011
3012         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3013                 __u32 *stripe = val;
3014                 *vallen = sizeof(*stripe);
3015                 *stripe = 0;
3016                 RETURN(0);
3017         } else if (KEY_IS(KEY_LAST_ID)) {
3018                 struct ptlrpc_request *req;
3019                 obd_id                *reply;
3020                 char                  *tmp;
3021                 int                    rc;
3022
3023                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3024                                            &RQF_OST_GET_INFO_LAST_ID);
3025                 if (req == NULL)
3026                         RETURN(-ENOMEM);
3027
3028                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3029                                      RCL_CLIENT, keylen);
3030                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3031                 if (rc) {
3032                         ptlrpc_request_free(req);
3033                         RETURN(rc);
3034                 }
3035
3036                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3037                 memcpy(tmp, key, keylen);
3038
3039                 req->rq_no_delay = req->rq_no_resend = 1;
3040                 ptlrpc_request_set_replen(req);
3041                 rc = ptlrpc_queue_wait(req);
3042                 if (rc)
3043                         GOTO(out, rc);
3044
3045                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3046                 if (reply == NULL)
3047                         GOTO(out, rc = -EPROTO);
3048
3049                 *((obd_id *)val) = *reply;
3050         out:
3051                 ptlrpc_req_finished(req);
3052                 RETURN(rc);
3053         } else if (KEY_IS(KEY_FIEMAP)) {
3054                 struct ptlrpc_request *req;
3055                 struct ll_user_fiemap *reply;
3056                 char *tmp;
3057                 int rc;
3058
3059                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3060                                            &RQF_OST_GET_INFO_FIEMAP);
3061                 if (req == NULL)
3062                         RETURN(-ENOMEM);
3063
3064                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3065                                      RCL_CLIENT, keylen);
3066                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3067                                      RCL_CLIENT, *vallen);
3068                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3069                                      RCL_SERVER, *vallen);
3070
3071                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3072                 if (rc) {
3073                         ptlrpc_request_free(req);
3074                         RETURN(rc);
3075                 }
3076
3077                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3078                 memcpy(tmp, key, keylen);
3079                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3080                 memcpy(tmp, val, *vallen);
3081
3082                 ptlrpc_request_set_replen(req);
3083                 rc = ptlrpc_queue_wait(req);
3084                 if (rc)
3085                         GOTO(out1, rc);
3086
3087                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3088                 if (reply == NULL)
3089                         GOTO(out1, rc = -EPROTO);
3090
3091                 memcpy(val, reply, *vallen);
3092         out1:
3093                 ptlrpc_req_finished(req);
3094
3095                 RETURN(rc);
3096         }
3097
3098         RETURN(-EINVAL);
3099 }
3100
3101 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3102                               obd_count keylen, void *key, obd_count vallen,
3103                               void *val, struct ptlrpc_request_set *set)
3104 {
3105         struct ptlrpc_request *req;
3106         struct obd_device     *obd = exp->exp_obd;
3107         struct obd_import     *imp = class_exp2cliimp(exp);
3108         char                  *tmp;
3109         int                    rc;
3110         ENTRY;
3111
3112         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3113
3114         if (KEY_IS(KEY_CHECKSUM)) {
3115                 if (vallen != sizeof(int))
3116                         RETURN(-EINVAL);
3117                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3118                 RETURN(0);
3119         }
3120
3121         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3122                 sptlrpc_conf_client_adapt(obd);
3123                 RETURN(0);
3124         }
3125
3126         if (KEY_IS(KEY_FLUSH_CTX)) {
3127                 sptlrpc_import_flush_my_ctx(imp);
3128                 RETURN(0);
3129         }
3130
3131         if (KEY_IS(KEY_CACHE_SET)) {
3132                 struct client_obd *cli = &obd->u.cli;
3133
3134                 LASSERT(cli->cl_cache == NULL); /* only once */
3135                 cli->cl_cache = (struct cl_client_cache *)val;
3136                 cfs_atomic_inc(&cli->cl_cache->ccc_users);
3137                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3138
3139                 /* add this osc into entity list */
3140                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3141                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3142                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3143                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3144
3145                 RETURN(0);
3146         }
3147
3148         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3149                 struct client_obd *cli = &obd->u.cli;
3150                 int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
3151                 int target = *(int *)val;
3152
3153                 nr = osc_lru_shrink(cli, min(nr, target));
3154                 *(int *)val -= nr;
3155                 RETURN(0);
3156         }
3157
3158         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3159                 RETURN(-EINVAL);
3160
3161         /* We pass all other commands directly to OST. Since nobody calls osc
3162            methods directly and everybody is supposed to go through LOV, we
3163            assume lov checked invalid values for us.
3164            The only recognised values so far are evict_by_nid and mds_conn.
3165            Even if something bad goes through, we'd get a -EINVAL from OST
3166            anyway. */
3167
3168         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3169                                                 &RQF_OST_SET_GRANT_INFO :
3170                                                 &RQF_OBD_SET_INFO);
3171         if (req == NULL)
3172                 RETURN(-ENOMEM);
3173
3174         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3175                              RCL_CLIENT, keylen);
3176         if (!KEY_IS(KEY_GRANT_SHRINK))
3177                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3178                                      RCL_CLIENT, vallen);
3179         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3180         if (rc) {
3181                 ptlrpc_request_free(req);
3182                 RETURN(rc);
3183         }
3184
3185         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3186         memcpy(tmp, key, keylen);
3187         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3188                                                         &RMF_OST_BODY :
3189                                                         &RMF_SETINFO_VAL);
3190         memcpy(tmp, val, vallen);
3191
3192         if (KEY_IS(KEY_GRANT_SHRINK)) {
3193                 struct osc_grant_args *aa;
3194                 struct obdo *oa;
3195
3196                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3197                 aa = ptlrpc_req_async_args(req);
3198                 OBDO_ALLOC(oa);
3199                 if (!oa) {
3200                         ptlrpc_req_finished(req);
3201                         RETURN(-ENOMEM);
3202                 }
3203                 *oa = ((struct ost_body *)val)->oa;
3204                 aa->aa_oa = oa;
3205                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3206         }
3207
3208         ptlrpc_request_set_replen(req);
3209         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3210                 LASSERT(set != NULL);
3211                 ptlrpc_set_add_req(set, req);
3212                 ptlrpc_check_set(NULL, set);
3213         } else
3214                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3215
3216         RETURN(0);
3217 }
3218
3219
3220 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3221                          struct obd_device *disk_obd, int *index)
3222 {
3223         /* this code is not supposed to be used with LOD/OSP
3224          * to be removed soon */
3225         LBUG();
3226         return 0;
3227 }
3228
3229 static int osc_llog_finish(struct obd_device *obd, int count)
3230 {
3231         struct llog_ctxt *ctxt;
3232
3233         ENTRY;
3234
3235         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3236         if (ctxt) {
3237                 llog_cat_close(NULL, ctxt->loc_handle);
3238                 llog_cleanup(NULL, ctxt);
3239         }
3240
3241         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3242         if (ctxt)
3243                 llog_cleanup(NULL, ctxt);
3244         RETURN(0);
3245 }
3246
3247 static int osc_reconnect(const struct lu_env *env,
3248                          struct obd_export *exp, struct obd_device *obd,
3249                          struct obd_uuid *cluuid,
3250                          struct obd_connect_data *data,
3251                          void *localdata)
3252 {
3253         struct client_obd *cli = &obd->u.cli;
3254
3255         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3256                 long lost_grant;
3257
3258                 client_obd_list_lock(&cli->cl_loi_list_lock);
3259                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3260                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3261                 lost_grant = cli->cl_lost_grant;
3262                 cli->cl_lost_grant = 0;
3263                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3264
3265                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3266                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3267                        data->ocd_version, data->ocd_grant, lost_grant);
3268         }
3269
3270         RETURN(0);
3271 }
3272
3273 static int osc_disconnect(struct obd_export *exp)
3274 {
3275         struct obd_device *obd = class_exp2obd(exp);
3276         struct llog_ctxt  *ctxt;
3277         int rc;
3278
3279         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3280         if (ctxt) {
3281                 if (obd->u.cli.cl_conn_count == 1) {
3282                         /* Flush any remaining cancel messages out to the
3283                          * target */
3284                         llog_sync(ctxt, exp, 0);
3285                 }
3286                 llog_ctxt_put(ctxt);
3287         } else {
3288                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3289                        obd);
3290         }
3291
3292         rc = client_disconnect_export(exp);
3293         /**
3294          * Initially we put del_shrink_grant before disconnect_export, but it
3295          * causes the following problem if setup (connect) and cleanup
3296          * (disconnect) are tangled together.
3297          *      connect p1                     disconnect p2
3298          *   ptlrpc_connect_import
3299          *     ...............               class_manual_cleanup
3300          *                                     osc_disconnect
3301          *                                     del_shrink_grant
3302          *   ptlrpc_connect_interrupt
3303          *     init_grant_shrink
3304          *   add this client to shrink list
3305          *                                      cleanup_osc
3306          * Bang! pinger trigger the shrink.
3307          * So the osc should be disconnected from the shrink list, after we
3308          * are sure the import has been destroyed. BUG18662
3309          */
3310         if (obd->u.cli.cl_import == NULL)
3311                 osc_del_shrink_grant(&obd->u.cli);
3312         return rc;
3313 }
3314
3315 static int osc_import_event(struct obd_device *obd,
3316                             struct obd_import *imp,
3317                             enum obd_import_event event)
3318 {
3319         struct client_obd *cli;
3320         int rc = 0;
3321
3322         ENTRY;
3323         LASSERT(imp->imp_obd == obd);
3324
3325         switch (event) {
3326         case IMP_EVENT_DISCON: {
3327                 cli = &obd->u.cli;
3328                 client_obd_list_lock(&cli->cl_loi_list_lock);
3329                 cli->cl_avail_grant = 0;
3330                 cli->cl_lost_grant = 0;
3331                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3332                 break;
3333         }
3334         case IMP_EVENT_INACTIVE: {
3335                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3336                 break;
3337         }
3338         case IMP_EVENT_INVALIDATE: {
3339                 struct ldlm_namespace *ns = obd->obd_namespace;
3340                 struct lu_env         *env;
3341                 int                    refcheck;
3342
3343                 env = cl_env_get(&refcheck);
3344                 if (!IS_ERR(env)) {
3345                         /* Reset grants */
3346                         cli = &obd->u.cli;
3347                         /* all pages go to failing rpcs due to the invalid
3348                          * import */
3349                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3350
3351                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3352                         cl_env_put(env, &refcheck);
3353                 } else
3354                         rc = PTR_ERR(env);
3355                 break;
3356         }
3357         case IMP_EVENT_ACTIVE: {
3358                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3359                 break;
3360         }
3361         case IMP_EVENT_OCD: {
3362                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3363
3364                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3365                         osc_init_grant(&obd->u.cli, ocd);
3366
3367                 /* See bug 7198 */
3368                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3369                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3370
3371                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3372                 break;
3373         }
3374         case IMP_EVENT_DEACTIVATE: {
3375                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3376                 break;
3377         }
3378         case IMP_EVENT_ACTIVATE: {
3379                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3380                 break;
3381         }
3382         default:
3383                 CERROR("Unknown import event %d\n", event);
3384                 LBUG();
3385         }
3386         RETURN(rc);
3387 }
3388
3389 /**
3390  * Determine whether the lock can be canceled before replaying the lock
3391  * during recovery, see bug16774 for detailed information.
3392  *
3393  * \retval zero the lock can't be canceled
3394  * \retval other ok to cancel
3395  */
3396 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3397 {
3398         check_res_locked(lock->l_resource);
3399
3400         /*
3401          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3402          *
3403          * XXX as a future improvement, we can also cancel unused write lock
3404          * if it doesn't have dirty data and active mmaps.
3405          */
3406         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3407             (lock->l_granted_mode == LCK_PR ||
3408              lock->l_granted_mode == LCK_CR) &&
3409             (osc_dlm_lock_pageref(lock) == 0))
3410                 RETURN(1);
3411
3412         RETURN(0);
3413 }
3414
3415 static int brw_queue_work(const struct lu_env *env, void *data)
3416 {
3417         struct client_obd *cli = data;
3418
3419         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3420
3421         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3422         RETURN(0);
3423 }
3424
3425 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3426 {
3427         struct lprocfs_static_vars lvars = { 0 };
3428         struct client_obd          *cli = &obd->u.cli;
3429         void                       *handler;
3430         int                        rc;
3431         ENTRY;
3432
3433         rc = ptlrpcd_addref();
3434         if (rc)
3435                 RETURN(rc);
3436
3437         rc = client_obd_setup(obd, lcfg);
3438         if (rc)
3439                 GOTO(out_ptlrpcd, rc);
3440
3441         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3442         if (IS_ERR(handler))
3443                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3444         cli->cl_writeback_work = handler;
3445
3446         rc = osc_quota_setup(obd);
3447         if (rc)
3448                 GOTO(out_ptlrpcd_work, rc);
3449
3450         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3451         lprocfs_osc_init_vars(&lvars);
3452         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3453                 lproc_osc_attach_seqstat(obd);
3454                 sptlrpc_lprocfs_cliobd_attach(obd);
3455                 ptlrpc_lprocfs_register_obd(obd);
3456         }
3457
3458         /* We need to allocate a few requests more, because
3459          * brw_interpret tries to create new requests before freeing
3460          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3461          * reserved, but I'm afraid that might be too much wasted RAM
3462          * in fact, so 2 is just my guess and still should work. */
3463         cli->cl_import->imp_rq_pool =
3464                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3465                                     OST_MAXREQSIZE,
3466                                     ptlrpc_add_rqs_to_pool);
3467
3468         CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3469         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3470         RETURN(rc);
3471
3472 out_ptlrpcd_work:
3473         ptlrpcd_destroy_work(handler);
3474 out_client_setup:
3475         client_obd_cleanup(obd);
3476 out_ptlrpcd:
3477         ptlrpcd_decref();
3478         RETURN(rc);
3479 }
3480
3481 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3482 {
3483         int rc = 0;
3484         ENTRY;
3485
3486         switch (stage) {
3487         case OBD_CLEANUP_EARLY: {
3488                 struct obd_import *imp;
3489                 imp = obd->u.cli.cl_import;
3490                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3491                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3492                 ptlrpc_deactivate_import(imp);
3493                 spin_lock(&imp->imp_lock);
3494                 imp->imp_pingable = 0;
3495                 spin_unlock(&imp->imp_lock);
3496                 break;
3497         }
3498         case OBD_CLEANUP_EXPORTS: {
3499                 struct client_obd *cli = &obd->u.cli;
3500                 /* LU-464
3501                  * for echo client, export may be on zombie list, wait for
3502                  * zombie thread to cull it, because cli.cl_import will be
3503                  * cleared in client_disconnect_export():
3504                  *   class_export_destroy() -> obd_cleanup() ->
3505                  *   echo_device_free() -> echo_client_cleanup() ->
3506                  *   obd_disconnect() -> osc_disconnect() ->
3507                  *   client_disconnect_export()
3508                  */
3509                 obd_zombie_barrier();
3510                 if (cli->cl_writeback_work) {
3511                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3512                         cli->cl_writeback_work = NULL;
3513                 }
3514                 obd_cleanup_client_import(obd);
3515                 ptlrpc_lprocfs_unregister_obd(obd);
3516                 lprocfs_obd_cleanup(obd);
3517                 rc = obd_llog_finish(obd, 0);
3518                 if (rc != 0)
3519                         CERROR("failed to cleanup llogging subsystems\n");
3520                 break;
3521                 }
3522         }
3523         RETURN(rc);
3524 }
3525
3526 int osc_cleanup(struct obd_device *obd)
3527 {
3528         struct client_obd *cli = &obd->u.cli;
3529         int rc;
3530
3531         ENTRY;
3532
3533         /* lru cleanup */
3534         if (cli->cl_cache != NULL) {
3535                 LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
3536                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3537                 cfs_list_del_init(&cli->cl_lru_osc);
3538                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3539                 cli->cl_lru_left = NULL;
3540                 cfs_atomic_dec(&cli->cl_cache->ccc_users);
3541                 cli->cl_cache = NULL;
3542         }
3543
3544         /* free memory of osc quota cache */
3545         osc_quota_cleanup(obd);
3546
3547         rc = client_obd_cleanup(obd);
3548
3549         ptlrpcd_decref();
3550         RETURN(rc);
3551 }
3552
3553 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3554 {
3555         struct lprocfs_static_vars lvars = { 0 };
3556         int rc = 0;
3557
3558         lprocfs_osc_init_vars(&lvars);
3559
3560         switch (lcfg->lcfg_command) {
3561         default:
3562                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3563                                               lcfg, obd);
3564                 if (rc > 0)
3565                         rc = 0;
3566                 break;
3567         }
3568
3569         return(rc);
3570 }
3571
3572 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3573 {
3574         return osc_process_config_base(obd, buf);
3575 }
3576
3577 struct obd_ops osc_obd_ops = {
3578         .o_owner                = THIS_MODULE,
3579         .o_setup                = osc_setup,
3580         .o_precleanup           = osc_precleanup,
3581         .o_cleanup              = osc_cleanup,
3582         .o_add_conn             = client_import_add_conn,
3583         .o_del_conn             = client_import_del_conn,
3584         .o_connect              = client_connect_import,
3585         .o_reconnect            = osc_reconnect,
3586         .o_disconnect           = osc_disconnect,
3587         .o_statfs               = osc_statfs,
3588         .o_statfs_async         = osc_statfs_async,
3589         .o_packmd               = osc_packmd,
3590         .o_unpackmd             = osc_unpackmd,
3591         .o_create               = osc_create,
3592         .o_destroy              = osc_destroy,
3593         .o_getattr              = osc_getattr,
3594         .o_getattr_async        = osc_getattr_async,
3595         .o_setattr              = osc_setattr,
3596         .o_setattr_async        = osc_setattr_async,
3597         .o_brw                  = osc_brw,
3598         .o_punch                = osc_punch,
3599         .o_sync                 = osc_sync,
3600         .o_enqueue              = osc_enqueue,
3601         .o_change_cbdata        = osc_change_cbdata,
3602         .o_find_cbdata          = osc_find_cbdata,
3603         .o_cancel               = osc_cancel,
3604         .o_cancel_unused        = osc_cancel_unused,
3605         .o_iocontrol            = osc_iocontrol,
3606         .o_get_info             = osc_get_info,
3607         .o_set_info_async       = osc_set_info_async,
3608         .o_import_event         = osc_import_event,
3609         .o_llog_init            = osc_llog_init,
3610         .o_llog_finish          = osc_llog_finish,
3611         .o_process_config       = osc_process_config,
3612         .o_quotactl             = osc_quotactl,
3613         .o_quotacheck           = osc_quotacheck,
3614 };
3615
3616 extern struct lu_kmem_descr osc_caches[];
3617 extern spinlock_t osc_ast_guard;
3618 extern struct lock_class_key osc_ast_guard_class;
3619
3620 int __init osc_init(void)
3621 {
3622         struct lprocfs_static_vars lvars = { 0 };
3623         int rc;
3624         ENTRY;
3625
3626         /* print an address of _any_ initialized kernel symbol from this
3627          * module, to allow debugging with gdb that doesn't support data
3628          * symbols from modules.*/
3629         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3630
3631         rc = lu_kmem_init(osc_caches);
3632
3633         lprocfs_osc_init_vars(&lvars);
3634
3635         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3636                                  LUSTRE_OSC_NAME, &osc_device_type);
3637         if (rc) {
3638                 lu_kmem_fini(osc_caches);
3639                 RETURN(rc);
3640         }
3641
3642         spin_lock_init(&osc_ast_guard);
3643         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3644
3645         RETURN(rc);
3646 }
3647
3648 #ifdef __KERNEL__
3649 static void /*__exit*/ osc_exit(void)
3650 {
3651         class_unregister_type(LUSTRE_OSC_NAME);
3652         lu_kmem_fini(osc_caches);
3653 }
3654
3655 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3656 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3657 MODULE_LICENSE("GPL");
3658
3659 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3660 #endif