An Approach to Optimize the Speed of Data Intense Applications with Parallel Computing

DOI : 10.17577/IJERTCONV2IS13136

Download Full-Text PDF Cite this Publication

Text Only Version

An Approach to Optimize the Speed of Data Intense Applications with Parallel Computing

Deepika V

Dept. of Computer Science HKBK College of Engineering Bangalore, India

AbstractData is an important parameter to take any business decisions and carry out scientific research. An increasing number of data-intensive application, like data mining and web indexing need to access ever-expanding data sets ranging from a few gigabytes to several terabytes or even petabytes, Since the size of data is large must be analyzed in parallel so as to optimize the performance and reduce the decision time. Recently we have seen an increasing number of scientists employ data parallel computing frameworks such as MapReduce and Hadoop to run data intensive applications and conduct analysis. In these co-located compute and storage frameworks, a wise data placement scheme can significantly improve the performance. Existing data parallel frameworks, e.g., Hadoop, or Hadoop-based clouds, distribute the data using a random placement method for simplicity and load balance. However, we observe that many data intensive applications exhibit interest locality which only sweep part of a big dataset. The data often accessed together result from their grouping semantics. Ignoring data grouping and random data placement reduces the performance of MapReduce and Hadoop. This paper develops a new Data gRouping-AWare data placement (DRAW) framework which is Hadoop version prototype, dynamically analyzes data accesses from system log files. It extracts optimal data groupings and re-organizes data layouts to achieve the maximum parallelism per group and significantly increase the total number of local map tasks executed up to 59.8%, reduces the completion latency of the map phase up to 41.7%, and improves the overall performance by 36.4%, in comparison with Hadoops default random placement.

KeywordsData intensive; data layout; Hadoop; MapReduce;


    The data intensive applications place a demand on high- performance computing resources with bulky storage. Many scientists have been developing big data parallel computing frameworks and large-scale distributed file systems to facilitate the high-performance runs of data-intensive applications. But in practice, many scientific and engineering applications have interest locality: 1) domain scientists are only interested in a subset of the whole data set, and 2) scientists are likely to access one subset more frequently than others. For example, in the bioinformatics domain, X and Y chromosomes are related to the offsprings gender. Both chromosomes are often analyzed together in generic research rather than all the 24 human chromosomes.

    In summary, these affinitive data have high possibility to be processed as a group by specific domain applications. Here, we formally define the data grouping to represent the possibility of two or more data blocks to be accessed as a group in Hadoop. Each group can be quantified by a weight which is the number of the times that these data have already been accessed as a group. Here we assume that if two blocks of data have been already accessed together for many times, it is highly possible for them to be accessed as a group in the future [3].

    By using Hadoops default random data placement strategy, the overall data distribution may be balanced, but there is no guarantee that the data accessed as a group is evenly distributed. For Example in the figure.1 without evenly distributing the grouping data, some map tasks are either scheduled on other nodes which remotely access the needed data, or they are scheduled on these data holding nodes but have to wait in the queue. These map tasks violate the data locality and severely drag down the MapReduce program performance [3].

    We can consider 3 factors that can affect evenly distribution of data in random data placement for the same data groups as follow:

    1)The maximum number of simultaneous map tasks on each node (NS); 2) the number of replica for each data block in each rack (NR); and 3) the data grouping access patterns[1][2]. As conclusion we can tell the larger NR or NS is, the more likely that Hadoops default random solution will achieve the maximum parallelisem:

    1. assume NR is very large, e.g., we can achive the maximum parallelisem because the random data distribution will not place the same data onto the same node therefor each node will hold one copy of the data and number of replica for data is equall to number of nodes.

    2. assume NS is very large, e.g., it shows the number of similar data, even if the random solution clusters all these dataonto the same node, all the map tasks can run simultaneously hence the performance will not be degraded. In above two cases, random data distribution is the ideal data placement for affinitive data.

    Moreover, the data grouping is not considered in default Hadoop, which results in a nonoptimal data placement strategy for the data-intensive applications having interest locality.

    Therefore, we develop a new Data-gRouping-AWare data placement scheme (DRAW) that takes into account the data grouping effects to significantly improve the performance for data-intensive applications with interest locality.

    Fig. 1. Simple case showing the efficiency of data placement for MapReduce programs.

    Without loss of generality, DRAW is designed and implemented as a Hadoop-version prototype. By experimenting with real world genome indexing [2] and astrophysics applications [9], DRAW is able to execute up to 59.8% more local map tasks in comparison with random placement. In addition, DRAW reduces the completion time of map phase by up to 41.7%, and the MapReduce task execution time by up to 36.4%.


    In this section, we design DRAW at rack-level, which optimizes the grouping-data distribution inside a rack. There are three parts in our design: a data access history graph (HDAG). to exploit system log files learning the data grouping information; a data grouping matrix (DGM) to quantify the grouping weights among the data and generate the optimized data groupings; an optimal data placement algorithm (ODPA) to form the optimal data placement.

    1. History Data Access Graph(HDAG)

      HDAG is a graph describing the access patterns among the files, which can be learned from the history of data accesses. In each Hadoop cluster rack, the maintains system logs recording every system operation, including the files which have been accessed. A naive solution can be: monitor the files which are being accessed; every two continuously accessed files will be categorized in the same group. This solution is simple for implementation because it only needs a traversal of the log files. However in practical situations there are two problems: first, the log files could be huge which may result in unacceptable traversal latency; second, the continuously accessed files are not necessarily related, e.g., the last file accessed by task and the first file accessed by task Therefore,we need to define checkpoint to indicate how far the HDAG will traverse back in the logs; and we also need to exploit the mappings between tasks and files to learn the file access pattern. Note that in hadoop clusters, files are split into blocks which is the basic data distribution unit; hence we need to translate the grouping information at file level into block level.


      Fortunately, the mapping information between files and blocks can be found in the . Fig. 2 shows an example of HDAG: given three MpReduce tasks accesses

      ; here is block; accesses

      ; accesses . The

      accessing information initially generated from the log files is shown as Fig. 2(a). Thereafter we can easily translate the table into the HDAG shown as Fig. 2(b). This translation step makes it easier to generate the grouping Matrix for the next step.

    2. DGM

      Based on HDAG, we can generate a DGM showing the relation between every two data blocks. Given the same example as shown in Fig. 2, we can build the DGM as shown in Fig. 3 (step1 and step2), where each element

      is the grouping weight between data and . Every can be calculated by counting the tasks in

      common between task sets of and . The elements in the diagonal of the DGM show the number of jobs that have used this data. In DRAW, DGM is a by matrix, where is the number of existing blocks. As we stated before, one data belonging to group A may belong to group B at the same time; the grouping weight in the DGM denotes how likely one data should be grouped with another data.

      After knowing the DGM in Fig. 3, we use a matrix clustering algorithm to group the highly related data in step3. Specifically, Bond Energy Algorithm (BEA) is used to transform the DGM to the clustered data grouping matrix (CDGM). Since a weighted matrix clustering problem is N-P hard, the time complexity to obtain the optimized solution is where is the dimension. The BEA algorithm saves the computing cost by finding the suboptimal solution in time [13]; it has been widely utilized in distributed database systems for the vertical partition of large tables and matrix clustering work [13]. The BEA algorithm clusters the highly associated data together indicating which data should be evenly distributed. After group 1 is generated, repeat the steps in step 4 and step 5 to generate the group 2. In this case, group 1 and group 2 represent most related data sets. Assuming there are 5 in the Hadoop cluster, the CDGMin Fig. 3 indicates data {6, 7, 2, 1, 3} (group 1) and {4, 9, 5, 10, 8} (group 2) should be evenly distributed when placed on the 5 nodes. Note that we have only 10 pieces of data in our example, after knowing that {6, 7, 2, 1, 3} should be placed as a group (horizontally), it is natural to treat the left data {4, 9, 5, 10, 8} as another group. Hence, step 4 and step 5 in Fig. 3 are not necessary for our case, but when the no of remaining

      data is larger than no of nodes, more clustering steps are needed

      Fig.3 example showing grouping matrix and the overall flow to cluster data based on their grouping weights

    3. ODPA

      Knowing the data groups alone is not enough to achieve the random placing of each group, as shown in Fig. 4 (1), task 2 and task 3 can only run on 4 nodes rather than 5, which is not optimal.

      Algorithm 1: ODPA algorithm

      Input: The sub-matrix (OSM) as shown in Fig. 3: ; where is the number of data nodes;

      Output: matrix indicating the optimal data placement: ;


      for each row from do


      Find the minimum value in this row;

      Put this value and its corresponding column index into a set ;

      ; there may be more than

      one minimum value

      if there is only one tuple in then

      //The data referred by C1 should be placed with the data referred by R on the same node;//

      Mark column is invalid (already assigned); Continue;

      end if

      for each column from do

      Calculate ;//all the items in


      end for

      Choose the largest value from array;

      Mark column is invalid (already assigned);

      end for

      Fig. 4. Without ODPA, the layout generated from CDGM may be still non optimal.

      This is because the above data grouping only considers the horizontal relationships among the data in DGM, and so it is also necessary to make sure the blocks on the same node have minimal chance to be in the same group (vertical relationships). In order to obtain this information, we propose an algorithm named ODPA to complete our DRAW design, as described in Algorithm 1. ODPA is based on sub matrix for ODPA (OSM) from CDGM. OSM indicates the dependencies among the data already placed and the ones being placed. For example, the OSM in Fig. 3 denotes the vertical relations between two different groups (group1:6, 7, 2, 1, 3 and

      group2:4, 9, 5, 10, 8).

      Take the OSM from Fig. 3 as an example, The ODPA algorithm starts from the first row in OSM, whose row index is 6. Because there is only one minimum value 0 in column 9, we assign , which means data 6 and 9 should be placed on the same data node because 9 is the least relevant data to 6. When checking row 7, there are five equal minimum values, which means any of these five data are equally related on data 7. To choose the optimal candidate among these five candidates, we need to examine their dependencies to other already placed data, which is performed by the loop calculating the for these five columns. In our case, is the largest value; by placing 8 with 7 on the same node, we can, to the maximum extent, reduce the possibility of assigning it onto another related data block. Hence, a new tuple {7, 8} is added to . After doing the same processes to the rows with index 1, 2, 3, we have a

      , indicating the data

      should be placed as shown in Fig. 4 (2). Clearly, all the tasks can achieve the optimal parallelism (5) when running on the optimal data layout. With the help of ODPA, DRAW can achieve the two goals: maximize the parallel distribution of the grouping data, and balance the overall storage loads.

    4. Exceptions

      The cases without interest locality: DRAW is designed for the applications showing interest locality. However there are some real world applications do not have interest locality. In this case, all the data on the cluster belongs to the same group. Therefore the data grouping matrix contains the same grouping weight for each pair of data (except for the diagonal numbers); the BEA algorithm will not cluster the matrix, all the data blocks will stay on the nodes and distributed as the default random data distribution. Because all the data are equally popular, theoretically random data distribution can evenly balance them onto the nodes. In this case, DRAW has the same performance as Hadoops random data distribution strategy.

      The cases with special interest locality: The purpose of DRAW is to optimize the performance for the common applications which follow or not totally deviated from the previous interest locality. However in practice, some applications may have unpredicted access patterns that DRAW did not studied yet. These uncommon queries may suffer from bad performance because DRAW cannot guarantee these accessing data are well distributed. But this pattern will be considered into DRAWs future data organization in case it happened more times.

    5. Multireplica per rack

    In previous analysis, we assume that there is only one copy of each data existing in each rack. This assumption is derived from the practical Hadoop configurations, e.g., Hadoop with single-replica for each data [7] Hadoop with three replicas for each data but put into three different racks [8], etc. However, there are some Hadoop clusters that keep two or even three copies of the same data in the same rack [3] to provide better read performance. As we stated in Section I, the more replicas for each data in the same rack, the more optimal data distribution the random strategy can achieve (given that any two replica cannot stay in the same node). In order to prove our DRAW is still necessary for multiple replica Hadoops, we launch intensive experiments as sensitivity study in Section III-D


    In this section, we present four sets of results: the unbalanced data distribution caused by Hadoops default random data placement; comparison of the traces of the MapReduce programs on the randomly placed data, and the DRAWs reorganized data; and the sensitivity stdy used to measure the impact of the NR (number of replica for each data block in Hadoop) on DRAW. In the experiments, we developed a program performing data reorganization according to the ODPA output with a preconfigured frequency or we could launch the program manually if necessary.

    A.Test Bed

    Our test bed consists of 40 heterogeneous nodes on a single rack with Hadoop 0.20.1 installed. The cluster and node configurations are shown in the Table I. One node is

    configured as the and , whereas the

    other 39 nodes are and

    . Table I

    Class Cluster Configuration

    1. Data Distribution

      Intuitively, the data distribution may be related to the way the data is uploaded. There are two ways for the users to upload data: bulkily upload all the data at once; or upload the data based on their categories, e.g., species or particles in our cases. The second way considers the human-readable data grouping information (in our case, data belonging to the same species or particles are assumed to be highly related) rather than the blindly uploading as in the first method. We upload the data to our test bed by using these two data uploading methods, 20 times for each. The overall data distributions are similar in these runs.

      First, after bulk uploading the genome data of six species (a subset of our 40 GB genome data), the data distribution (from a randomly picked run) is shown in Fig. 5 (1). Given a research group only interested in human the requiring data is clustered as shown in Fig. 5 (2). The human data is distributed on half (51.3%) of the cluster, meaning the parallelism for the future MapReduce job is not optimal.

      When using the category-based uploading method, we surprisingly find that the overall data distribution is similar to what is shown in Fig. 5. To highlight the unbalanced distribution of the related data, we quantify the degree of unbalance with .

      With 20 runs using the species-based data uploading method, on average, the data of a specific species is distributed over only 53.2% nodes of the cluster. The conclusion shows that even when the data is uploaded based on the initial data grouping information, the Hadoops random data placement is not able to achieve the maximal parallelism for the associate data.

      Genome Indexing: Based on the DRAW [2] reorganized 40 GB genome data downloaded from [4], we run the Bowtie indexing [6] MapReduce program to index the humans chromosomes. Fig. 6 shows the traces of two runs on DRAWs reorganized data and Hadoops randomly placed data, respectively. The number of reducers is set as large as possible so that the reduce phase will not be the performance bottleneck.

      Fig. 5. Data layout after bulk uploading six species genome data, and the humans genome data layout.

    2. Performance improvement of MapReduce Programs

      In our case, we use 39 reducers. The map phase running on DRAWs data is finished 41.7% earlier than the one running on randomly placed data, and the jobs overall execution time is also improved by 36.4% when using DRAWs data. The comparison is shown in Table II. The MapReduce job running on the DRAWs reorganized data has 76.1% maps which benefit from having data locality, compared with 47.1%from the randomly placed data; the number of local map tasks is increased by .

      Note that there are still 23.9% maps which are working without having data locality even after the DRAWs data reorganization. There are two reasons: first, the data grouping information the BEA algorithm used is generated from all previous MapReduce programs rather any specific one, and the ODPA follows High-Weight-First-Placed strategy, which means the data with higher (accumulative) grouping weights will be granted higher priority to be evenly distributed. In other words, the distribution of the non hottest data is only optimized but may be not 100% perfect for the corresponding MapReduce programs. Second, the matrix clustering is an NP- hard problem, hence the clustered grouping matrix generated from BEA algorithm, whose time complexity is rather than is a pseudo-optimal solution. Adoption of BEA algorithm is a reasonable tradeoff between efficiency and accuracy. However, since the hottest data will be granted the highest priority to be clustered, the applications interested in these data can achieve the ideal parallelism.

      Mass Analyzer on Astrophysics Data: In this section, we do experiments on smaller data sets: each particles data is exactly 512 M, which will be split into only 8 blocks.

      Our Mass Analyzer on the astrophysics data tries to calculate the average mass of each area. The results are shown in Fig. 7. DRAW reduces the map phase by 18.2%, and the overall performance of the MapReduce program is improved by only 11.2%. It is obvious that the impact of DRAW is linearly related to the size of the required data by the MapReduce program. The less data is being accessed, the more close that random data placement can achieve

      maximized parallelism. For example, given 40 nodes in the cluster and 2 maximum simultaneous map tasks on each node, the 8 blocks of each astrophysics data file are more likely to be balanced when compared to the 48 blocks of an mammals genome data. Hence the conclusion is DRAW works better for the MapReduce programs accessing large-scale data (larger than 3 GB for our hardware configuration).

      Fig. 6. Running of Mass Analyzer on astrophysics data; the size of interested data for each run is relative small (8 blocks on average).

      Table II

      Comparison of Two Runs of Genome Indexing


    3. Sensitivity Study: Number of Replica (NR)

    The NR for each data block in Hadoop cluster is configurable. For data distribution, the more replica that exist for each block, the higher possibility that the grouping data can be evenly distributed. Hence, the efficiency of DRAW on the MapReduce programs is inverse proportional to NR in the Hadoop.

    In order to quantify the impact of NR on our design, we bulkily upload the 40 G genome data to our test bed configured with and respectively. Fig. 8 shows the data distributions for four species: Stickleback, Opossum, Chicken from vertebrates, and C.briggsae from nematodes. The % of nodes holding the data (NHD) is directly related to the parallelism that the program accessing corresponding species can use. The results prove that, in most cases, 3 NR is linearly related to the parallelism of data distribution; which means a higher degree of replica in Hadoop can mitigate the problem of unbalanced grouping-data distribution. For example, the Stickleback data is only distributed on 44.7% of the nodes in 1-replica Hadoop; when using 3-replica Hadoop, 81.5% of the nodes can provide Stickleback data.

    Now we study the efficiency of DRAW for multiple replica Hadoop systems. We still use the above data. Table III shows the comparison of the experimental NHD and DRAWs ideal NHD. The NHD difference indicates the possible improvement DRAW can achieve. Note that for the three vertebrates, the number of blocks for each species is larger

    than the number of nodes in our test bed, hence ideally, DRAW can distributes the grouping data on all the nodes, with 100% NHD; for the C.briggsae whose number of blocks is smaller than 40, the ideal DRAWs NHD is calculated as

    , which is shown in bold font in Table III. Our experimental results show that, for the 2-replica Hadoop, DRAW may improve the data distribution

    parallelism by 27.2% on average; for the 3-replica Hadoop, DRAW is expected to improve the parallelism by 17.6% (without considering the exception of Chicken data) on average.

    Table III


    no guarantee that the data accessed as group is randomly distributed. To overcome this problem , we are proposing the system. In this system we are giving weight to each of the groups based on the number of times the group is accessed , and then randomly placng this group, the placement performance is best and boosts the performance of analysis. We theoretically prove that the inefficiency of hadoops

    Fig. 7. Running of Mass Analyzer on astrophysics data; the size of interested data for each run is relative small (8 blocks on average).


    Several previous works exploited the grouping-like data semantics and organized data layout in some specific ways to support high-performance data accesses. Ahmed. [10] exploited the file-grouping relation to better manage the distributed file caches. The rationale is that group accessed files are likely to be group accessed again. This idea works well in cache management. However in data intensive computing, such grouping behavior is at chunk level rather than file level.

    Yuan [9] develops a data dependency-based data placement for the scientific cloud work flows, which clusters the relative data as intensively as possible to effectively reduce data movement during the workflows execution. However, for data parallel frameworks such as MapReduce which relies on the co-located compute and data locality [3], the relevant data needs to be distributed as evenly as possible.

    Xie [13] akes data locality into account for launching speculative MapReduce tasks in heterogeneous environments. They focus on balancing data processing load based on network topology and disk space utilization of a cluster. In contrast, DRAW focuses on data redistribution based on data semantics; they are two complimentary works.


In Hadoop data distribution, random placement of data ensures that overall data distribution is balanced, but there is

Fig. 8. Data distributions (NHD) of four species, on 1-replica, 2-replica, and 3-replica Hadoop.

random placement method. Our experimental results show that for two representative MapReduce applicationsGenome Indexing and Astrophysics, DRAW can significantly improve the throughput of local map task execution by up to 59.8%, and reduce the execution time of map phase by up to 41.7%. The overall MapReduce job response time is reduced by 36.4%.


[1] [Online]. Available:

  1. [Online]. Available:

  2. [Online]. Available: module1.html

  3. [Online]. Available:

  4. [Online]. Available: hdfs\_design.html

  5. [Online]. Available:

  6. [Online]. Available:

  7. [Online].Available:

  8. [Online]. Available:

  9. A. Amer,D.D. E. Long, and R. C.Burns, Group based management of distributed file caches, in Proc. 22nd Int. Conf. Distrib. Comput. Syst. (ICDCS02),Washington,DC, USA, 2002, p. 525, IEEE Computer Soc.

  10. A. Dumitriu, X and y (number 5), in Proc. ACM SIGGRAPH 2004 Art Gallery SIGGRAPH04, New York, NY, USA, 2004, p. 28, ACM.

  11. G. Ganger and M. Frans Kaashoek, Embedded inodes and explicit grouping: Exploiting disk bandwidth for small files, in Proc. 1997 USENIX Technol. Conf., 1997, pp. 117.

  12. N. Gorla and K. Zhang, Deriving program physical structures using bond energy algorithm, in Proc. 6th Asia Pacific Software Eng. Conf. APSEC99,Washington, DC, USA, 1999, p. 359, IEEE Computer Soc.

  13. G. H.Kuenning andG. J. Popek, Automated hoarding for mobile computers, in Proc. 16th ACM Symp. Operat. Syst. Principles, SOSP97,New York, 1997, pp. 264275, ACM.

  14. Y. Hahn and B. Lee, Identification of nine human-specific frameshift mutations by comparative analysis of the human and the chimpanzee genome sequences, Bioinformatics, vol. 21, pp. 186194, Jan. 2005.

  15. X. Jiong, Y. Shu, R. Xiaojun, D. Zhiyang, T. Yun, J. Majors, A. Manzanares, and Q. Xiao, Improving mapreduce performance through data placement in heterogeneous hadoop clusters, Apr. 2010.

Leave a Reply