Whamcloud - gitweb
81be2f184e117ba2c56a5924a431ece1c3da7e3a
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ost/ost_handler.c
37  *
38  * Author: Peter J. Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OST
43
44 #include <linux/module.h>
45 #include <obd_cksum.h>
46 #include <obd_ost.h>
47 #include <lustre_net.h>
48 #include <lustre_dlm.h>
49 #include <lustre_export.h>
50 #include <lustre_debug.h>
51 #include <lustre_fid.h>
52 #include <lustre_fld.h>
53 #include <linux/init.h>
54 #include <lprocfs_status.h>
55 #include <libcfs/list.h>
56 #include "ost_internal.h"
57 #include <lustre_fid.h>
58
59 static int oss_num_threads;
60 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
61                 "number of OSS service threads to start");
62
63 static int ost_num_threads;
64 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
65                 "number of OST service threads to start (deprecated)");
66
67 static int oss_num_create_threads;
68 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
69                 "number of OSS create threads to start");
70
71 static char *oss_cpts;
72 CFS_MODULE_PARM(oss_cpts, "s", charp, 0444,
73                 "CPU partitions OSS threads should run on");
74
75 static char *oss_io_cpts;
76 CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
77                 "CPU partitions OSS IO threads should run on");
78
/*
 * Spare page used to simulate data corruption; see ost_checksum_bulk()
 * for details.  It is set up while the module is initializing.  The
 * pages handed to us by the layers below may remain in their internal
 * cache, so we never modify those directly and corrupt a copy held in
 * this page instead.
 */
static struct page *ost_page_to_corrupt;
87
88 /**
89  * Do not return server-side uid/gid to remote client
90  */
91 static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
92 {
93         if (exp_connect_rmtclient(exp)) {
94                 oa->o_uid = -1;
95                 oa->o_gid = -1;
96                 oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
97         }
98 }
99
100 /**
101  * Validate oa from client.
102  * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
103  * req are valid.
104  *    a. for single MDS  seq = FID_SEQ_OST_MDT0,
105  *    b. for CMD, seq = FID_SEQ_OST_MDT0, FID_SEQ_OST_MDT1 - FID_SEQ_OST_MAX
106  */
107 static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
108                              struct obd_ioobj *ioobj)
109 {
110         if (oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP)) {
111                 oa->o_seq = FID_SEQ_OST_MDT0;
112                 if (ioobj)
113                         ioobj->ioo_seq = FID_SEQ_OST_MDT0;
114         /* remove fid_seq_is_rsvd() after FID-on-OST allows SEQ > 9 */
115         } else if (oa == NULL ||
116                    !(fid_seq_is_norm(oa->o_seq) || fid_seq_is_mdt(oa->o_seq) ||
117                      fid_seq_is_echo(oa->o_seq))) {
118                 CERROR("%s: client %s sent invalid object "POSTID"\n",
119                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
120                        oa ? oa->o_id : -1, oa ? oa->o_seq : -1);
121                 return -EPROTO;
122         }
123         obdo_from_ostid(oa, &oa->o_oi);
124         if (ioobj)
125                 ioobj_from_obdo(ioobj, oa);
126         return 0;
127 }
128
129 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
130 {
131         struct oti_req_ack_lock *ack_lock;
132         int i;
133
134         if (oti == NULL)
135                 return;
136
137         if (req->rq_repmsg) {
138                 __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
139                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
140                 versions[0] = oti->oti_pre_version;
141                 lustre_msg_set_versions(req->rq_repmsg, versions);
142         }
143         req->rq_transno = oti->oti_transno;
144
145         /* XXX 4 == entries in oti_ack_locks??? */
146         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
147                 if (!ack_lock->mode)
148                         break;
149                 /* XXX not even calling target_send_reply in some cases... */
150                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
151         }
152 }
153
154 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
155                        struct obd_trans_info *oti)
156 {
157         struct ost_body *body, *repbody;
158         struct lustre_capa *capa = NULL;
159         int rc;
160         ENTRY;
161
162         /* Get the request body */
163         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
164         if (body == NULL)
165                 RETURN(-EFAULT);
166
167         if (body->oa.o_id == 0)
168                 RETURN(-EPROTO);
169
170         rc = ost_validate_obdo(exp, &body->oa, NULL);
171         if (rc)
172                 RETURN(rc);
173
174         /* If there's a DLM request, cancel the locks mentioned in it*/
175         if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
176                 struct ldlm_request *dlm;
177
178                 dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
179                 if (dlm == NULL)
180                         RETURN (-EFAULT);
181                 ldlm_request_cancel(req, dlm, 0);
182         }
183
184         /* If there's a capability, get it */
185         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
186                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
187                 if (capa == NULL) {
188                         CERROR("Missing capability for OST DESTROY");
189                         RETURN (-EFAULT);
190                 }
191         }
192
193         /* Prepare the reply */
194         rc = req_capsule_server_pack(&req->rq_pill);
195         if (rc)
196                 RETURN(rc);
197
198         /* Get the log cancellation cookie */
199         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
200                 oti->oti_logcookies = &body->oa.o_lcookie;
201
202         /* Finish the reply */
203         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
204         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
205
206         /* Do the destroy and set the reply status accordingly  */
207         req->rq_status = obd_destroy(req->rq_svc_thread->t_env, exp,
208                                      &repbody->oa, NULL, oti, NULL, capa);
209         RETURN(0);
210 }
211
212 /**
213  * Helper function for getting server side [start, start+count] DLM lock
214  * if asked by client.
215  */
216 static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
217                         __u64 start, __u64 count, struct lustre_handle *lh,
218                         int mode, __u64 flags)
219 {
220         struct ldlm_res_id res_id;
221         ldlm_policy_data_t policy;
222         __u64 end = start + count;
223
224         ENTRY;
225
226         LASSERT(!lustre_handle_is_used(lh));
227         /* o_id and o_gr are used for localizing resource, if client miss to set
228          * them, do not trigger ASSERTION. */
229         if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
230                      (OBD_MD_FLID | OBD_MD_FLGROUP)))
231                 RETURN(-EPROTO);
232
233         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
234             !(oa->o_flags & OBD_FL_SRVLOCK))
235                 RETURN(0);
236
237         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
238         CDEBUG(D_INODE, "OST-side extent lock.\n");
239
240         policy.l_extent.start = start & CFS_PAGE_MASK;
241
242         /* If ->o_blocks is EOF it means "lock till the end of the
243          * file". Otherwise, it's size of a hole being punched (in bytes) */
244         if (count == OBD_OBJECT_EOF || end < start)
245                 policy.l_extent.end = OBD_OBJECT_EOF;
246         else
247                 policy.l_extent.end = end | ~CFS_PAGE_MASK;
248
249         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
250                                       LDLM_EXTENT, &policy, mode, &flags,
251                                       ldlm_blocking_ast, ldlm_completion_ast,
252                                       ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
253                                       NULL, lh));
254 }
255
256 /* Helper function: release lock, if any. */
257 static void ost_lock_put(struct obd_export *exp,
258                          struct lustre_handle *lh, int mode)
259 {
260         ENTRY;
261         if (lustre_handle_is_used(lh))
262                 ldlm_lock_decref(lh, mode);
263         EXIT;
264 }
265
266 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
267 {
268         struct ost_body *body, *repbody;
269         struct obd_info *oinfo;
270         struct lustre_handle lh = { 0 };
271         struct lustre_capa *capa = NULL;
272         int rc;
273         ENTRY;
274
275         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
276         if (body == NULL)
277                 RETURN(-EFAULT);
278
279         rc = ost_validate_obdo(exp, &body->oa, NULL);
280         if (rc)
281                 RETURN(rc);
282
283         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
284                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
285                 if (capa == NULL) {
286                         CERROR("Missing capability for OST GETATTR");
287                         RETURN(-EFAULT);
288                 }
289         }
290
291         rc = req_capsule_server_pack(&req->rq_pill);
292         if (rc)
293                 RETURN(rc);
294
295         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
296         repbody->oa = body->oa;
297
298         rc = ost_lock_get(exp, &repbody->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0);
299         if (rc)
300                 RETURN(rc);
301
302         OBD_ALLOC_PTR(oinfo);
303         if (!oinfo)
304                 GOTO(unlock, rc = -ENOMEM);
305         oinfo->oi_oa = &repbody->oa;
306         oinfo->oi_capa = capa;
307
308         req->rq_status = obd_getattr(req->rq_svc_thread->t_env, exp, oinfo);
309
310         OBD_FREE_PTR(oinfo);
311
312         ost_drop_id(exp, &repbody->oa);
313
314 unlock:
315         ost_lock_put(exp, &lh, LCK_PR);
316         RETURN(rc);
317 }
318
319 static int ost_statfs(struct ptlrpc_request *req)
320 {
321         struct obd_statfs *osfs;
322         int rc;
323         ENTRY;
324
325         rc = req_capsule_server_pack(&req->rq_pill);
326         if (rc)
327                 RETURN(rc);
328
329         osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
330
331         req->rq_status = obd_statfs(req->rq_svc_thread->t_env, req->rq_export,
332                                     osfs,
333                                     cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
334                                     0);
335         if (req->rq_status != 0)
336                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
337
338         if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS))
339                 req->rq_status = -EINPROGRESS;
340
341         RETURN(0);
342 }
343
344 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
345                       struct obd_trans_info *oti)
346 {
347         struct ost_body *body, *repbody;
348         int rc;
349         ENTRY;
350
351         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
352         if (body == NULL)
353                 RETURN(-EFAULT);
354
355         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
356         if (rc)
357                 RETURN(rc);
358
359         rc = req_capsule_server_pack(&req->rq_pill);
360         if (rc)
361                 RETURN(rc);
362
363         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
364         repbody->oa = body->oa;
365         oti->oti_logcookies = &body->oa.o_lcookie;
366
367         req->rq_status = obd_create(req->rq_svc_thread->t_env, exp,
368                                     &repbody->oa, NULL, oti);
369         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
370         RETURN(0);
371 }
372
373 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
374                      struct obd_trans_info *oti)
375 {
376         struct ost_body *body, *repbody;
377         __u64 flags = 0;
378         struct lustre_handle lh = {0,};
379         int rc;
380         ENTRY;
381
382         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
383         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
384
385         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
386         if (body == NULL)
387                 RETURN(-EFAULT);
388
389         rc = ost_validate_obdo(exp, &body->oa, NULL);
390         if (rc)
391                 RETURN(rc);
392
393         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
394             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
395                 RETURN(-EPROTO);
396
397         rc = req_capsule_server_pack(&req->rq_pill);
398         if (rc)
399                 RETURN(rc);
400
401         /* standard truncate optimization: if file body is completely
402          * destroyed, don't send data back to the server. */
403         if (body->oa.o_size == 0)
404                 flags |= LDLM_AST_DISCARD_DATA;
405
406         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
407         repbody->oa = body->oa;
408
409         rc = ost_lock_get(exp, &repbody->oa, repbody->oa.o_size,
410                           repbody->oa.o_blocks, &lh, LCK_PW, flags);
411         if (rc == 0) {
412                 struct obd_info *oinfo;
413                 struct lustre_capa *capa = NULL;
414
415                 if (repbody->oa.o_valid & OBD_MD_FLFLAGS &&
416                     repbody->oa.o_flags == OBD_FL_SRVLOCK)
417                         /*
418                          * If OBD_FL_SRVLOCK is the only bit set in
419                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
420                          * through filter_setattr() to filter_iocontrol().
421                          */
422                         repbody->oa.o_valid &= ~OBD_MD_FLFLAGS;
423
424                 if (repbody->oa.o_valid & OBD_MD_FLOSSCAPA) {
425                         capa = req_capsule_client_get(&req->rq_pill,
426                                                       &RMF_CAPA1);
427                         if (capa == NULL) {
428                                 CERROR("Missing capability for OST PUNCH");
429                                 GOTO(unlock, rc = -EFAULT);
430                         }
431                 }
432
433                 OBD_ALLOC_PTR(oinfo);
434                 if (!oinfo)
435                         GOTO(unlock, rc = -ENOMEM);
436                 oinfo->oi_oa = &repbody->oa;
437                 oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
438                 oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
439                 oinfo->oi_capa = capa;
440                 oinfo->oi_flags = OBD_FL_PUNCH;
441
442                 req->rq_status = obd_punch(req->rq_svc_thread->t_env, exp,
443                                            oinfo, oti, NULL);
444                 OBD_FREE_PTR(oinfo);
445 unlock:
446                 ost_lock_put(exp, &lh, LCK_PW);
447         }
448
449         ost_drop_id(exp, &repbody->oa);
450         RETURN(rc);
451 }
452
453 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req,
454                     struct obd_trans_info *oti)
455 {
456         struct ost_body *body, *repbody;
457         struct obd_info *oinfo;
458         struct lustre_capa *capa = NULL;
459         int rc;
460         ENTRY;
461
462         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
463         if (body == NULL)
464                 RETURN(-EFAULT);
465
466         rc = ost_validate_obdo(exp, &body->oa, NULL);
467         if (rc)
468                 RETURN(rc);
469
470         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
471                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
472                 if (capa == NULL) {
473                         CERROR("Missing capability for OST SYNC");
474                         RETURN (-EFAULT);
475                 }
476         }
477
478         rc = req_capsule_server_pack(&req->rq_pill);
479         if (rc)
480                 RETURN(rc);
481
482         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
483         repbody->oa = body->oa;
484
485         OBD_ALLOC_PTR(oinfo);
486         if (!oinfo)
487                 RETURN(-ENOMEM);
488
489         oinfo->oi_oa = &repbody->oa;
490         oinfo->oi_capa = capa;
491         oinfo->oi_jobid = oti->oti_jobid;
492         req->rq_status = obd_sync(req->rq_svc_thread->t_env, exp, oinfo,
493                                   repbody->oa.o_size, repbody->oa.o_blocks,
494                                   NULL);
495         OBD_FREE_PTR(oinfo);
496
497         ost_drop_id(exp, &repbody->oa);
498         RETURN(0);
499 }
500
501 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
502                        struct obd_trans_info *oti)
503 {
504         struct ost_body *body, *repbody;
505         struct obd_info *oinfo;
506         struct lustre_capa *capa = NULL;
507         int rc;
508         ENTRY;
509
510         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
511         if (body == NULL)
512                 RETURN(-EFAULT);
513
514         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
515         if (rc)
516                 RETURN(rc);
517
518         rc = req_capsule_server_pack(&req->rq_pill);
519         if (rc)
520                 RETURN(rc);
521
522         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
523                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
524                 if (capa == NULL) {
525                         CERROR("Missing capability for OST SETATTR");
526                         RETURN (-EFAULT);
527                 }
528         }
529
530         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
531         repbody->oa = body->oa;
532
533         OBD_ALLOC_PTR(oinfo);
534         if (!oinfo)
535                 RETURN(-ENOMEM);
536         oinfo->oi_oa = &repbody->oa;
537         oinfo->oi_capa = capa;
538
539         req->rq_status = obd_setattr(req->rq_svc_thread->t_env, exp, oinfo,
540                                      oti);
541
542         OBD_FREE_PTR(oinfo);
543
544         ost_drop_id(exp, &repbody->oa);
545         RETURN(0);
546 }
547
548 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
549                                cksum_type_t cksum_type)
550 {
551         struct cfs_crypto_hash_desc     *hdesc;
552         unsigned int                    bufsize;
553         int                             i, err;
554         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
555         __u32                           cksum;
556
557         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
558         if (IS_ERR(hdesc)) {
559                 CERROR("Unable to initialize checksum hash %s\n",
560                        cfs_crypto_hash_name(cfs_alg));
561                 return PTR_ERR(hdesc);
562         }
563         CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg));
564         for (i = 0; i < desc->bd_iov_count; i++) {
565
566                 /* corrupt the data before we compute the checksum, to
567                  * simulate a client->OST data error */
568                 if (i == 0 && opc == OST_WRITE &&
569                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
570                         int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
571                         int len = desc->bd_iov[i].kiov_len;
572                         struct page *np = ost_page_to_corrupt;
573                         char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
574
575                         if (np) {
576                                 char *ptr2 = kmap(np) + off;
577
578                                 memcpy(ptr2, ptr, len);
579                                 memcpy(ptr2, "bad3", min(4, len));
580                                 kunmap(np);
581                                 desc->bd_iov[i].kiov_page = np;
582                         } else {
583                                 CERROR("can't alloc page for corruption\n");
584                         }
585                 }
586                 cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
587                                   desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK,
588                                   desc->bd_iov[i].kiov_len);
589
590                  /* corrupt the data after we compute the checksum, to
591                  * simulate an OST->client data error */
592                 if (i == 0 && opc == OST_READ &&
593                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
594                         int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
595                         int len = desc->bd_iov[i].kiov_len;
596                         struct page *np = ost_page_to_corrupt;
597                         char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
598
599                         if (np) {
600                                 char *ptr2 = kmap(np) + off;
601
602                                 memcpy(ptr2, ptr, len);
603                                 memcpy(ptr2, "bad4", min(4, len));
604                                 kunmap(np);
605                                 desc->bd_iov[i].kiov_page = np;
606                         } else {
607                                 CERROR("can't alloc page for corruption\n");
608                         }
609                 }
610         }
611
612         bufsize = 4;
613         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
614         if (err)
615                 cfs_crypto_hash_final(hdesc, NULL, NULL);
616
617         return cksum;
618 }
619
620 static int ost_brw_lock_get(int mode, struct obd_export *exp,
621                             struct obd_ioobj *obj, struct niobuf_remote *nb,
622                             struct lustre_handle *lh)
623 {
624         __u64 flags               = 0;
625         int nrbufs                = obj->ioo_bufcnt;
626         struct ldlm_res_id res_id;
627         ldlm_policy_data_t policy;
628         int i;
629         ENTRY;
630
631         osc_build_res_name(obj->ioo_id, obj->ioo_seq, &res_id);
632         LASSERT(mode == LCK_PR || mode == LCK_PW);
633         LASSERT(!lustre_handle_is_used(lh));
634
635         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
636                 RETURN(0);
637
638         for (i = 1; i < nrbufs; i ++)
639                 if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
640                     (nb[i].flags & OBD_BRW_SRVLOCK))
641                         RETURN(-EFAULT);
642
643         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
644         policy.l_extent.end   = (nb[nrbufs - 1].offset +
645                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
646
647         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
648                                       LDLM_EXTENT, &policy, mode, &flags,
649                                       ldlm_blocking_ast, ldlm_completion_ast,
650                                       ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
651                                       NULL, lh));
652 }
653
654 static void ost_brw_lock_put(int mode,
655                              struct obd_ioobj *obj, struct niobuf_remote *niob,
656                              struct lustre_handle *lh)
657 {
658         ENTRY;
659         LASSERT(mode == LCK_PR || mode == LCK_PW);
660         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
661                 lustre_handle_is_used(lh));
662         if (lustre_handle_is_used(lh))
663                 ldlm_lock_decref(lh, mode);
664         EXIT;
665 }
666
667 /* Allocate thread local buffers if needed */
668 static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
669 {
670         struct ost_thread_local_cache *tls =
671                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
672
673         /* In normal mode of operation an I/O request is serviced only
674          * by ll_ost_io threads each of them has own tls buffers allocated by
675          * ost_io_thread_init().
676          * During recovery, an I/O request may be queued until any of the ost
677          * service threads process it. Not necessary it should be one of
678          * ll_ost_io threads. In that case we dynamically allocating tls
679          * buffers for the request service time. */
680         if (unlikely(tls == NULL)) {
681                 LASSERT(r->rq_export->exp_in_recovery);
682                 OBD_ALLOC_PTR(tls);
683                 if (tls != NULL) {
684                         tls->temporary = 1;
685                         r->rq_svc_thread->t_data = tls;
686                 }
687         }
688         return  tls;
689 }
690
691 /* Free thread local buffers if they were allocated only for servicing
692  * this one request */
693 static void ost_tls_put(struct ptlrpc_request *r)
694 {
695         struct ost_thread_local_cache *tls =
696                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
697
698         if (unlikely(tls->temporary)) {
699                 OBD_FREE_PTR(tls);
700                 r->rq_svc_thread->t_data = NULL;
701         }
702 }
703
704 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
705 {
706         struct ptlrpc_bulk_desc *desc = NULL;
707         struct obd_export *exp = req->rq_export;
708         struct niobuf_remote *remote_nb;
709         struct niobuf_local *local_nb;
710         struct obd_ioobj *ioo;
711         struct ost_body *body, *repbody;
712         struct lustre_capa *capa = NULL;
713         struct l_wait_info lwi;
714         struct lustre_handle lockh = { 0 };
715         int niocount, npages, nob = 0, rc, i;
716         int no_reply = 0;
717         struct ost_thread_local_cache *tls;
718         ENTRY;
719
720         req->rq_bulk_read = 1;
721
722         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
723                 GOTO(out, rc = -EIO);
724
725         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
726
727         /* Check if there is eviction in progress, and if so, wait for it to
728          * finish */
729         if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
730                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
731                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
732                         !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
733                         &lwi);
734         }
735         if (exp->exp_failed)
736                 GOTO(out, rc = -ENOTCONN);
737
738         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
739          * ost_rw_hpreq_check(). */
740         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
741         if (body == NULL)
742                 GOTO(out, rc = -EFAULT);
743
744         /*
745          * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
746          * would be useful here and wherever we get &RMF_OBD_IOOBJ and
747          * &RMF_NIOBUF_REMOTE.
748          */
749         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
750         if (ioo == NULL)
751                 GOTO(out, rc = -EFAULT);
752
753         rc = ost_validate_obdo(exp, &body->oa, ioo);
754         if (rc)
755                 RETURN(rc);
756
757         niocount = ioo->ioo_bufcnt;
758         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
759         if (remote_nb == NULL)
760                 GOTO(out, rc = -EFAULT);
761
762         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
763                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
764                 if (capa == NULL) {
765                         CERROR("Missing capability for OST BRW READ");
766                         GOTO(out, rc = -EFAULT);
767                 }
768         }
769
770         rc = req_capsule_server_pack(&req->rq_pill);
771         if (rc)
772                 GOTO(out, rc);
773
774         tls = ost_tls_get(req);
775         if (tls == NULL)
776                 GOTO(out_bulk, rc = -ENOMEM);
777         local_nb = tls->local;
778
779         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
780         if (rc != 0)
781                 GOTO(out_tls, rc);
782
783         /*
784          * If getting the lock took more time than
785          * client was willing to wait, drop it. b=11330
786          */
787         if (cfs_time_current_sec() > req->rq_deadline ||
788             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
789                 no_reply = 1;
790                 CERROR("Dropping timed-out read from %s because locking"
791                        "object "LPX64" took %ld seconds (limit was %ld).\n",
792                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
793                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
794                        req->rq_deadline - req->rq_arrival_time.tv_sec);
795                 GOTO(out_lock, rc = -ETIMEDOUT);
796         }
797
798         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
799         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
800
801         npages = OST_THREAD_POOL_SIZE;
802         rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
803                         &repbody->oa, 1, ioo, remote_nb, &npages, local_nb,
804                         oti, capa);
805         if (rc != 0)
806                 GOTO(out_lock, rc);
807
808         desc = ptlrpc_prep_bulk_exp(req, npages,
809                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
810         if (desc == NULL)
811                 GOTO(out_commitrw, rc = -ENOMEM);
812
813         nob = 0;
814         for (i = 0; i < npages; i++) {
815                 int page_rc = local_nb[i].rc;
816
817                 if (page_rc < 0) {              /* error */
818                         rc = page_rc;
819                         break;
820                 }
821
822                 nob += page_rc;
823                 if (page_rc != 0) {             /* some data! */
824                         LASSERT (local_nb[i].page != NULL);
825                         ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
826                                                     local_nb[i].lnb_page_offset,
827                                                     page_rc);
828                 }
829
830                 if (page_rc != local_nb[i].len) { /* short read */
831                         /* All subsequent pages should be 0 */
832                         while(++i < npages)
833                                 LASSERT(local_nb[i].rc == 0);
834                         break;
835                 }
836         }
837
838         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
839                 cksum_type_t cksum_type =
840                         cksum_type_unpack(repbody->oa.o_valid & OBD_MD_FLFLAGS ?
841                                           repbody->oa.o_flags : 0);
842                 repbody->oa.o_flags = cksum_type_pack(cksum_type);
843                 repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
844                 repbody->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type);
845                 CDEBUG(D_PAGE, "checksum at read origin: %x\n",
846                        repbody->oa.o_cksum);
847         } else {
848                 repbody->oa.o_valid = 0;
849         }
850         /* We're finishing using body->oa as an input variable */
851
852         /* Check if client was evicted while we were doing i/o before touching
853            network */
854         if (rc == 0) {
855                 if (likely(!CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)))
856                         rc = target_bulk_io(exp, desc, &lwi);
857                 no_reply = rc != 0;
858         }
859
860 out_commitrw:
861         /* Must commit after prep above in all cases */
862         rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
863                           &repbody->oa, 1, ioo, remote_nb, npages, local_nb,
864                           oti, rc);
865
866         if (rc == 0)
867                 ost_drop_id(exp, &repbody->oa);
868
869 out_lock:
870         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
871 out_tls:
872         ost_tls_put(req);
873 out_bulk:
874         if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
875                 ptlrpc_free_bulk_nopin(desc);
876 out:
877         LASSERT(rc <= 0);
878         if (rc == 0) {
879                 req->rq_status = nob;
880                 ptlrpc_lprocfs_brw(req, nob);
881                 target_committed_to_req(req);
882                 ptlrpc_reply(req);
883         } else if (!no_reply) {
884                 /* Only reply if there was no comms problem with bulk */
885                 target_committed_to_req(req);
886                 req->rq_status = rc;
887                 ptlrpc_error(req);
888         } else {
889                 /* reply out callback would free */
890                 ptlrpc_req_drop_rs(req);
891                 LCONSOLE_WARN("%s: Bulk IO read error with %s (at %s), "
892                               "client will retry: rc %d\n",
893                               exp->exp_obd->obd_name,
894                               obd_uuid2str(&exp->exp_client_uuid),
895                               obd_export_nid2str(exp), rc);
896         }
897         /* send a bulk after reply to simulate a network delay or reordering
898          * by a router */
899         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
900                 cfs_waitq_t              waitq;
901                 struct l_wait_info       lwi1;
902
903                 CDEBUG(D_INFO, "reorder BULK\n");
904                 cfs_waitq_init(&waitq);
905
906                 lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
907                 l_wait_event(waitq, 0, &lwi1);
908                 rc = target_bulk_io(exp, desc, &lwi);
909                 ptlrpc_free_bulk_nopin(desc);
910         }
911
912         RETURN(rc);
913 }
914
915 static void ost_warn_on_cksum(struct ptlrpc_request *req,
916                               struct ptlrpc_bulk_desc *desc,
917                               struct niobuf_local *local_nb, int npages,
918                               obd_count client_cksum, obd_count server_cksum,
919                               int mmap)
920 {
921         struct obd_export *exp = req->rq_export;
922         struct ost_body *body;
923         char *router;
924         char *via;
925
926         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
927         LASSERT (body != NULL);
928
929         if (req->rq_peer.nid == desc->bd_sender) {
930                 via = router = "";
931         } else {
932                 via = " via ";
933                 router = libcfs_nid2str(desc->bd_sender);
934         }
935
936         if (mmap) {
937                 CDEBUG_LIMIT(D_INFO, "client csum %x, server csum %x\n",
938                              client_cksum, server_cksum);
939                 return;
940         }
941
942         LCONSOLE_ERROR_MSG(0x168, "BAD WRITE CHECKSUM: %s from %s%s%s inode "
943                            DFID" object "LPU64"/"LPU64" extent ["LPU64"-"LPU64
944                            "]: client csum %x, server csum %x\n",
945                            exp->exp_obd->obd_name, libcfs_id2str(req->rq_peer),
946                            via, router,
947                            body->oa.o_valid & OBD_MD_FLFID ?
948                            body->oa.o_parent_seq : (__u64)0,
949                            body->oa.o_valid & OBD_MD_FLFID ?
950                            body->oa.o_parent_oid : 0,
951                            body->oa.o_valid & OBD_MD_FLFID ?
952                            body->oa.o_parent_ver : 0,
953                            body->oa.o_id,
954                            body->oa.o_valid & OBD_MD_FLGROUP ?
955                            body->oa.o_seq : (__u64)0,
956                            local_nb[0].lnb_file_offset,
957                            local_nb[npages-1].lnb_file_offset +
958                            local_nb[npages-1].len - 1,
959                            client_cksum, server_cksum);
960 }
961
962 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
963 {
964         struct ptlrpc_bulk_desc *desc = NULL;
965         struct obd_export       *exp = req->rq_export;
966         struct niobuf_remote    *remote_nb;
967         struct niobuf_local     *local_nb;
968         struct obd_ioobj        *ioo;
969         struct ost_body         *body, *repbody;
970         struct l_wait_info       lwi;
971         struct lustre_handle     lockh = {0};
972         struct lustre_capa      *capa = NULL;
973         __u32                   *rcs;
974         int objcount, niocount, npages;
975         int rc, i, j;
976         obd_count                client_cksum = 0, server_cksum = 0;
977         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
978         int                      no_reply = 0, mmap = 0;
979         __u32                    o_uid = 0, o_gid = 0;
980         struct ost_thread_local_cache *tls;
981         ENTRY;
982
983         req->rq_bulk_write = 1;
984
985         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
986                 GOTO(out, rc = -EIO);
987         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
988                 GOTO(out, rc = -EFAULT);
989
990         /* pause before transaction has been started */
991         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
992
993         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
994          * ost_rw_hpreq_check(). */
995         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
996         if (body == NULL)
997                 GOTO(out, rc = -EFAULT);
998
999         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
1000                                         RCL_CLIENT) / sizeof(*ioo);
1001         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1002         if (ioo == NULL)
1003                 GOTO(out, rc = -EFAULT);
1004
1005         rc = ost_validate_obdo(exp, &body->oa, ioo);
1006         if (rc)
1007                 RETURN(rc);
1008
1009         for (niocount = i = 0; i < objcount; i++)
1010                 niocount += ioo[i].ioo_bufcnt;
1011
1012         /*
1013          * It'd be nice to have a capsule function to indicate how many elements
1014          * there were in a buffer for an RMF that's declared to be an array.
1015          * It's easy enough to compute the number of elements here though.
1016          */
1017         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1018         if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
1019             &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
1020                 GOTO(out, rc = -EFAULT);
1021
1022         if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
1023             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
1024                 cfs_memory_pressure_set();
1025
1026         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
1027                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
1028                 if (capa == NULL) {
1029                         CERROR("Missing capability for OST BRW WRITE");
1030                         GOTO(out, rc = -EFAULT);
1031                 }
1032         }
1033
1034         req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
1035                              niocount * sizeof(*rcs));
1036         rc = req_capsule_server_pack(&req->rq_pill);
1037         if (rc != 0)
1038                 GOTO(out, rc);
1039         CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
1040         rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
1041
1042         tls = ost_tls_get(req);
1043         if (tls == NULL)
1044                 GOTO(out_bulk, rc = -ENOMEM);
1045         local_nb = tls->local;
1046
1047         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
1048         if (rc != 0)
1049                 GOTO(out_tls, rc);
1050
1051         /*
1052          * If getting the lock took more time than
1053          * client was willing to wait, drop it. b=11330
1054          */
1055         if (cfs_time_current_sec() > req->rq_deadline ||
1056             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
1057                 no_reply = 1;
1058                 CERROR("Dropping timed-out write from %s because locking "
1059                        "object "LPX64" took %ld seconds (limit was %ld).\n",
1060                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
1061                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
1062                        req->rq_deadline - req->rq_arrival_time.tv_sec);
1063                 GOTO(out_lock, rc = -ETIMEDOUT);
1064         }
1065
1066         /* obd_preprw clobbers oa->valid, so save what we need */
1067         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1068                 client_cksum = body->oa.o_cksum;
1069                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1070                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1071         }
1072         if (body->oa.o_valid & OBD_MD_FLFLAGS && body->oa.o_flags & OBD_FL_MMAP)
1073                 mmap = 1;
1074
1075         /* Because we already sync grant info with client when reconnect,
1076          * grant info will be cleared for resent req, then fed_grant and
1077          * total_grant will not be modified in following preprw_write */
1078         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
1079                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
1080                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
1081         }
1082
1083         if (exp_connect_rmtclient(exp)) {
1084                 o_uid = body->oa.o_uid;
1085                 o_gid = body->oa.o_gid;
1086         }
1087
1088         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1089         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
1090
1091         npages = OST_THREAD_POOL_SIZE;
1092         rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
1093                         &repbody->oa, objcount, ioo, remote_nb, &npages,
1094                         local_nb, oti, capa);
1095         if (rc != 0)
1096                 GOTO(out_lock, rc);
1097
1098         desc = ptlrpc_prep_bulk_exp(req, npages,
1099                                      BULK_GET_SINK, OST_BULK_PORTAL);
1100         if (desc == NULL)
1101                 GOTO(skip_transfer, rc = -ENOMEM);
1102
1103         /* NB Having prepped, we must commit... */
1104
1105         for (i = 0; i < npages; i++)
1106                 ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
1107                                             local_nb[i].lnb_page_offset,
1108                                             local_nb[i].len);
1109
1110         rc = sptlrpc_svc_prep_bulk(req, desc);
1111         if (rc != 0)
1112                 GOTO(out_lock, rc);
1113
1114         rc = target_bulk_io(exp, desc, &lwi);
1115         no_reply = rc != 0;
1116
1117 skip_transfer:
1118         if (client_cksum != 0 && rc == 0) {
1119                 static int cksum_counter;
1120                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1121                 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
1122                 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
1123                 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1124                 repbody->oa.o_cksum = server_cksum;
1125                 cksum_counter++;
1126                 if (unlikely(client_cksum != server_cksum)) {
1127                         ost_warn_on_cksum(req, desc, local_nb, npages,
1128                                           client_cksum, server_cksum, mmap);
1129                         cksum_counter = 0;
1130
1131                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1132                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1133                                cksum_counter, libcfs_id2str(req->rq_peer),
1134                                server_cksum);
1135                 }
1136         }
1137
1138         /* Must commit after prep above in all cases */
1139         rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
1140                           &repbody->oa, objcount, ioo, remote_nb, npages,
1141                           local_nb, oti, rc);
1142         if (rc == -ENOTCONN)
1143                 /* quota acquire process has been given up because
1144                  * either the client has been evicted or the client
1145                  * has timed out the request already */
1146                 no_reply = 1;
1147
1148         if (exp_connect_rmtclient(exp)) {
1149                 repbody->oa.o_uid = o_uid;
1150                 repbody->oa.o_gid = o_gid;
1151         }
1152
1153         /*
1154          * Disable sending mtime back to the client. If the client locked the
1155          * whole object, then it has already updated the mtime on its side,
1156          * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
1157          */
1158         repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
1159
1160         if (rc == 0) {
1161                 int nob = 0;
1162
1163                 /* set per-requested niobuf return codes */
1164                 for (i = j = 0; i < niocount; i++) {
1165                         int len = remote_nb[i].len;
1166
1167                         nob += len;
1168                         rcs[i] = 0;
1169                         do {
1170                                 LASSERT(j < npages);
1171                                 if (local_nb[j].rc < 0)
1172                                         rcs[i] = local_nb[j].rc;
1173                                 len -= local_nb[j].len;
1174                                 j++;
1175                         } while (len > 0);
1176                         LASSERT(len == 0);
1177                 }
1178                 LASSERT(j == npages);
1179                 ptlrpc_lprocfs_brw(req, nob);
1180         }
1181
1182 out_lock:
1183         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
1184 out_tls:
1185         ost_tls_put(req);
1186 out_bulk:
1187         if (desc)
1188                 ptlrpc_free_bulk_nopin(desc);
1189 out:
1190         if (rc == 0) {
1191                 oti_to_request(oti, req);
1192                 target_committed_to_req(req);
1193                 rc = ptlrpc_reply(req);
1194         } else if (!no_reply) {
1195                 /* Only reply if there was no comms problem with bulk */
1196                 target_committed_to_req(req);
1197                 req->rq_status = rc;
1198                 ptlrpc_error(req);
1199         } else {
1200                 /* reply out callback would free */
1201                 ptlrpc_req_drop_rs(req);
1202                 LCONSOLE_WARN("%s: Bulk IO write error with %s (at %s), "
1203                               "client will retry: rc %d\n",
1204                               exp->exp_obd->obd_name,
1205                               obd_uuid2str(&exp->exp_client_uuid),
1206                               obd_export_nid2str(exp), rc);
1207         }
1208         cfs_memory_pressure_clr();
1209         RETURN(rc);
1210 }
1211
1212 /**
1213  * Implementation of OST_SET_INFO.
1214  *
1215  * OST_SET_INFO is like ioctl(): heavily overloaded.  Specifically, it takes a
1216  * "key" and a value RPC buffers as arguments, with the value's contents
1217  * interpreted according to the key.
1218  *
1219  * Value types that need swabbing have swabbing done explicitly, either here or
1220  * in functions called from here.  This should be corrected: all swabbing should
1221  * be done in the capsule abstraction, as that will then allow us to move
1222  * swabbing exclusively to the client without having to modify server code
1223  * outside the capsule abstraction's implementation itself.  To correct this
1224  * will require minor changes to the capsule abstraction; see the comments for
1225  * req_capsule_extend() in layout.c.
1226  */
1227 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1228 {
1229         struct ost_body *body = NULL, *repbody;
1230         char *key, *val = NULL;
1231         int keylen, vallen, rc = 0;
1232         int is_grant_shrink = 0;
1233         ENTRY;
1234
1235         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
1236         if (key == NULL) {
1237                 DEBUG_REQ(D_HA, req, "no set_info key");
1238                 RETURN(-EFAULT);
1239         }
1240         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
1241                                       RCL_CLIENT);
1242
1243         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
1244                                       RCL_CLIENT);
1245
1246         if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
1247                 /* In this case the value is actually an RMF_OST_BODY, so we
1248                  * transmutate the type of this PTLRPC */
1249                 req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
1250
1251         rc = req_capsule_server_pack(&req->rq_pill);
1252         if (rc)
1253                 RETURN(rc);
1254
1255         if (vallen) {
1256                 if (is_grant_shrink) {
1257                         body = req_capsule_client_get(&req->rq_pill,
1258                                                       &RMF_OST_BODY);
1259                         if (!body)
1260                                 RETURN(-EFAULT);
1261
1262                         repbody = req_capsule_server_get(&req->rq_pill,
1263                                                          &RMF_OST_BODY);
1264                         memcpy(repbody, body, sizeof(*body));
1265                         val = (char*)repbody;
1266                 } else {
1267                         val = req_capsule_client_get(&req->rq_pill,
1268                                                      &RMF_SETINFO_VAL);
1269                 }
1270         }
1271
1272         if (KEY_IS(KEY_EVICT_BY_NID)) {
1273                 if (val && vallen)
1274                         obd_export_evict_by_nid(exp->exp_obd, val);
1275                 GOTO(out, rc = 0);
1276         } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
1277                 if (vallen < sizeof(__u32))
1278                         RETURN(-EFAULT);
1279                 __swab32s((__u32 *)val);
1280         }
1281
1282         /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
1283          * a struct ost_body * value */
1284         rc = obd_set_info_async(req->rq_svc_thread->t_env, exp, keylen,
1285                                 key, vallen, val, NULL);
1286 out:
1287         lustre_msg_set_status(req->rq_repmsg, 0);
1288         RETURN(rc);
1289 }
1290
1291 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1292 {
1293         void *key, *reply;
1294         int keylen, replylen, rc = 0;
1295         struct req_capsule *pill = &req->rq_pill;
1296         ENTRY;
1297
1298         /* this common part for get_info rpc */
1299         key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
1300         if (key == NULL) {
1301                 DEBUG_REQ(D_HA, req, "no get_info key");
1302                 RETURN(-EFAULT);
1303         }
1304         keylen = req_capsule_get_size(pill, &RMF_SETINFO_KEY, RCL_CLIENT);
1305
1306         if (KEY_IS(KEY_FIEMAP)) {
1307                 struct ll_fiemap_info_key *fm_key = key;
1308                 int rc;
1309
1310                 rc = ost_validate_obdo(exp, &fm_key->oa, NULL);
1311                 if (rc)
1312                         RETURN(rc);
1313         }
1314
1315         rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
1316                           &replylen, NULL, NULL);
1317         if (rc)
1318                 RETURN(rc);
1319
1320         req_capsule_set_size(pill, &RMF_GENERIC_DATA,
1321                              RCL_SERVER, replylen);
1322
1323         rc = req_capsule_server_pack(pill);
1324         if (rc)
1325                 RETURN(rc);
1326
1327         reply = req_capsule_server_get(pill, &RMF_GENERIC_DATA);
1328         if (reply == NULL)
1329                 RETURN(-ENOMEM);
1330
1331         if (KEY_IS(KEY_LAST_FID)) {
1332                 void *val;
1333                 int vallen;
1334
1335                 req_capsule_extend(pill, &RQF_OST_GET_INFO_LAST_FID);
1336                 val = req_capsule_client_get(pill, &RMF_SETINFO_VAL);
1337                 vallen = req_capsule_get_size(pill, &RMF_SETINFO_VAL,
1338                                               RCL_CLIENT);
1339                 if (val != NULL && vallen > 0 && replylen >= vallen) {
1340                         memcpy(reply, val, vallen);
1341                 } else {
1342                         CERROR("%s: invalid req val %p vallen %d replylen %d\n",
1343                                exp->exp_obd->obd_name, val, vallen, replylen);
1344                         GOTO(out, rc = -EINVAL);
1345                 }
1346         }
1347
1348         /* call again to fill in the reply buffer */
1349         rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
1350                           &replylen, reply, NULL);
1351 out:
1352         lustre_msg_set_status(req->rq_repmsg, 0);
1353         RETURN(rc);
1354 }
1355
1356 static int ost_handle_quotactl(struct ptlrpc_request *req)
1357 {
1358         struct obd_quotactl *oqctl, *repoqc;
1359         int rc;
1360         ENTRY;
1361
1362         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1363         if (oqctl == NULL)
1364                 GOTO(out, rc = -EPROTO);
1365
1366         rc = req_capsule_server_pack(&req->rq_pill);
1367         if (rc)
1368                 GOTO(out, rc);
1369
1370         repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1371         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1372         *repoqc = *oqctl;
1373
1374 out:
1375         RETURN(rc);
1376 }
1377
1378 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1379 {
1380         struct obd_quotactl *oqctl;
1381         int rc;
1382         ENTRY;
1383
1384         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1385         if (oqctl == NULL)
1386                 RETURN(-EPROTO);
1387
1388         rc = req_capsule_server_pack(&req->rq_pill);
1389         if (rc)
1390                 RETURN(-ENOMEM);
1391
1392         /* deprecated, not used any more */
1393         req->rq_status = -EOPNOTSUPP;
1394         RETURN(-EOPNOTSUPP);
1395 }
1396
1397 static int ost_llog_handle_connect(struct obd_export *exp,
1398                                    struct ptlrpc_request *req)
1399 {
1400         struct llogd_conn_body *body;
1401         int rc;
1402         ENTRY;
1403
1404         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
1405         rc = obd_llog_connect(exp, body);
1406         RETURN(rc);
1407 }
1408
/*
 * Clear the remote-client and OSS-capability connect flags in \a reply and
 * store the resulting flags in \a exp under exp_lock.
 *
 * Both arguments are pointers; they are parenthesized so that expression
 * arguments expand correctly.  Each argument is evaluated more than once, so
 * do not pass expressions with side effects.
 */
#define ost_init_sec_none(reply, exp)                                   \
do {                                                                    \
        (reply)->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |        \
                                        OBD_CONNECT_RMT_CLIENT_FORCE |  \
                                        OBD_CONNECT_OSS_CAPA);          \
        spin_lock(&(exp)->exp_lock);                                    \
        (exp)->exp_connect_flags = (reply)->ocd_connect_flags;          \
        spin_unlock(&(exp)->exp_lock);                                  \
} while (0)
1418
1419 static int ost_init_sec_level(struct ptlrpc_request *req)
1420 {
1421         struct obd_export *exp = req->rq_export;
1422         struct req_capsule *pill = &req->rq_pill;
1423         struct obd_device *obd = exp->exp_obd;
1424         struct filter_obd *filter = &obd->u.filter;
1425         char *client = libcfs_nid2str(req->rq_peer.nid);
1426         struct obd_connect_data *data, *reply;
1427         int rc = 0, remote;
1428         ENTRY;
1429
1430         data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
1431         reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
1432         if (data == NULL || reply == NULL)
1433                 RETURN(-EFAULT);
1434
1435         /* connection from MDT is always trusted */
1436         if (req->rq_auth_usr_mdt) {
1437                 ost_init_sec_none(reply, exp);
1438                 RETURN(0);
1439         }
1440
1441         /* no GSS support case */
1442         if (!req->rq_auth_gss) {
1443                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1444                         CWARN("client %s -> target %s does not user GSS, "
1445                               "can not run under security level %d.\n",
1446                               client, obd->obd_name, filter->fo_sec_level);
1447                         RETURN(-EACCES);
1448                 } else {
1449                         ost_init_sec_none(reply, exp);
1450                         RETURN(0);
1451                 }
1452         }
1453
1454         /* old version case */
1455         if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
1456                      !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
1457                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1458                         CWARN("client %s -> target %s uses old version, "
1459                               "can not run under security level %d.\n",
1460                               client, obd->obd_name, filter->fo_sec_level);
1461                         RETURN(-EACCES);
1462                 } else {
1463                         CWARN("client %s -> target %s uses old version, "
1464                               "run under security level %d.\n",
1465                               client, obd->obd_name, filter->fo_sec_level);
1466                         ost_init_sec_none(reply, exp);
1467                         RETURN(0);
1468                 }
1469         }
1470
1471         remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
1472         if (remote) {
1473                 if (!req->rq_auth_remote)
1474                         CDEBUG(D_SEC, "client (local realm) %s -> target %s "
1475                                "asked to be remote.\n", client, obd->obd_name);
1476         } else if (req->rq_auth_remote) {
1477                 remote = 1;
1478                 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
1479                        "as remote by default.\n", client, obd->obd_name);
1480         }
1481
1482         if (remote) {
1483                 if (!filter->fo_fl_oss_capa) {
1484                         CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
1485                                " but OSS capabilities are not enabled: %d.\n",
1486                                client, obd->obd_name, filter->fo_fl_oss_capa);
1487                         RETURN(-EACCES);
1488                 }
1489         }
1490
1491         switch (filter->fo_sec_level) {
1492         case LUSTRE_SEC_NONE:
1493                 if (!remote) {
1494                         ost_init_sec_none(reply, exp);
1495                         break;
1496                 } else {
1497                         CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
1498                                "can not run under security level %d.\n",
1499                                client, obd->obd_name, filter->fo_sec_level);
1500                         RETURN(-EACCES);
1501                 }
1502         case LUSTRE_SEC_REMOTE:
1503                 if (!remote)
1504                         ost_init_sec_none(reply, exp);
1505                 break;
1506         case LUSTRE_SEC_ALL:
1507                 if (!remote) {
1508                         reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
1509                                                       OBD_CONNECT_RMT_CLIENT_FORCE);
1510                         if (!filter->fo_fl_oss_capa)
1511                                 reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
1512
1513                         spin_lock(&exp->exp_lock);
1514                         exp->exp_connect_flags = reply->ocd_connect_flags;
1515                         spin_unlock(&exp->exp_lock);
1516                 }
1517                 break;
1518         default:
1519                 RETURN(-EINVAL);
1520         }
1521
1522         RETURN(rc);
1523 }
1524
/*
 * FIXME
 * This should be done in filter_connect()/filter_reconnect(), but there
 * we can't obtain information such as the NID, which is stored in the
 * incoming request, and thus can't decide which flavor to use. So we do
 * it here.
 *
 * This hack should be removed after the OST stack is rewritten, just
 * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
 */
1534 static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
1535 {
1536         struct obd_export     *exp = req->rq_export;
1537         struct filter_obd     *filter = &exp->exp_obd->u.filter;
1538         struct sptlrpc_flavor  flvr;
1539         int                    rc = 0;
1540
1541         if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
1542                             LUSTRE_ECHO_NAME) == 0)) {
1543                 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
1544                 return 0;
1545         }
1546
1547         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
1548                 read_lock(&filter->fo_sptlrpc_lock);
1549                 sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
1550                                              req->rq_sp_from,
1551                                              req->rq_peer.nid,
1552                                              &flvr);
1553                 read_unlock(&filter->fo_sptlrpc_lock);
1554
1555                 spin_lock(&exp->exp_lock);
1556
1557                 exp->exp_sp_peer = req->rq_sp_from;
1558                 exp->exp_flvr = flvr;
1559
1560                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
1561                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
1562                         CERROR("unauthorized rpc flavor %x from %s, "
1563                                "expect %x\n", req->rq_flvr.sf_rpc,
1564                                libcfs_nid2str(req->rq_peer.nid),
1565                                exp->exp_flvr.sf_rpc);
1566                         rc = -EACCES;
1567                 }
1568
1569                 spin_unlock(&exp->exp_lock);
1570         } else {
1571                 if (exp->exp_sp_peer != req->rq_sp_from) {
1572                         CERROR("RPC source %s doesn't match %s\n",
1573                                sptlrpc_part2name(req->rq_sp_from),
1574                                sptlrpc_part2name(exp->exp_sp_peer));
1575                         rc = -EACCES;
1576                 } else {
1577                         rc = sptlrpc_target_export_check(exp, req);
1578                 }
1579         }
1580
1581         return rc;
1582 }
1583
1584 /* Ensure that data and metadata are synced to the disk when lock is cancelled
1585  * (if requested) */
1586 int ost_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
1587                      void *data, int flag)
1588 {
1589         struct lu_env   env;
1590         __u32           sync_lock_cancel = 0;
1591         __u32           len = sizeof(sync_lock_cancel);
1592         int             rc = 0;
1593
1594         ENTRY;
1595
1596         rc = lu_env_init(&env, LCT_DT_THREAD);
1597         if (unlikely(rc != 0))
1598                 RETURN(rc);
1599
1600         rc = obd_get_info(&env, lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
1601                           KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
1602         if (rc == 0 && flag == LDLM_CB_CANCELING &&
1603             (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
1604             (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
1605              (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
1606               lock->l_flags & LDLM_FL_CBPENDING))) {
1607                 struct obd_info *oinfo;
1608                 struct obdo     *oa;
1609                 int              rc;
1610
1611                 OBD_ALLOC_PTR(oinfo);
1612                 if (!oinfo)
1613                         GOTO(out_env, rc = -ENOMEM);
1614                 OBDO_ALLOC(oa);
1615                 if (!oa) {
1616                         OBD_FREE_PTR(oinfo);
1617                         GOTO(out_env, rc = -ENOMEM);
1618                 }
1619                 oa->o_id = lock->l_resource->lr_name.name[0];
1620                 oa->o_seq = lock->l_resource->lr_name.name[1];
1621                 oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
1622                 oinfo->oi_oa = oa;
1623                 oinfo->oi_capa = BYPASS_CAPA;
1624
1625                 rc = obd_sync(&env, lock->l_export, oinfo,
1626                               lock->l_policy_data.l_extent.start,
1627                               lock->l_policy_data.l_extent.end, NULL);
1628                 if (rc)
1629                         CERROR("Error %d syncing data on lock cancel\n", rc);
1630
1631                 OBDO_FREE(oa);
1632                 OBD_FREE_PTR(oinfo);
1633         }
1634
1635         rc = ldlm_server_blocking_ast(lock, desc, data, flag);
1636 out_env:
1637         lu_env_fini(&env);
1638         RETURN(rc);
1639 }
1640
1641 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1642                                        struct obd_device *obd, int *process)
1643 {
1644         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1645         case OST_CONNECT: /* This will never get here, but for completeness. */
1646         case OST_DISCONNECT:
1647                *process = 1;
1648                RETURN(0);
1649
1650         case OBD_PING:
1651         case OST_CREATE:
1652         case OST_DESTROY:
1653         case OST_PUNCH:
1654         case OST_SETATTR:
1655         case OST_SYNC:
1656         case OST_WRITE:
1657         case OBD_LOG_CANCEL:
1658         case LDLM_ENQUEUE:
1659                 *process = target_queue_recovery_request(req, obd);
1660                 RETURN(0);
1661
1662         default:
1663                 DEBUG_REQ(D_WARNING, req, "not permitted during recovery");
1664                 *process = -EAGAIN;
1665                 RETURN(0);
1666         }
1667 }
1668
1669 int ost_msg_check_version(struct lustre_msg *msg)
1670 {
1671         int rc;
1672
1673         switch(lustre_msg_get_opc(msg)) {
1674         case OST_CONNECT:
1675         case OST_DISCONNECT:
1676         case OBD_PING:
1677         case SEC_CTX_INIT:
1678         case SEC_CTX_INIT_CONT:
1679         case SEC_CTX_FINI:
1680                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1681                 if (rc)
1682                         CERROR("bad opc %u version %08x, expecting %08x\n",
1683                                lustre_msg_get_opc(msg),
1684                                lustre_msg_get_version(msg),
1685                                LUSTRE_OBD_VERSION);
1686                 break;
1687         case SEQ_QUERY:
1688                 /* Note: client always use MDS_VERSION for FID request */
1689                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1690                 if (rc)
1691                         CERROR("bad opc %u version %08x, expecting %08x\n",
1692                                lustre_msg_get_opc(msg),
1693                                lustre_msg_get_version(msg),
1694                                LUSTRE_MDS_VERSION);
1695                 break;
1696         case OST_CREATE:
1697         case OST_DESTROY:
1698         case OST_GETATTR:
1699         case OST_SETATTR:
1700         case OST_WRITE:
1701         case OST_READ:
1702         case OST_PUNCH:
1703         case OST_STATFS:
1704         case OST_SYNC:
1705         case OST_SET_INFO:
1706         case OST_GET_INFO:
1707         case OST_QUOTACHECK:
1708         case OST_QUOTACTL:
1709                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1710                 if (rc)
1711                         CERROR("bad opc %u version %08x, expecting %08x\n",
1712                                lustre_msg_get_opc(msg),
1713                                lustre_msg_get_version(msg),
1714                                LUSTRE_OST_VERSION);
1715                 break;
1716         case LDLM_ENQUEUE:
1717         case LDLM_CONVERT:
1718         case LDLM_CANCEL:
1719         case LDLM_BL_CALLBACK:
1720         case LDLM_CP_CALLBACK:
1721                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1722                 if (rc)
1723                         CERROR("bad opc %u version %08x, expecting %08x\n",
1724                                lustre_msg_get_opc(msg),
1725                                lustre_msg_get_version(msg),
1726                                LUSTRE_DLM_VERSION);
1727                 break;
1728         case LLOG_ORIGIN_CONNECT:
1729         case OBD_LOG_CANCEL:
1730                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1731                 if (rc)
1732                         CERROR("bad opc %u version %08x, expecting %08x\n",
1733                                lustre_msg_get_opc(msg),
1734                                lustre_msg_get_version(msg),
1735                                LUSTRE_LOG_VERSION);
1736                 break;
1737         case OST_QUOTA_ADJUST_QUNIT:
1738                 rc = -ENOTSUPP;
1739                 CERROR("Quota adjust is deprecated as of 2.4.0\n");
1740                 break;
1741         default:
1742                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1743                 rc = -ENOTSUPP;
1744         }
1745         return rc;
1746 }
1747
1748 struct ost_prolong_data {
1749         struct ptlrpc_request *opd_req;
1750         struct obd_export     *opd_exp;
1751         struct obdo           *opd_oa;
1752         struct ldlm_res_id     opd_resid;
1753         struct ldlm_extent     opd_extent;
1754         ldlm_mode_t            opd_mode;
1755         unsigned int           opd_locks;
1756         int                    opd_timeout;
1757 };
1758
1759 /* prolong locks for the current service time of the corresponding
1760  * portal (= OST_IO_PORTAL)
1761  */
1762 static inline int prolong_timeout(struct ptlrpc_request *req)
1763 {
1764         struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
1765
1766         if (AT_OFF)
1767                 return obd_timeout / 2;
1768
1769         return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
1770                    ldlm_timeout);
1771 }
1772
1773 static void ost_prolong_lock_one(struct ost_prolong_data *opd,
1774                                  struct ldlm_lock *lock)
1775 {
1776         LASSERT(lock->l_export == opd->opd_exp);
1777
1778         if (lock->l_destroyed) /* lock already cancelled */
1779                 return;
1780
1781         /* XXX: never try to grab resource lock here because we're inside
1782          * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
1783          * res lock and then exp_bl_list_lock. */
1784
1785         if (!(lock->l_flags & LDLM_FL_AST_SENT))
1786                 /* ignore locks not being cancelled */
1787                 return;
1788
1789         LDLM_DEBUG(lock,
1790                    "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
1791                    opd->opd_req->rq_xid, opd->opd_extent.start,
1792                    opd->opd_extent.end, opd->opd_timeout);
1793
1794         /* OK. this is a possible lock the user holds doing I/O
1795          * let's refresh eviction timer for it */
1796         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
1797         ++opd->opd_locks;
1798 }
1799
1800 static void ost_prolong_locks(struct ost_prolong_data *data)
1801 {
1802         struct obd_export *exp = data->opd_exp;
1803         struct obdo       *oa  = data->opd_oa;
1804         struct ldlm_lock  *lock;
1805         ENTRY;
1806
1807         if (oa->o_valid & OBD_MD_FLHANDLE) {
1808                 /* mostly a request should be covered by only one lock, try
1809                  * fast path. */
1810                 lock = ldlm_handle2lock(&oa->o_handle);
1811                 if (lock != NULL) {
1812                         /* Fast path to check if the lock covers the whole IO
1813                          * region exclusively. */
1814                         if (lock->l_granted_mode == LCK_PW &&
1815                             ldlm_extent_contain(&lock->l_policy_data.l_extent,
1816                                                 &data->opd_extent)) {
1817                                 /* bingo */
1818                                 ost_prolong_lock_one(data, lock);
1819                                 LDLM_LOCK_PUT(lock);
1820                                 RETURN_EXIT;
1821                         }
1822                         LDLM_LOCK_PUT(lock);
1823                 }
1824         }
1825
1826
1827         spin_lock_bh(&exp->exp_bl_list_lock);
1828         cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
1829                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1830                 LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
1831
1832                 if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
1833                         continue;
1834
1835                 if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
1836                                          &data->opd_extent))
1837                         continue;
1838
1839                 ost_prolong_lock_one(data, lock);
1840         }
1841         spin_unlock_bh(&exp->exp_bl_list_lock);
1842
1843         EXIT;
1844 }
1845
1846 /**
1847  * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
1848  * not.
1849  */
1850 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
1851                                    struct ldlm_lock *lock)
1852 {
1853         struct niobuf_remote *nb;
1854         struct obd_ioobj *ioo;
1855         int mode, opc;
1856         struct ldlm_extent ext;
1857         ENTRY;
1858
1859         opc = lustre_msg_get_opc(req->rq_reqmsg);
1860         LASSERT(opc == OST_READ || opc == OST_WRITE);
1861
1862         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1863         LASSERT(ioo != NULL);
1864
1865         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1866         LASSERT(nb != NULL);
1867
1868         ext.start = nb->offset;
1869         nb += ioo->ioo_bufcnt - 1;
1870         ext.end = nb->offset + nb->len - 1;
1871
1872         LASSERT(lock->l_resource != NULL);
1873         if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_seq,
1874                              &lock->l_resource->lr_name))
1875                 RETURN(0);
1876
1877         mode = LCK_PW;
1878         if (opc == OST_READ)
1879                 mode |= LCK_PR;
1880         if (!(lock->l_granted_mode & mode))
1881                 RETURN(0);
1882
1883         RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
1884 }
1885
1886 /**
1887  * High-priority queue request check for whether the given PTLRPC request (\a
1888  * req) is blocking an LDLM lock cancel.
1889  *
1890  * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
1891  * cancel, 0 if it is not, and -EFAULT if the request is malformed.
1892  *
1893  * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
1894  * function looks only at OST_READs and OST_WRITEs.
1895  */
1896 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
1897 {
1898         struct obd_device *obd = req->rq_export->exp_obd;
1899         struct ost_body *body;
1900         struct obd_ioobj *ioo;
1901         struct niobuf_remote *nb;
1902         struct ost_prolong_data opd = { 0 };
1903         int mode, opc;
1904         ENTRY;
1905
1906         /*
1907          * Use LASSERT to do sanity check because malformed RPCs should have
1908          * been filtered out in ost_hpreq_handler().
1909          */
1910         opc = lustre_msg_get_opc(req->rq_reqmsg);
1911         LASSERT(opc == OST_READ || opc == OST_WRITE);
1912
1913         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1914         LASSERT(body != NULL);
1915
1916         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1917         LASSERT(ioo != NULL);
1918
1919         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1920         LASSERT(nb != NULL);
1921         LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
1922
1923         osc_build_res_name(ioo->ioo_id, ioo->ioo_seq, &opd.opd_resid);
1924
1925         opd.opd_req = req;
1926         mode = LCK_PW;
1927         if (opc == OST_READ)
1928                 mode |= LCK_PR;
1929         opd.opd_mode = mode;
1930         opd.opd_exp = req->rq_export;
1931         opd.opd_oa  = &body->oa;
1932         opd.opd_extent.start = nb->offset;
1933         nb += ioo->ioo_bufcnt - 1;
1934         opd.opd_extent.end = nb->offset + nb->len - 1;
1935         opd.opd_timeout = prolong_timeout(req);
1936
1937         DEBUG_REQ(D_RPCTRACE, req,
1938                "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1939                obd->obd_name, cfs_current()->comm,
1940                opd.opd_resid.name[0], opd.opd_resid.name[1],
1941                opd.opd_extent.start, opd.opd_extent.end);
1942
1943         ost_prolong_locks(&opd);
1944
1945         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
1946                obd->obd_name, opd.opd_locks, req);
1947
1948         RETURN(opd.opd_locks);
1949 }
1950
/**
 * hpreq_fini method for OST_READ/OST_WRITE: re-runs ost_rw_hpreq_check()
 * and discards its verdict.
 */
static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
{
        /* the return value is deliberately ignored */
        (void)ost_rw_hpreq_check(req);
}
1955
1956 /**
1957  * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
1958  */
1959 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
1960                                       struct ldlm_lock *lock)
1961 {
1962         struct ost_body *body;
1963         ENTRY;
1964
1965         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1966         LASSERT(body != NULL);
1967
1968         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
1969             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
1970                 RETURN(1);
1971
1972         RETURN(0);
1973 }
1974
1975 /**
1976  * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
1977  */
1978 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
1979 {
1980         struct obd_device *obd = req->rq_export->exp_obd;
1981         struct ost_body *body;
1982         struct obdo *oa;
1983         struct ost_prolong_data opd = { 0 };
1984         __u64 start, end;
1985         ENTRY;
1986
1987         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1988         LASSERT(body != NULL);
1989
1990         oa = &body->oa;
1991         LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
1992                 !(oa->o_flags & OBD_FL_SRVLOCK));
1993
1994         start = oa->o_size;
1995         end = start + oa->o_blocks;
1996
1997         opd.opd_req = req;
1998         opd.opd_mode = LCK_PW;
1999         opd.opd_exp = req->rq_export;
2000         opd.opd_oa  = oa;
2001         opd.opd_extent.start = start;
2002         opd.opd_extent.end   = end;
2003         if (oa->o_blocks == OBD_OBJECT_EOF)
2004                 opd.opd_extent.end = OBD_OBJECT_EOF;
2005         opd.opd_timeout = prolong_timeout(req);
2006
2007         osc_build_res_name(oa->o_id, oa->o_seq, &opd.opd_resid);
2008
2009         CDEBUG(D_DLMTRACE,
2010                "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
2011                obd->obd_name,
2012                opd.opd_resid.name[0], opd.opd_resid.name[1],
2013                opd.opd_extent.start, opd.opd_extent.end);
2014
2015         ost_prolong_locks(&opd);
2016
2017         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
2018                obd->obd_name, opd.opd_locks, req);
2019
2020         RETURN(opd.opd_locks > 0);
2021 }
2022
/**
 * hpreq_fini method for OST_PUNCH: re-runs ost_punch_hpreq_check() and
 * discards its verdict.
 */
static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
{
        /* the return value is deliberately ignored */
        (void)ost_punch_hpreq_check(req);
}
2027
2028 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
2029         .hpreq_lock_match = ost_rw_hpreq_lock_match,
2030         .hpreq_check      = ost_rw_hpreq_check,
2031         .hpreq_fini       = ost_rw_hpreq_fini
2032 };
2033
2034 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
2035         .hpreq_lock_match = ost_punch_hpreq_lock_match,
2036         .hpreq_check      = ost_punch_hpreq_check,
2037         .hpreq_fini       = ost_punch_hpreq_fini
2038 };
2039
2040 /** Assign high priority operations to the request if needed. */
2041 static int ost_io_hpreq_handler(struct ptlrpc_request *req)
2042 {
2043         ENTRY;
2044         if (req->rq_export) {
2045                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
2046                 struct ost_body *body;
2047
2048                 if (opc == OST_READ || opc == OST_WRITE) {
2049                         struct niobuf_remote *nb;
2050                         struct obd_ioobj *ioo;
2051                         int objcount, niocount;
2052                         int rc;
2053                         int i;
2054
2055                         /* RPCs on the H-P queue can be inspected before
2056                          * ost_handler() initializes their pills, so we
2057                          * initialize that here.  Capsule initialization is
2058                          * idempotent, as is setting the pill's format (provided
2059                          * it doesn't change).
2060                          */
2061                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2062                         if (opc == OST_READ)
2063                                 req_capsule_set(&req->rq_pill,
2064                                                 &RQF_OST_BRW_READ);
2065                         else
2066                                 req_capsule_set(&req->rq_pill,
2067                                                 &RQF_OST_BRW_WRITE);
2068
2069                         body = req_capsule_client_get(&req->rq_pill,
2070                                                       &RMF_OST_BODY);
2071                         if (body == NULL) {
2072                                 CERROR("Missing/short ost_body\n");
2073                                 RETURN(-EFAULT);
2074                         }
2075
2076                         objcount = req_capsule_get_size(&req->rq_pill,
2077                                                         &RMF_OBD_IOOBJ,
2078                                                         RCL_CLIENT) /
2079                                                         sizeof(*ioo);
2080                         if (objcount == 0) {
2081                                 CERROR("Missing/short ioobj\n");
2082                                 RETURN(-EFAULT);
2083                         }
2084                         if (objcount > 1) {
2085                                 CERROR("too many ioobjs (%d)\n", objcount);
2086                                 RETURN(-EFAULT);
2087                         }
2088
2089                         ioo = req_capsule_client_get(&req->rq_pill,
2090                                                      &RMF_OBD_IOOBJ);
2091                         if (ioo == NULL) {
2092                                 CERROR("Missing/short ioobj\n");
2093                                 RETURN(-EFAULT);
2094                         }
2095
2096                         rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
2097                         if (rc) {
2098                                 CERROR("invalid object ids\n");
2099                                 RETURN(rc);
2100                         }
2101
2102                         for (niocount = i = 0; i < objcount; i++) {
2103                                 if (ioo[i].ioo_bufcnt == 0) {
2104                                         CERROR("ioo[%d] has zero bufcnt\n", i);
2105                                         RETURN(-EFAULT);
2106                                 }
2107                                 niocount += ioo[i].ioo_bufcnt;
2108                         }
2109                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
2110                                 DEBUG_REQ(D_RPCTRACE, req,
2111                                           "bulk has too many pages (%d)",
2112                                           niocount);
2113                                 RETURN(-EFAULT);
2114                         }
2115
2116                         nb = req_capsule_client_get(&req->rq_pill,
2117                                                     &RMF_NIOBUF_REMOTE);
2118                         if (nb == NULL) {
2119                                 CERROR("Missing/short niobuf\n");
2120                                 RETURN(-EFAULT);
2121                         }
2122
2123                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
2124                                 req->rq_ops = &ost_hpreq_rw;
2125                 } else if (opc == OST_PUNCH) {
2126                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2127                         req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2128
2129                         body = req_capsule_client_get(&req->rq_pill,
2130                                                       &RMF_OST_BODY);
2131                         if (body == NULL) {
2132                                 CERROR("Missing/short ost_body\n");
2133                                 RETURN(-EFAULT);
2134                         }
2135
2136                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
2137                             !(body->oa.o_flags & OBD_FL_SRVLOCK))
2138                                 req->rq_ops = &ost_hpreq_punch;
2139                 }
2140         }
2141         RETURN(0);
2142 }
2143
2144 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
2145 int ost_handle(struct ptlrpc_request *req)
2146 {
2147         struct obd_trans_info trans_info = { 0, };
2148         struct obd_trans_info *oti = &trans_info;
2149         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
2150         struct obd_device *obd = NULL;
2151         ENTRY;
2152
2153         /* OST module is kept between remounts, but the last reference
2154          * to specific module (say, osd or ofd) kills all related keys
2155          * from the environment. so we have to refill it until the root
2156          * cause is fixed properly */
2157         lu_env_refill(req->rq_svc_thread->t_env);
2158
2159         LASSERT(current->journal_info == NULL);
2160
2161         /* primordial rpcs don't affect server recovery */
2162         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2163         case SEC_CTX_INIT:
2164         case SEC_CTX_INIT_CONT:
2165         case SEC_CTX_FINI:
2166                 GOTO(out, rc = 0);
2167         }
2168
2169         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2170
2171         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
2172                 if (!class_connected_export(req->rq_export)) {
2173                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
2174                                lustre_msg_get_opc(req->rq_reqmsg),
2175                                libcfs_id2str(req->rq_peer));
2176                         req->rq_status = -ENOTCONN;
2177                         GOTO(out, rc = -ENOTCONN);
2178                 }
2179
2180                 obd = req->rq_export->exp_obd;
2181
2182                 /* Check for aborted recovery. */
2183                 if (obd->obd_recovering) {
2184                         rc = ost_filter_recovery_request(req, obd,
2185                                                          &should_process);
2186                         if (rc || !should_process)
2187                                 RETURN(rc);
2188                         else if (should_process < 0) {
2189                                 req->rq_status = should_process;
2190                                 rc = ptlrpc_error(req);
2191                                 RETURN(rc);
2192                         }
2193                 }
2194         }
2195
2196         oti_init(oti, req);
2197
2198         rc = ost_msg_check_version(req->rq_reqmsg);
2199         if (rc)
2200                 RETURN(rc);
2201
2202         if (req && req->rq_reqmsg && req->rq_export &&
2203             (req->rq_export->exp_connect_flags & OBD_CONNECT_JOBSTATS))
2204                 oti->oti_jobid = lustre_msg_get_jobid(req->rq_reqmsg);
2205
2206         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2207         case OST_CONNECT: {
2208                 CDEBUG(D_INODE, "connect\n");
2209                 req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
2210                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
2211                         RETURN(0);
2212                 rc = target_handle_connect(req);
2213                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
2214                         RETURN(0);
2215                 if (!rc) {
2216                         rc = ost_init_sec_level(req);
2217                         if (!rc)
2218                                 rc = ost_connect_check_sptlrpc(req);
2219                 }
2220                 break;
2221         }
2222         case OST_DISCONNECT:
2223                 CDEBUG(D_INODE, "disconnect\n");
2224                 req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
2225                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
2226                         RETURN(0);
2227                 rc = target_handle_disconnect(req);
2228                 break;
2229         case OST_CREATE:
2230                 CDEBUG(D_INODE, "create\n");
2231                 req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
2232                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
2233                         RETURN(0);
2234                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2235                         GOTO(out, rc = -EROFS);
2236                 rc = ost_create(req->rq_export, req, oti);
2237                 break;
2238         case OST_DESTROY:
2239                 CDEBUG(D_INODE, "destroy\n");
2240                 req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
2241                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
2242                         RETURN(0);
2243                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2244                         GOTO(out, rc = -EROFS);
2245                 rc = ost_destroy(req->rq_export, req, oti);
2246                 break;
2247         case OST_GETATTR:
2248                 CDEBUG(D_INODE, "getattr\n");
2249                 req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
2250                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
2251                         RETURN(0);
2252                 rc = ost_getattr(req->rq_export, req);
2253                 break;
2254         case OST_SETATTR:
2255                 CDEBUG(D_INODE, "setattr\n");
2256                 req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
2257                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
2258                         RETURN(0);
2259                 rc = ost_setattr(req->rq_export, req, oti);
2260                 break;
2261         case OST_WRITE:
2262                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_WRITE);
2263                 CDEBUG(D_INODE, "write\n");
2264                 /* req->rq_request_portal would be nice, if it was set */
2265                 if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
2266                         CERROR("%s: deny write request from %s to portal %u\n",
2267                                req->rq_export->exp_obd->obd_name,
2268                                obd_export_nid2str(req->rq_export),
2269                                ptlrpc_req2svc(req)->srv_req_portal);
2270                         GOTO(out, rc = -EPROTO);
2271                 }
2272                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2273                         RETURN(0);
2274                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
2275                         GOTO(out, rc = -ENOSPC);
2276                 if (OBD_FAIL_TIMEOUT(OBD_FAIL_OST_EROFS, 1))
2277                         GOTO(out, rc = -EROFS);
2278                 rc = ost_brw_write(req, oti);
2279                 LASSERT(current->journal_info == NULL);
2280                 /* ost_brw_write sends its own replies */
2281                 RETURN(rc);
2282         case OST_READ:
2283                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_READ);
2284                 CDEBUG(D_INODE, "read\n");
2285                 /* req->rq_request_portal would be nice, if it was set */
2286                 if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
2287                         CERROR("%s: deny read request from %s to portal %u\n",
2288                                req->rq_export->exp_obd->obd_name,
2289                                obd_export_nid2str(req->rq_export),
2290                                ptlrpc_req2svc(req)->srv_req_portal);
2291                         GOTO(out, rc = -EPROTO);
2292                 }
2293                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2294                         RETURN(0);
2295                 rc = ost_brw_read(req, oti);
2296                 LASSERT(current->journal_info == NULL);
2297                 /* ost_brw_read sends its own replies */
2298                 RETURN(rc);
2299         case OST_PUNCH:
2300                 CDEBUG(D_INODE, "punch\n");
2301                 req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2302                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
2303                         RETURN(0);
2304                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2305                         GOTO(out, rc = -EROFS);
2306                 rc = ost_punch(req->rq_export, req, oti);
2307                 break;
2308         case OST_STATFS:
2309                 CDEBUG(D_INODE, "statfs\n");
2310                 req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
2311                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
2312                         RETURN(0);
2313                 rc = ost_statfs(req);
2314                 break;
2315         case OST_SYNC:
2316                 CDEBUG(D_INODE, "sync\n");
2317                 req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
2318                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
2319                         RETURN(0);
2320                 rc = ost_sync(req->rq_export, req, oti);
2321                 break;
2322         case OST_SET_INFO:
2323                 DEBUG_REQ(D_INODE, req, "set_info");
2324                 req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
2325                 rc = ost_set_info(req->rq_export, req);
2326                 break;
2327         case OST_GET_INFO:
2328                 DEBUG_REQ(D_INODE, req, "get_info");
2329                 req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
2330                 rc = ost_get_info(req->rq_export, req);
2331                 break;
2332         case SEQ_QUERY:
2333                 CDEBUG(D_INODE, "seq\n");
2334                 rc = seq_handle(req);
2335                 break;
2336         case OST_QUOTACHECK:
2337                 CDEBUG(D_INODE, "quotacheck\n");
2338                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
2339                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
2340                         RETURN(0);
2341                 rc = ost_handle_quotacheck(req);
2342                 break;
2343         case OST_QUOTACTL:
2344                 CDEBUG(D_INODE, "quotactl\n");
2345                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
2346                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
2347                         RETURN(0);
2348                 rc = ost_handle_quotactl(req);
2349                 break;
2350         case OBD_PING:
2351                 DEBUG_REQ(D_INODE, req, "ping");
2352                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
2353                 rc = target_handle_ping(req);
2354                 break;
2355         /* FIXME - just reply status */
2356         case LLOG_ORIGIN_CONNECT:
2357                 DEBUG_REQ(D_INODE, req, "log connect");
2358                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
2359                 rc = ost_llog_handle_connect(req->rq_export, req);
2360                 req->rq_status = rc;
2361                 rc = req_capsule_server_pack(&req->rq_pill);
2362                 if (rc)
2363                         RETURN(rc);
2364                 RETURN(ptlrpc_reply(req));
2365         case OBD_LOG_CANCEL:
2366                 CDEBUG(D_INODE, "log cancel\n");
2367                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
2368                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
2369                         RETURN(0);
2370                 rc = llog_origin_handle_cancel(req);
2371                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
2372                         RETURN(0);
2373                 req->rq_status = rc;
2374                 rc = req_capsule_server_pack(&req->rq_pill);
2375                 if (rc)
2376                         RETURN(rc);
2377                 RETURN(ptlrpc_reply(req));
2378         case LDLM_ENQUEUE:
2379                 CDEBUG(D_INODE, "enqueue\n");
2380                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
2381                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_NET))
2382                         RETURN(0);
2383                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
2384                                          ost_blocking_ast,
2385                                          ldlm_server_glimpse_ast);
2386                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
2387                 break;
2388         case LDLM_CONVERT:
2389                 CDEBUG(D_INODE, "convert\n");
2390                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2391                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT_NET))
2392                         RETURN(0);
2393                 rc = ldlm_handle_convert(req);
2394                 break;
2395         case LDLM_CANCEL:
2396                 CDEBUG(D_INODE, "cancel\n");
2397                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2398                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET))
2399                         RETURN(0);
2400                 rc = ldlm_handle_cancel(req);
2401                 break;
2402         case LDLM_BL_CALLBACK:
2403         case LDLM_CP_CALLBACK:
2404                 CDEBUG(D_INODE, "callback\n");
2405                 CERROR("callbacks should not happen on OST\n");
2406                 /* fall through */
2407         default:
2408                 CERROR("Unexpected opcode %d\n",
2409                        lustre_msg_get_opc(req->rq_reqmsg));
2410                 req->rq_status = -ENOTSUPP;
2411                 rc = ptlrpc_error(req);
2412                 RETURN(rc);
2413         }
2414
2415         LASSERT(current->journal_info == NULL);
2416
2417         EXIT;
2418         /* If we're DISCONNECTing, the export_data is already freed */
2419         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
2420                 target_committed_to_req(req);
2421
2422 out:
2423         if (!rc)
2424                 oti_to_request(oti, req);
2425
2426         target_send_reply(req, rc, fail);
2427         return 0;
2428 }
2429 EXPORT_SYMBOL(ost_handle);
2430
2431 /*
2432  * free per-thread pool created by ost_io_thread_init().
2433  */
2434 static void ost_io_thread_done(struct ptlrpc_thread *thread)
2435 {
2436         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
2437                                              * Storage */
2438
2439         ENTRY;
2440
2441         LASSERT(thread != NULL);
2442
2443         /*
2444          * be prepared to handle partially-initialized pools (because this is
2445          * called from ost_io_thread_init() for cleanup.
2446          */
2447         tls = thread->t_data;
2448         if (tls != NULL) {
2449                 OBD_FREE_PTR(tls);
2450                 thread->t_data = NULL;
2451         }
2452         EXIT;
2453 }
2454
2455 /*
2456  * initialize per-thread page pool (bug 5137).
2457  */
2458 static int ost_io_thread_init(struct ptlrpc_thread *thread)
2459 {
2460         struct ost_thread_local_cache *tls;
2461
2462         ENTRY;
2463
2464         LASSERT(thread != NULL);
2465         LASSERT(thread->t_data == NULL);
2466
2467         OBD_ALLOC_PTR(tls);
2468         if (tls == NULL)
2469                 RETURN(-ENOMEM);
2470         thread->t_data = tls;
2471         RETURN(0);
2472 }
2473
2474 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
2475
2476 static struct cfs_cpt_table     *ost_io_cptable;
2477
2478 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
2479 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
2480 {
2481         static struct ptlrpc_service_conf       svc_conf;
2482         struct ost_obd *ost = &obd->u.ost;
2483         struct lprocfs_static_vars lvars;
2484         nodemask_t              *mask;
2485         int rc;
2486         ENTRY;
2487
2488         rc = cfs_cleanup_group_info();
2489         if (rc)
2490                 RETURN(rc);
2491
2492         lprocfs_ost_init_vars(&lvars);
2493         lprocfs_obd_setup(obd, lvars.obd_vars);
2494
2495         mutex_init(&ost->ost_health_mutex);
2496
2497         svc_conf = (typeof(svc_conf)) {
2498                 .psc_name               = LUSTRE_OSS_NAME,
2499                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
2500                 .psc_buf                = {
2501                         .bc_nbufs               = OST_NBUFS,
2502                         .bc_buf_size            = OST_BUFSIZE,
2503                         .bc_req_max_size        = OST_MAXREQSIZE,
2504                         .bc_rep_max_size        = OST_MAXREPSIZE,
2505                         .bc_req_portal          = OST_REQUEST_PORTAL,
2506                         .bc_rep_portal          = OSC_REPLY_PORTAL,
2507                 },
2508                 .psc_thr                = {
2509                         .tc_thr_name            = "ll_ost",
2510                         .tc_thr_factor          = OSS_THR_FACTOR,
2511                         .tc_nthrs_init          = OSS_NTHRS_INIT,
2512                         .tc_nthrs_base          = OSS_NTHRS_BASE,
2513                         .tc_nthrs_max           = OSS_NTHRS_MAX,
2514                         .tc_nthrs_user          = oss_num_threads,
2515                         .tc_cpu_affinity        = 1,
2516                         .tc_ctx_tags            = LCT_DT_THREAD,
2517                 },
2518                 .psc_cpt                = {
2519                         .cc_pattern             = oss_cpts,
2520                 },
2521                 .psc_ops                = {
2522                         .so_req_handler         = ost_handle,
2523                         .so_req_printer         = target_print_req,
2524                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
2525                 },
2526         };
2527         ost->ost_service = ptlrpc_register_service(&svc_conf,
2528                                                    obd->obd_proc_entry);
2529         if (IS_ERR(ost->ost_service)) {
2530                 rc = PTR_ERR(ost->ost_service);
2531                 CERROR("failed to start service: %d\n", rc);
2532                 GOTO(out_lprocfs, rc);
2533         }
2534
2535         memset(&svc_conf, 0, sizeof(svc_conf));
2536         svc_conf = (typeof(svc_conf)) {
2537                 .psc_name               = "ost_create",
2538                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
2539                 .psc_buf                = {
2540                         .bc_nbufs               = OST_NBUFS,
2541                         .bc_buf_size            = OST_BUFSIZE,
2542                         .bc_req_max_size        = OST_MAXREQSIZE,
2543                         .bc_rep_max_size        = OST_MAXREPSIZE,
2544                         .bc_req_portal          = OST_CREATE_PORTAL,
2545                         .bc_rep_portal          = OSC_REPLY_PORTAL,
2546                 },
2547                 .psc_thr                = {
2548                         .tc_thr_name            = "ll_ost_create",
2549                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
2550                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
2551                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
2552                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
2553                         .tc_nthrs_user          = oss_num_create_threads,
2554                         .tc_cpu_affinity        = 1,
2555                         .tc_ctx_tags            = LCT_DT_THREAD,
2556                 },
2557                 .psc_cpt                = {
2558                         .cc_pattern             = oss_cpts,
2559                 },
2560                 .psc_ops                = {
2561                         .so_req_handler         = ost_handle,
2562                         .so_req_printer         = target_print_req,
2563                 },
2564         };
2565         ost->ost_create_service = ptlrpc_register_service(&svc_conf,
2566                                                           obd->obd_proc_entry);
2567         if (IS_ERR(ost->ost_create_service)) {
2568                 rc = PTR_ERR(ost->ost_create_service);
2569                 CERROR("failed to start OST create service: %d\n", rc);
2570                 GOTO(out_service, rc);
2571         }
2572
2573         mask = cfs_cpt_table->ctb_nodemask;
2574         /* event CPT feature is disabled in libcfs level by set partition
2575          * number to 1, we still want to set node affinity for io service */
2576         if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
2577                 int     cpt = 0;
2578                 int     i;
2579
2580                 ost_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask));
2581                 for_each_node_mask(i, *mask) {
2582                         if (ost_io_cptable == NULL) {
2583                                 CWARN("OSS failed to create CPT table\n");
2584                                 break;
2585                         }
2586
2587                         rc = cfs_cpt_set_node(ost_io_cptable, cpt++, i);
2588                         if (!rc) {
2589                                 CWARN("OSS Failed to set node %d for"
2590                                       "IO CPT table\n", i);
2591                                 cfs_cpt_table_free(ost_io_cptable);
2592                                 ost_io_cptable = NULL;
2593                                 break;
2594                         }
2595                 }
2596         }
2597
2598         memset(&svc_conf, 0, sizeof(svc_conf));
2599         svc_conf = (typeof(svc_conf)) {
2600                 .psc_name               = "ost_io",
2601                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
2602                 .psc_buf                = {
2603                         .bc_nbufs               = OST_NBUFS,
2604                         .bc_buf_size            = OST_BUFSIZE,
2605                         .bc_req_max_size        = OST_MAXREQSIZE,
2606                         .bc_rep_max_size        = OST_MAXREPSIZE,
2607                         .bc_req_portal          = OST_IO_PORTAL,
2608                         .bc_rep_portal          = OSC_REPLY_PORTAL,
2609                 },
2610                 .psc_thr                = {
2611                         .tc_thr_name            = "ll_ost_io",
2612                         .tc_thr_factor          = OSS_THR_FACTOR,
2613                         .tc_nthrs_init          = OSS_NTHRS_INIT,
2614                         .tc_nthrs_base          = OSS_NTHRS_BASE,
2615                         .tc_nthrs_max           = OSS_NTHRS_MAX,
2616                         .tc_nthrs_user          = oss_num_threads,
2617                         .tc_cpu_affinity        = 1,
2618                         .tc_ctx_tags            = LCT_DT_THREAD,
2619                 },
2620                 .psc_cpt                = {
2621                         .cc_cptable             = ost_io_cptable,
2622                         .cc_pattern             = ost_io_cptable == NULL ?
2623                                                   oss_io_cpts : NULL,
2624                 },
2625                 .psc_ops                = {
2626                         .so_thr_init            = ost_io_thread_init,
2627                         .so_thr_done            = ost_io_thread_done,
2628                         .so_req_handler         = ost_handle,
2629                         .so_hpreq_handler       = ost_io_hpreq_handler,
2630                         .so_req_printer         = target_print_req,
2631                 },
2632         };
2633         ost->ost_io_service = ptlrpc_register_service(&svc_conf,
2634                                                       obd->obd_proc_entry);
2635         if (IS_ERR(ost->ost_io_service)) {
2636                 rc = PTR_ERR(ost->ost_io_service);
2637                 CERROR("failed to start OST I/O service: %d\n", rc);
2638                 ost->ost_io_service = NULL;
2639                 GOTO(out_create, rc);
2640         }
2641
2642         memset(&svc_conf, 0, sizeof(svc_conf));
2643         svc_conf = (typeof(svc_conf)) {
2644                 .psc_name               = "ost_seq",
2645                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
2646                 .psc_buf                = {
2647                         .bc_nbufs               = OST_NBUFS,
2648                         .bc_buf_size            = OST_BUFSIZE,
2649                         .bc_req_max_size        = OST_MAXREQSIZE,
2650                         .bc_rep_max_size        = OST_MAXREPSIZE,
2651                         .bc_req_portal          = SEQ_DATA_PORTAL,
2652                         .bc_rep_portal          = OSC_REPLY_PORTAL,
2653                 },
2654                 .psc_thr                = {
2655                         .tc_thr_name            = "ll_ost_seq",
2656                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
2657                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
2658                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
2659                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
2660                         .tc_nthrs_user          = oss_num_create_threads,
2661                         .tc_cpu_affinity        = 1,
2662                         .tc_ctx_tags            = LCT_DT_THREAD,
2663                 },
2664
2665                 .psc_cpt                = {
2666                         .cc_pattern          = oss_cpts,
2667                 },
2668                 .psc_ops                = {
2669                         .so_req_handler         = ost_handle,
2670                         .so_req_printer         = target_print_req,
2671                         .so_hpreq_handler       = NULL,
2672                 },
2673         };
2674         ost->ost_seq_service = ptlrpc_register_service(&svc_conf,
2675                                                       obd->obd_proc_entry);
2676         if (IS_ERR(ost->ost_seq_service)) {
2677                 rc = PTR_ERR(ost->ost_seq_service);
2678                 CERROR("failed to start OST seq service: %d\n", rc);
2679                 ost->ost_seq_service = NULL;
2680                 GOTO(out_io, rc);
2681         }
2682
2683         ping_evictor_start();
2684
2685         RETURN(0);
2686 out_io:
2687         ptlrpc_unregister_service(ost->ost_io_service);
2688         ost->ost_io_service = NULL;
2689 out_create:
2690         ptlrpc_unregister_service(ost->ost_create_service);
2691         ost->ost_create_service = NULL;
2692 out_service:
2693         ptlrpc_unregister_service(ost->ost_service);
2694         ost->ost_service = NULL;
2695 out_lprocfs:
2696         lprocfs_obd_cleanup(obd);
2697         RETURN(rc);
2698 }
2699
2700 static int ost_cleanup(struct obd_device *obd)
2701 {
2702         struct ost_obd *ost = &obd->u.ost;
2703         int err = 0;
2704         ENTRY;
2705
2706         ping_evictor_stop();
2707
2708         /* there is no recovery for OST OBD, all recovery is controlled by
2709          * obdfilter OBD */
2710         LASSERT(obd->obd_recovering == 0);
2711         mutex_lock(&ost->ost_health_mutex);
2712         ptlrpc_unregister_service(ost->ost_service);
2713         ptlrpc_unregister_service(ost->ost_create_service);
2714         ptlrpc_unregister_service(ost->ost_io_service);
2715         ptlrpc_unregister_service(ost->ost_seq_service);
2716         ost->ost_service = NULL;
2717         ost->ost_create_service = NULL;
2718         ost->ost_io_service = NULL;
2719         ost->ost_seq_service = NULL;
2720
2721         mutex_unlock(&ost->ost_health_mutex);
2722
2723         lprocfs_obd_cleanup(obd);
2724
2725         if (ost_io_cptable != NULL) {
2726                 cfs_cpt_table_free(ost_io_cptable);
2727                 ost_io_cptable = NULL;
2728         }
2729
2730         RETURN(err);
2731 }
2732
2733 static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
2734 {
2735         struct ost_obd *ost = &obd->u.ost;
2736         int rc = 0;
2737
2738         mutex_lock(&ost->ost_health_mutex);
2739         rc |= ptlrpc_service_health_check(ost->ost_service);
2740         rc |= ptlrpc_service_health_check(ost->ost_create_service);
2741         rc |= ptlrpc_service_health_check(ost->ost_io_service);
2742         mutex_unlock(&ost->ost_health_mutex);
2743
2744         /*
2745          * health_check to return 0 on healthy
2746          * and 1 on unhealthy.
2747          */
2748         if( rc != 0)
2749                 rc = 1;
2750
2751         return rc;
2752 }
2753
2754 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
2755 {
2756         return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
2757 }
2758
2759 /* use obd ops to offer management infrastructure */
2760 static struct obd_ops ost_obd_ops = {
2761         .o_owner        = THIS_MODULE,
2762         .o_setup        = ost_setup,
2763         .o_cleanup      = ost_cleanup,
2764         .o_health_check = ost_health_check,
2765 };
2766
2767
2768 static int __init ost_init(void)
2769 {
2770         struct lprocfs_static_vars lvars;
2771         int rc;
2772         ENTRY;
2773
2774         ost_page_to_corrupt = cfs_alloc_page(CFS_ALLOC_STD);
2775
2776         lprocfs_ost_init_vars(&lvars);
2777         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
2778                                  LUSTRE_OSS_NAME, NULL);
2779
2780         if (ost_num_threads != 0 && oss_num_threads == 0) {
2781                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2782                               "use oss_num_threads instead or unset both for "
2783                               "dynamic thread startup\n");
2784                 oss_num_threads = ost_num_threads;
2785         }
2786
2787         RETURN(rc);
2788 }
2789
2790 static void /*__exit*/ ost_exit(void)
2791 {
2792         if (ost_page_to_corrupt)
2793                 page_cache_release(ost_page_to_corrupt);
2794
2795         class_unregister_type(LUSTRE_OSS_NAME);
2796 }
2797
2798 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2799 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2800 MODULE_LICENSE("GPL");
2801
2802 module_init(ost_init);
2803 module_exit(ost_exit);