
How-To Tutorials - Data


Entity Framework Code-First: Accessing Database Views and Stored Procedures

Packt
18 Mar 2015
15 min read
In this article by Sergey Barskiy, author of the book Code-First Development using Entity Framework, you will learn how to integrate Entity Framework with additional database objects, specifically views and stored procedures. We will see how to take advantage of existing stored procedures and functions to retrieve and change the data. You will learn how to persist changed entities from our context using stored procedures. We will gain an understanding of the advantages of asynchronous processing and see how Entity Framework supports this concept via its built-in API. Finally, you will learn why concurrency is important for a multi-user application and what options are available in Entity Framework to implement optimistic concurrency. In this article, we will cover how to: Get data from a view Get data from a stored procedure or table-valued function Map create, update, and delete operations on a table to a set of stored procedures Use the asynchronous API to get and save the data Implement multi-user concurrency handling Working with views Views in an RDBMS fulfill an important role. They allow developers to combine data from multiple tables into a structure that looks like a table, but do not provide persistence. Thus, we have an abstraction on top of raw table data. One can use this approach to provide different security rights, for example. We can also simplify queries we have to write, especially if we access the data defined by views quite frequently in multiple places in our code. Entity Framework Code-First does not fully support views as of now. As a result, we have to use a workaround. One approach would be to write code as if a view was really a table, that is, let Entity Framework define this table, then drop the table, and create a replacement view. We will still end up with strongly typed data with full query support. Let's start with the same database structure we used before, including person and person type. Our view will combine a few columns from the Person table and Person type name, as shown in the following code snippet: public class PersonViewInfo { public int PersonId { get; set; } public string TypeName { get; set; } public string FirstName { get; set; } public string LastName { get; set; } } Here is the same class in VB.NET: Public Class PersonViewInfo Public Property PersonId() As Integer Public Property TypeName() As String Public Property FirstName() As String Public Property LastName() As String End Class Now, we need to create a configuration class for two reasons. We need to specify a primary key column because we do not follow the naming convention that Entity Framework assumes for primary keys. Then, we need to specify the table name, which will be our view name, as shown in the following code: public class PersonViewInfoMap : EntityTypeConfiguration<PersonViewInfo> { public PersonViewInfoMap() {    HasKey(p => p.PersonId);    ToTable("PersonView"); } } Here is the same class in VB.NET: Public Class PersonViewInfoMap Inherits EntityTypeConfiguration(Of PersonViewInfo) Public Sub New()    HasKey(Function(p) p.PersonId)    ToTable("PersonView") End Sub End Class Finally, we need to add a property to our context that exposes this data, as shown here: public DbSet<PersonViewInfo> PersonView { get; set; } The same property in VB.NET looks quite familiar to us, as shown in the following code: Property PersonView() As DbSet(Of PersonViewInfo) Now, we need to work with our initializer to drop the table and create a view in its place. 
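One wiring step that the text does not show explicitly is registering the PersonViewInfoMap configuration with the context; without it, Entity Framework would fall back to its default conventions. A minimal sketch of the usual EF6 approach is shown below (the Context class name follows the article, and its other members are omitted):

```csharp
using System.Data.Entity;

public class Context : DbContext
{
    public DbSet<PersonViewInfo> PersonView { get; set; }

    protected override void OnModelCreating(DbModelBuilder modelBuilder)
    {
        // Apply the key and the "PersonView" table/view name defined in the map class
        modelBuilder.Configurations.Add(new PersonViewInfoMap());
        base.OnModelCreating(modelBuilder);
    }
}
```

With the mapping registered, the initializer can replace the generated table with the view, as described next.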
We are using one of the initializers we created before. When we cover migrations, we will see that the same approach works there as well, with virtually identical code. Here is the code we added to the Seed method of our initializer, as shown in the following code: public class Initializer : DropCreateDatabaseIfModelChanges<Context> { protected override void Seed(Context context) {    context.Database.ExecuteSqlCommand("DROP TABLE PersonView");    context.Database.ExecuteSqlCommand(      @"CREATE VIEW [dbo].[PersonView]      AS      SELECT        dbo.People.PersonId,        dbo.People.FirstName,        dbo.People.LastName,        dbo.PersonTypes.TypeName      FROM        dbo.People      INNER JOIN dbo.PersonTypes        ON dbo.People.PersonTypeId = dbo.PersonTypes.PersonTypeId      "); } } In the preceding code, we first drop the table using the ExecuteSqlCommand method of the Database object. This method is useful because it allows the developer to execute arbitrary SQL code against the backend. We call this method twice, the first time to drop the tables and the second time to create our view. The same initializer code in VB.NET looks as follows: Public Class Initializer Inherits DropCreateDatabaseIfModelChanges(Of Context) Protected Overrides Sub Seed(ByVal context As Context)    context.Database.ExecuteSqlCommand("DROP TABLE PersonView")    context.Database.ExecuteSqlCommand( <![CDATA[      CREATE VIEW [dbo].[PersonView]      AS      SELECT        dbo.People.PersonId,        dbo.People.FirstName,        dbo.People.LastName,        dbo.PersonTypes.TypeName      FROM        dbo.People      INNER JOIN dbo.PersonTypes        ON dbo.People.PersonTypeId = dbo.PersonTypes.PersonTypeId]]>.Value()) End Sub End Class Since VB.NET does not support multiline strings such as C#, we are using XML literals instead, getting a value of a single node. This just makes SQL code more readable. We are now ready to query our data. This is shown in the following code snippet: using (var context = new Context()) { var people = context.PersonView    .Where(p => p.PersonId > 0)    .OrderBy(p => p.LastName)    .ToList(); foreach (var personViewInfo in people) {    Console.WriteLine(personViewInfo.LastName); } As we can see, there is literally no difference in accessing our view or any other table. Here is the same code in VB.NET: Using context = New Context() Dim people = context.PersonView _      .Where(Function(p) p.PersonId > 0) _      .OrderBy(Function(p) p.LastName) _      .ToList() For Each personViewInfo In people    Console.WriteLine(personViewInfo.LastName) Next End Using Although the view looks like a table, if we try to change and update an entity defined by this view, we will get an exception. If we do not want to play around with tables in such a way, we can still use the initializer to define our view, but query the data using a different method of the Database object, SqlQuery. This method has the same parameters as ExecuteSqlCommand, but is expected to return a result set, in our case, a collection of PersonViewInfo objects, as shown in the following code: using (var context = new Context()) { var sql = @"SELECT * FROM PERSONVIEW WHERE PERSONID > {0} "; var peopleViaCommand = context.Database.SqlQuery<PersonViewInfo>(    sql,    0); foreach (var personViewInfo in peopleViaCommand) {    Console.WriteLine(personViewInfo.LastName); } } The SqlQuery method takes generic type parameters, which define what data will be materialized when a raw SQL command is executed. 
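As a small aside (this snippet is illustrative and not from the book), the generic argument of SqlQuery is not limited to entity classes; inside the same using block, a native .NET type such as int can be materialized directly, a point the text returns to shortly (System.Linq is needed for Single()):

```csharp
// Materialize a single scalar value instead of a collection of entities
var count = context.Database
    .SqlQuery<int>("SELECT COUNT(*) FROM PersonView WHERE PersonId > {0}", 0)
    .Single();
```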
The text of the command itself is simply parameterized SQL. We need to use parameters to ensure that our dynamic code is not subject to SQL injection. SQL injection is a process in which a malicious user can execute arbitrary SQL code by providing specific input values. Entity Framework is not subject to such attacks on its own. Here is the same code in VB.NET: Using context = New Context() Dim sql = "SELECT * FROM PERSONVIEW WHERE PERSONID > {0} " Dim peopleViaCommand = context.Database.SqlQuery(Of PersonViewInfo)(sql, 0)    For Each personViewInfo In peopleViaCommand    Console.WriteLine(personViewInfo.LastName) Next End Using We not only saw how to use views in Entity Framework, but saw two extremely useful methods of the Database object, which allows us to execute arbitrary SQL statements and optionally materialize the results of such queries. The generic type parameter does not have to be a class. You can use the native .NET type, such as a string or an integer. It is not always necessary to use views. Entity Framework allows us to easily combine multiple tables in a single query. Working with stored procedures The process of working with stored procedures in Entity Framework is similar to the process of working with views. We will use the same two methods we just saw on the Database object—SqlQuery and ExecuteSqlCommand. In order to read a number of rows from a stored procedure, we simply need a class that we will use to materialize all the rows of retrieved data into a collection of instances of this class. For example, to read the data from the stored procedure, consider this query: CREATE PROCEDURE [dbo].[SelectCompanies] @dateAdded as DateTime AS BEGIN SELECT CompanyId, CompanyName FROM Companies WHERE DateAdded > @dateAdded END We just need a class that matches the results of our stored procedure, as shown in the following code: public class CompanyInfo { public int CompanyId { get; set; } public string CompanyName { get; set; } } The same class looks as follows in VB.NET: Public Class CompanyInfo Property CompanyId() As Integer Property CompanyName() As String End Class We are now able to read the data using the SqlQuery method, as shown in the following code: sql = @"SelectCompanies {0}"; var companies = context.Database.SqlQuery<CompanyInfo>( sql, DateTime.Today.AddYears(-10)); foreach (var companyInfo in companies) { We specified which class we used to read the results of the query call. We also provided a formatted placeholder when we created our SQL statement for a parameter that the stored procedure takes. We provided a value for that parameter when we called SqlQuery. If one has to provide multiple parameters, one just needs to provide an array of values to SqlQuery and provide formatted placeholders, separated by commas as part of our SQL statement. We could have used a table values function instead of a stored procedure as well. Here is how the code looks in VB.NET: sql = "SelectCompanies {0}" Dim companies = context.Database.SqlQuery(Of CompanyInfo)( sql, DateTime.Today.AddYears(-10)) For Each companyInfo As CompanyInfo In companies Another use case is when our stored procedure does not return any values, but instead simply issues a command against one or more tables in the database. It does not matter as much what a procedure does, just that it does not need to return a value. 
For example, here is a stored procedure that updates multiple rows in a table in our database: CREATE PROCEDURE dbo.UpdateCompanies @dateAdded as DateTime, @activeFlag as Bit AS BEGIN UPDATE Companies Set DateAdded = @dateAdded, IsActive = @activeFlag END In order to call this procedure, we will use ExecuteSqlCommand. This method returns a single value—the number of rows affected by the stored procedure or any other SQL statement. You do not need to capture this value if you are not interested in it, as shown in this code snippet: var sql = @"UpdateCompanies {0}, {1}"; var rowsAffected = context.Database.ExecuteSqlCommand(    sql, DateTime.Now, true); We see that we needed to provide two parameters. We needed to provide them in the exact same order the stored procedure expected them. They are passed into ExecuteSqlCommand as the parameter array, except we did not need to create an array explicitly. Here is how the code looks in VB.NET: Dim sql = "UpdateCompanies {0}, {1}" Dim rowsAffected = context.Database.ExecuteSqlCommand( _    sql, DateTime.Now, True) Entity Framework eliminates the need for stored procedures to a large extent. However, there may still be reasons to use them. Some of the reasons include security standards, legacy database, or efficiency. For example, you may need to update thousands of rows in a single operation and retrieve them through Entity Framework; updating each row at a time and then saving those instances is not efficient. You could also update data inside any stored procedure, even if you call it with the SqlQuery method. Developers can also execute any arbitrary SQL statements, following the exact same technique as stored procedures. Just provide your SQL statement, instead of the stored procedure name to the SqlQuery or ExecuteSqlCommand method. Create, update, and delete entities with stored procedures So far, we have always used the built-in functionality that comes with Entity Framework that generates SQL statements to insert, update, or delete the entities. There are use cases when we would want to use stored procedures to achieve the same result. Developers may have requirements to use stored procedures for security reasons. You may be dealing with an existing database that has these procedures already built in. Entity Framework Code-First now has full support for such updates. We can configure the support for stored procedures using the familiar EntityTypeConfiguration class. We can do so simply by calling the MapToStoredProcedures method. Entity Framework will create stored procedures for us automatically if we let it manage database structures. We can override a stored procedure name or parameter names, if we want to, using appropriate overloads of the MapToStoredProcedures method. Let's use the Company table in our example: public class CompanyMap : EntityTypeConfiguration<Company> { public CompanyMap() {    MapToStoredProcedures(); } } If we just run the code to create or update the database, we will see new procedures created for us, named Company_Insert for an insert operation and similar names for other operations. 
Here is how the same class looks in VB.NET: Public Class CompanyMap Inherits EntityTypeConfiguration(Of Company) Public Sub New()    MapToStoredProcedures() End Sub End Class Here is how we can customize our procedure names if necessary: public class CompanyMap : EntityTypeConfiguration<Company> { public CompanyMap() {    MapToStoredProcedures(config =>      {        config.Delete(          procConfig =>          {            procConfig.HasName("CompanyDelete");            procConfig.Parameter(company => company.CompanyId, "companyId");          });        config.Insert(procConfig => procConfig.HasName("CompanyInsert"));        config.Update(procConfig => procConfig.HasName("CompanyUpdate"));      }); } } In this code, we performed the following: Changed the stored procedure name that deletes a company to CompanyDelete Changed the parameter name that this procedure accepts to companyId and specified that the value comes from the CompanyId property Changed the stored procedure name that performs insert operations on CompanyInsert Changed the stored procedure name that performs updates to CompanyUpdate Here is how the code looks in VB.NET: Public Class CompanyMap Inherits EntityTypeConfiguration(Of Company) Public Sub New()    MapToStoredProcedures( _      Sub(config)        config.Delete(          Sub(procConfig)            procConfig.HasName("CompanyDelete")            procConfig.Parameter(Function(company) company.CompanyId, "companyId")          End Sub        )        config.Insert(Function(procConfig) procConfig.HasName("CompanyInsert"))        config.Update(Function(procConfig) procConfig.HasName("CompanyUpdate"))      End Sub    ) End Sub End Class Of course, if you do not need to customize the names, your code will be much simpler. Summary Entity Framework provides a lot of value to the developers, allowing them to use C# or VB.NET code to manipulate database data. However, sometimes we have to drop a level lower, accessing data a bit more directly through views, dynamic SQL statements and/or stored procedures. We can use the ExecuteSqlCommand method to execute any arbitrary SQL code, including raw SQL or stored procedure. We can use the SqlQuery method to retrieve data from a view, stored procedure, or any other SQL statement, and Entity Framework takes care of materializing the data for us, based on the result type we provide. It is important to follow best practices when providing parameters to those two methods to avoid SQL injection vulnerability. Entity Framework also supports environments where there are requirements to perform all updates to entities via stored procedures. The framework will even write them for us, and we would only need to write one line of code per entity for this type of support, assuming we are happy with naming conventions and coding standards for such procedures. Resources for Article: Further resources on this subject: Developing with Entity Metadata Wrappers [article] Entity Framework DB First – Inheritance Relationships between Entities [article] The .NET Framework Primer [article]


Text Mining with R: Part 1

Robi Sen
16 Mar 2015
7 min read
R is rapidly becoming the platform of choice for programmers, scientists, and others who need to perform statistical analysis and data mining. In part this is because R is incredibly easy to learn and with just a few commands you can perform data mining and analysis functions that would be very hard in more general purpose languages like Ruby, .Net, Java, or C++. To demonstrate R’s ease, flexibility, and power we will look at how to use R to look at a collection of tweets from the 2014 super bowl, clear up data via R, turn that data it a document matrix so we can analyze the data, then create a “word cloud” so we can visualize our analysis to look for interesting words. Getting Started To get started you need to download both R and R studio. R can be found here and RStudio can be found here. R and RStudio are available for most major operating systems and you should follow the up to date installation guides on their respective websites. For this example we are going to be using a data set from Techtunk which is rather large. For this article I have taken a small excerpt of techtrunks SuperBowl 2014, over 8 million tweets, and cleaned it up for the article. You can download it from the original data source here. Finally you will need to install the R packages text mining package (tm ) and word cloud package (wordcloud). You can use standard library method to install the packages or just use RStudio to install the packets. Preparing our Data As already stated you can find the total SuperBowl 2014 dataset. That being said, it's very large and broken up into many sets of Pipe Delimited files, and they have the .csv file extension but are not .csv, which can be somewhat awkward to work with. This though is a common problem when working with large data sets. Luckily the data set is broken up into fragments in that usually when you are working with large datasets you do not want to try to start developing against the whole data set rather a small and workable dataset the will let you quickly develop your scripts without being so large and unwieldy that it delays development. Indeed you will find that working the large files provided by Techtunk can take 10’s of minutes to process as is. In cases like this is good to look at the data, figure out what data you want, take a sample set of data, massage it as needed, and then work in it from there until you have your coding working exactly how you want. In our cases I took a subset of 4600 tweets from one of the pipe delimited files, converted the file format to Commas Separated Value, .csv, and saved it as a sample file to work from. You can do the same thing, you should consider using files smaller than 5000 records, however you would like or use the file created for this post here. Visualizing our Data For this post all we want to do is get a general sense of what the more common words are that are being tweeted during the superbowl. A common way to visualize this is with a word cloud which will show us the frequency of a term by representing it in greater size to other words in comparison to how many times it is mentioned in the body of tweets being analyzed. To do this we need to a few things first with our data. First we need to create read in our file and turn our collection of tweets into a Corpus. In general a Corpus is a large body of text documents. In R’s textming package it’s an object that will be used to hold our tweets in memory. 
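Before building the corpus described above, the two packages mentioned earlier need to be installed and attached; the package names on CRAN are tm and wordcloud:

```r
# One-off installation, then load the libraries in each session
install.packages(c("tm", "wordcloud"))
library(tm)
library(wordcloud)
```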
So to load our tweets as a corpus into R you can do as shown here: # change this file location to suit your machine file_loc <- "yourfilelocation/largerset11.csv" # change TRUE to FALSE if you have no column headings in the CSV myfile <- read.csv(file_loc, header = TRUE, stringsAsFactors=FALSE) require(tm) mycorpus <- Corpus(DataframeSource(myfile[c("username","tweet")])) You can now simply print your Corpus to get a sense of it. > print(mycorpus) <<VCorpus (documents: 4688, metadata (corpus/indexed): 0/0)>> In this case, VCorpus is an automatic assignment meaning that the Corpus is a Volatile object stored in memory. If you want you can make the Corpus permanent using PCorpus. You might do this if you were doing analysis on actual documents such as PDF’s or even databases and in this case R makes pointers to the documents instead of full document structures in memory. Another method you can use to look at your corpus is inspect() which provides a variety of ways to look at the documents in your corpus. For example using: inspect(mycorpus[1,2]) This might give you a result like: > inspect(mycorpus[1:2]) <<VCorpus (documents: 2, metadata (corpus/indexed): 0/0)>> [[1]] <<PlainTextDocument (metadata: 7)>> sstanley84 wow rt thestanchion news fleury just made fraudulent investment karlsson httptco5oi6iwashg [[2]] <<PlainTextDocument (metadata: 7)>> judemgreen 2 hour wait train superbowl time traffic problemsnice job chris christie As such inspect can be very useful in quickly getting a sense of data in your corpus without having to try to print the whole corpus. Now that we have our corpus in memory let's clean it up a little before we do our analysis. Usually you want to do this on documents that you are analyzing to remove words that are not relevant to your analysis such as “stopwords” or words such as and, like, but, if, the, and the like which you don’t care about. To do this with the textmining package you want to use transforms. Transforms essentially various functions to all documents in a corpus and that the form of tm_map(your corpus, some function). For example we can use tm_map like this: mycorpus <- tm_map(mycorpus, removePunctuation) Which will now remove all the punctuation marks from our tweets. We can do some other transforms to clean up our data by converting all the text to lower case, removing stop words, extra whitespace, and the like. mycorpus <- tm_map(mycorpus, removePunctuation) mycorpus <- tm_map(mycorpus, content_transformer(tolower)) mycorpus <- tm_map(mycorpus, stripWhitespace) mycorpus <- tm_map(mycorpus, removeWords, c(stopwords("english"), "news")) Note the last line. In that case we are using the stopwords() method but also adding our own word to it; news. You could append your own list of stopwords in this manner. Summary In this post we have looked at the basics of doing text mining in R by selecting data, preparing it, cleaning, then performing various operations on it to visualize that data. In the next post, Part 2, we look at a simple use case showing how we can derive real meaning and value from a visualization by seeing how a simple word cloud and help you understand the impact of an advertisement. About the author Robi Sen, CSO at Department 13, is an experienced inventor, serial entrepreneur, and futurist whose dynamic twenty-plus year career in technology, engineering, and research has led him to work on cutting edge projects for DARPA, TSWG, SOCOM, RRTO, NASA, DOE, and the DOD. 
Robi also has extensive experience in the commercial space, including the co-creation of several successful start-up companies. He has worked with companies such as Under Armour, Sony, Cisco, IBM, and many others to help build out new products and services. Robi specializes in bringing his unique vision and thought process to difficult and complex problems, allowing companies and organizations to find innovative solutions that they can rapidly operationalize or take to market.
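For completeness, the document-matrix and word-cloud steps promised in the introduction, which this excerpt does not reach, can be sketched as follows. The function names come from the tm and wordcloud packages, but the parameter choices are illustrative rather than taken from the original post:

```r
# Term-document matrix from the cleaned corpus, then term frequencies
tdm   <- TermDocumentMatrix(mycorpus)
freqs <- sort(rowSums(as.matrix(tdm)), decreasing = TRUE)

# Word cloud of the most frequent terms in the tweets
wordcloud(names(freqs), freqs, max.words = 100, random.order = FALSE)
```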


Pricing the Double-no-touch option

Packt
10 Mar 2015
19 min read
In this article by Balázs Márkus, coauthor of the book Mastering R for Quantitative Finance, you will learn about pricing and life of Double-no-touch (DNT) option. (For more resources related to this topic, see here.) A Double-no-touch (DNT) option is a binary option that pays a fixed amount of cash at expiry. Unfortunately, the fExoticOptions package does not contain a formula for this option at present. We will show two different ways to price DNTs that incorporate two different pricing approaches. In this section, we will call the function dnt1, and for the second approach, we will use dnt2 as the name for the function. Hui (1996) showed how a one-touch double barrier binary option can be priced. In his terminology, "one-touch" means that a single trade is enough to trigger the knock-out event, and "double barrier" binary means that there are two barriers and this is a binary option. We call this DNT as it is commonly used on the FX markets. This is a good example for the fact that many popular exotic options are running under more than one name. In Haug (2007a), the Hui-formula is already translated into the generalized framework. S, r, b, s, and T have the same meaning. K means the payout (dollar amount) while L and U are the lower and upper barriers. Where Implementing the Hui (1996) function to R starts with a big question mark: what should we do with an infinite sum? How high a number should we substitute as infinity? Interestingly, for practical purposes, small number like 5 or 10 could often play the role of infinity rather well. Hui (1996) states that convergence is fast most of the time. We are a bit skeptical about this since a will be used as an exponent. If b is negative and sigma is small enough, the (S/L)a part in the formula could turn out to be a problem. First, we will try with normal parameters and see how quick the convergence is: dnt1 <- function(S, K, U, L, sigma, T, r, b, N = 20, ploterror = FALSE){    if ( L > S | S > U) return(0)    Z <- log(U/L)    alpha <- -1/2*(2*b/sigma^2 - 1)    beta <- -1/4*(2*b/sigma^2 - 1)^2 - 2*r/sigma^2    v <- rep(0, N)    for (i in 1:N)        v[i] <- 2*pi*i*K/(Z^2) * (((S/L)^alpha - (-1)^i*(S/U)^alpha ) /            (alpha^2+(i*pi/Z)^2)) * sin(i*pi/Z*log(S/L)) *              exp(-1/2 * ((i*pi/Z)^2-beta) * sigma^2*T)    if (ploterror) barplot(v, main = "Formula Error");    sum(v) } print(dnt1(100, 10, 120, 80, 0.1, 0.25, 0.05, 0.03, 20, TRUE)) The following screenshot shows the result of the preceding code: The Formula Error chart shows that after the seventh step, additional steps were not influencing the result. This means that for practical purposes, the infinite sum can be quickly estimated by calculating only the first seven steps. This looks like a very quick convergence indeed. However, this could be pure luck or coincidence. What about decreasing the volatility down to 3 percent? We have to set N as 50 to see the convergence: print(dnt1(100, 10, 120, 80, 0.03, 0.25, 0.05, 0.03, 50, TRUE)) The preceding command gives the following output: Not so impressive? 50 steps are still not that bad. What about decreasing the volatility even lower? At 1 percent, the formula with these parameters simply blows up. First, this looks catastrophic; however, the price of a DNT was already 98.75 percent of the payout when we used 3 percent volatility. 
Logic says that the DNT price should be a monotone-decreasing function of volatility, so we already know that the price of the DNT should be worth at least 98.75 percent if volatility is below 3 percent. Another issue is that if we choose an extreme high U or extreme low L, calculation errors emerge. However, similar to the problem with volatility, common sense helps here too; the price of a DNT should increase if we make U higher or L lower. There is still another trick. Since all the problem comes from the a parameter, we can try setting b as 0, which will make a equal to 0.5. If we also set r to 0, the price of a DNT converges into 100 percent as the volatility drops. Anyway, whenever we substitute an infinite sum by a finite sum, it is always good to know when it will work and when it will not. We made a new code that takes into consideration that convergence is not always quick. The trick is that the function calculates the next step as long as the last step made any significant change. This is still not good for all the parameters as there is no cure for very low volatility, except that we accept the fact that if implied volatilities are below 1 percent, than this is an extreme market situation in which case DNT options should not be priced by this formula: dnt1 <- function(S, K, U, L, sigma, Time, r, b) { if ( L > S | S > U) return(0) Z <- log(U/L) alpha <- -1/2*(2*b/sigma^2 - 1) beta <- -1/4*(2*b/sigma^2 - 1)^2 - 2*r/sigma^2 p <- 0 i <- a <- 1 while (abs(a) > 0.0001){    a <- 2*pi*i*K/(Z^2) * (((S/L)^alpha - (-1)^i*(S/U)^alpha ) /      (alpha^2 + (i *pi / Z)^2) ) * sin(i * pi / Z * log(S/L)) *        exp(-1/2*((i*pi/Z)^2-beta) * sigma^2 * Time)    p <- p + a    i <- i + 1 } p } Now that we have a nice formula, it is possible to draw some DNT-related charts to get more familiar with this option. Later, we will use a particular AUDUSD DNT option with the following parameters: L equal to 0.9200, U equal to 0.9600, K (payout) equal to USD 1 million, T equal to 0.25 years, volatility equal to 6 percent, r_AUD equal to 2.75 percent, r_USD equal to 0.25 percent, and b equal to -2.5 percent. We will calculate and plot all the possible values of this DNT from 0.9200 to 0.9600; each step will be one pip (0.0001), so we will use 2,000 steps. The following code plots a graph of price of underlying: x <- seq(0.92, 0.96, length = 2000) y <- z <- rep(0, 2000) for (i in 1:2000){    y[i] <- dnt1(x[i], 1e6, 0.96, 0.92, 0.06, 0.25, 0.0025, -0.0250)    z[i] <- dnt1(x[i], 1e6, 0.96, 0.92, 0.065, 0.25, 0.0025, -0.0250) } matplot(x, cbind(y,z), type = "l", lwd = 2, lty = 1,    main = "Price of a DNT with volatility 6% and 6.5% ", cex.main = 0.8, xlab = "Price of underlying" ) The following output is the result of the preceding code: It can be clearly seen that even a small change in volatility can have a huge impact on the price of a DNT. Looking at this chart is an intuitive way to find that vega must be negative. Interestingly enough even just taking a quick look at this chart can convince us that the absolute value of vega is decreasing if we are getting closer to the barriers. Most end users think that the biggest risk is when the spot is getting close to the trigger. This is because end users really think about binary options in a binary way. As long as the DNT is alive, they focus on the positive outcome. However, for a dynamic hedger, the risk of a DNT is not that interesting when the value of the DNT is already small. 
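The monotone relationship claimed above is easy to verify directly with dnt1; the following quick check is illustrative and not part of the book's code:

```r
# DNT value at S = 0.94 for increasing volatilities -- the price falls as volatility rises
vols <- seq(0.03, 0.12, by = 0.03)
sapply(vols, function(v) dnt1(0.94, 1e6, 0.96, 0.92, v, 0.25, 0.0025, -0.025))
```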
It is also very interesting that since the T-Bill price is independent of the volatility and since the DNT + DOT = T-Bill equation holds, an increasing volatility will decrease the price of the DNT by the exact same amount just like it will increase the price of the DOT. It is not surprising that the vega of the DOT should be the exact mirror of the DNT. We can use the GetGreeks function to estimate vega, gamma, delta, and theta. For gamma we can use the GetGreeks function in the following way: GetGreeks <- function(FUN, arg, epsilon,...) {    all_args1 <- all_args2 <- list(...)    all_args1[[arg]] <- as.numeric(all_args1[[arg]] + epsilon)    all_args2[[arg]] <- as.numeric(all_args2[[arg]] - epsilon)    (do.call(FUN, all_args1) -        do.call(FUN, all_args2)) / (2 * epsilon) } Gamma <- function(FUN, epsilon, S, ...) {    arg1 <- list(S, ...)    arg2 <- list(S + 2 * epsilon, ...)    arg3 <- list(S - 2 * epsilon, ...)    y1 <- (do.call(FUN, arg2) - do.call(FUN, arg1)) / (2 * epsilon)    y2 <- (do.call(FUN, arg1) - do.call(FUN, arg3)) / (2 * epsilon)  (y1 - y2) / (2 * epsilon) } x = seq(0.9202, 0.9598, length = 200) delta <- vega <- theta <- gamma <- rep(0, 200)   for(i in 1:200){ delta[i] <- GetGreeks(FUN = dnt1, arg = 1, epsilon = 0.0001,    x[i], 1000000, 0.96, 0.92, 0.06, 0.5, 0.02, -0.02) vega[i] <-   GetGreeks(FUN = dnt1, arg = 5, epsilon = 0.0005,    x[i], 1000000, 0.96, 0.92, 0.06, 0.5, 0.0025, -0.025) theta[i] <- - GetGreeks(FUN = dnt1, arg = 6, epsilon = 1/365,    x[i], 1000000, 0.96, 0.92, 0.06, 0.5, 0.0025, -0.025) gamma[i] <- Gamma(FUN = dnt1, epsilon = 0.0001, S = x[i], K =    1e6, U = 0.96, L = 0.92, sigma = 0.06, Time = 0.5, r = 0.02, b = -0.02) }   windows() plot(x, vega, type = "l", xlab = "S",ylab = "", main = "Vega") The following chart is the result of the preceding code: After having a look at the value chart, the delta of a DNT is also very close to intuitions; if we are coming close to the higher barrier, our delta gets negative, and if we are coming closer to the lower barrier, the delta gets positive as follows: windows() plot(x, delta, type = "l", xlab = "S",ylab = "", main = "Delta") This is really a non-convex situation; if we would like to do a dynamic delta hedge, we will lose money for sure. If the spot price goes up, the delta of the DNT decreases, so we should buy some AUDUSD as a hedge. However, if the spot price goes down, we should sell some AUDUSD. Imagine a scenario where AUDUSD goes up 20 pips in the morning and then goes down 20 pips in the afternoon. For a dynamic hedger, this means buying some AUDUSD after the price moved up and selling this very same amount after the price comes down. The changing of the delta can be described by the gamma as follows: windows() plot(x, gamma, type = "l", xlab = "S",ylab = "", main = "Gamma") Negative gamma means that if the spot goes up, our delta is decreasing, but if the spot goes down, our delta is increasing. This doesn't sound great. For this inconvenient non-convex situation, there is some compensation, that is, the value of theta is positive. If nothing happens, but one day passes, the DNT will automatically worth more. Here, we use theta as minus 1 times the partial derivative, since if (T-t) is the time left, we check how the value changes as t increases by one day: windows() plot(x, theta, type = "l", xlab = "S",ylab = "", main = "Theta") The more negative the gamma, the more positive our theta. This is how time compensates for the potential losses generated by the negative gamma. 
Risk-neutral pricing also implicates that negative gamma should be compensated by a positive theta. This is the main message of the Black-Scholes framework for vanilla options, but this is also true for exotics. See Taleb (1997) and Wilmott (2006). We already introduced the Black-Scholes surface before; now, we can go into more detail. This surface is also a nice interpretation of how theta and delta work. It shows the price of an option for different spot prices and times to maturity, so the slope of this surface is the theta for one direction and delta for the other. The code for this is as follows: BS_surf <- function(S, Time, FUN, ...) { n <- length(S) k <- length(Time) m <- matrix(0, n, k) for (i in 1:n) {    for (j in 1:k) {      l <- list(S = S[i], Time = Time[j], ...)      m[i,j] <- do.call(FUN, l)      } } persp3D(z = m, xlab = "underlying", ylab = "Time",    zlab = "option price", phi = 30, theta = 30, bty = "b2") } BS_surf(seq(0.92,0.96,length = 200), seq(1/365, 1/48, length = 200), dnt1, K = 1000000, U = 0.96, L = 0.92, r = 0.0025, b = -0.0250,    sigma = 0.2) The preceding code gives the following output: We can see what was already suspected; DNT likes when time is passing and the spot is moving to the middle of the (L,U) interval. Another way to price the Double-no-touch option Static replication is always the most elegant way of pricing. The no-arbitrage argument will let us say that if, at some time in the future, two portfolios have the same value for sure, then their price should be equal any time before this. We will show how double-knock-out (DKO) options could be used to build a DNT. We will need to use a trick; the strike price could be the same as one of the barriers. For a DKO call, the strike price should be lower than the upper barrier because if the strike price is not lower than the upper barrier, the DKO call would be knocked out before it could become in-the-money, so in this case, the option would be worthless as nobody can ever exercise it in-the-money. However, we can choose the strike price to be equal to the lower barrier. For a put, the strike price should be higher than the lower barrier, so why not make it equal to the upper barrier. This way, the DKO call and DKO put option will have a very convenient feature; if they are still alive, they will both expiry in-the-money. Now, we are almost done. We just have to add the DKO prices, and we will get a DNT that has a payout of (U-L) dollars. Since DNT prices are linear in the payout, we only have to multiply the result by K*(U-L): dnt2 <- function(S, K, U, L, sigma, T, r, b) {      a <- DoubleBarrierOption("co", S, L, L, U, T, r, b, sigma, 0,        0,title = NULL, description = NULL)    z <- a@price    b <- DoubleBarrierOption("po", S, U, L, U, T, r, b, sigma, 0,        0,title = NULL, description = NULL)    y <- b@price    (z + y) / (U - L) * K } Now, we have two functions for which we can compare the results: dnt1(0.9266, 1000000, 0.9600, 0.9200, 0.06, 0.25, 0.0025, -0.025) [1] 48564.59   dnt2(0.9266, 1000000, 0.9600, 0.9200, 0.06, 0.25, 0.0025, -0.025) [1] 48564.45 For a DNT with a USD 1 million contingent payout and an initial market value of over 48,000 dollars, it is very nice to see that the difference in the prices is only 14 cents. Technically, however, having a second pricing function is not a big help since low volatility is also an issue for dnt2. We will use dnt1 for the rest of the article. 
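Before moving on, the agreement between the two pricers can also be checked over a range of spot prices rather than at a single point. This is an optional sanity check (it assumes the fExoticOptions package is loaded, since dnt2 relies on DoubleBarrierOption):

```r
# Dollar difference between the two pricing approaches across the barrier range
spots <- seq(0.925, 0.955, by = 0.005)
diffs <- sapply(spots, function(s)
  dnt1(s, 1e6, 0.96, 0.92, 0.06, 0.25, 0.0025, -0.025) -
  dnt2(s, 1e6, 0.96, 0.92, 0.06, 0.25, 0.0025, -0.025))
round(diffs, 2)
```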
The life of a Double-no-touch option – a simulation How has the DNT price been evolving during the second quarter of 2014? We have the open-high-low-close type time series with five minute frequency for AUDUSD, so we know all the extreme prices: d <- read.table("audusd.csv", colClasses = c("character", rep("numeric",5)), sep = ";", header = TRUE) underlying <- as.vector(t(d[, 2:5])) t <- rep( d[,6], each = 4) n <- length(t) option_price <- rep(0, n)   for (i in 1:n) { option_price[i] <- dnt1(S = underlying[i], K = 1000000,    U = 0.9600, L = 0.9200, sigma = 0.06, T = t[i]/(60*24*365),      r = 0.0025, b = -0.0250) } a <- min(option_price) b <- max(option_price) option_price_transformed = (option_price - a) * 0.03 / (b - a) + 0.92   par(mar = c(6, 3, 3, 5)) matplot(cbind(underlying,option_price_transformed), type = "l",    lty = 1, col = c("grey", "red"),    main = "Price of underlying and DNT",    xaxt = "n", yaxt = "n", ylim = c(0.91,0.97),    ylab = "", xlab = "Remaining time") abline(h = c(0.92, 0.96), col = "green") axis(side = 2, at = pretty(option_price_transformed),    col.axis = "grey", col = "grey") axis(side = 4, at = pretty(option_price_transformed),    labels = round(seq(a/1000,1000,length = 7)), las = 2,    col = "red", col.axis = "red") axis(side = 1, at = seq(1,n, length=6),    labels = round(t[round(seq(1,n, length=6))]/60/24)) The following is the output for the preceding code: The price of a DNT is shown in red on the right axis (divided by 1000), and the actual AUDUSD price is shown in grey on the left axis. The green lines are the barriers of 0.9200 and 0.9600. The chart shows that in 2014 Q2, the AUDUSD currency pair was traded inside the (0.9200; 0.9600) interval; thus, the payout of the DNT would have been USD 1 million. This DNT looks like a very good investment; however, reality is just one trajectory out of an a priori almost infinite set. It could have happened differently. For example, on May 02, 2014, there were still 59 days left until expiry, and AUDUSD was traded at 0.9203, just three pips away from the lower barrier. At this point, the price of this DNT was only USD 5,302 dollars which is shown in the following code: dnt1(0.9203, 1000000, 0.9600, 0.9200, 0.06, 59/365, 0.0025, -0.025) [1] 5302.213 Compare this USD 5,302 to the initial USD 48,564 option price! In the following simulation, we will show some different trajectories. All of them start from the same 0.9266 AUDUSD spot price as it was on the dawn of April 01, and we will see how many of them stayed inside the (0.9200; 0.9600) interval. 
To make it simple, we will simulate geometric Brown motions by using the same 6 percent volatility as we used to price the DNT: library(matrixStats) DNT_sim <- function(S0 = 0.9266, mu = 0, sigma = 0.06, U = 0.96, L = 0.92, N = 5) {    dt <- 5 / (365 * 24 * 60)    t <- seq(0, 0.25, by = dt)    Time <- length(t)      W <- matrix(rnorm((Time - 1) * N), Time - 1, N)    W <- apply(W, 2, cumsum)    W <- sqrt(dt) * rbind(rep(0, N), W)    S <- S0 * exp((mu - sigma^2 / 2) * t + sigma * W )    option_price <- matrix(0, Time, N)      for (i in 1:N)        for (j in 1:Time)          option_price[j,i] <- dnt1(S[j,i], K = 1000000, U, L, sigma,              0.25-t[j], r = 0.0025,                b = -0.0250)*(min(S[1:j,i]) > L & max(S[1:j,i]) < U)      survivals <- sum(option_price[Time,] > 0)    dev.new(width = 19, height = 10)      par(mfrow = c(1,2))    matplot(t,S, type = "l", main = "Underlying price",        xlab = paste("Survived", survivals, "from", N), ylab = "")    abline( h = c(U,L), col = "blue")    matplot(t, option_price, type = "l", main = "DNT price",        xlab = "", ylab = "")} set.seed(214) system.time(DNT_sim()) The following is the output for the preceding code: Here, the only surviving trajectory is the red one; in all other cases, the DNT hits either the higher or the lower barrier. The line set.seed(214) grants that this simulation will look the same anytime we run this. One out of five is still not that bad; it would suggest that for an end user or gambler who does no dynamic hedging, this option has an approximate value of 20 percent of the payout (especially since the interest rates are low, the time value of money is not important). However, five trajectories are still too few to jump to such conclusions. We should check the DNT survivorship ratio for a much higher number of trajectories. The ratio of the surviving trajectories could be a good estimator of the a priori real-world survivorship probability of this DNT; thus, the end user value of it. Before increasing N rapidly, we should keep in mind how much time this simulation took. For my computer, it took 50.75 seconds for N = 5, and 153.11 seconds for N = 15. The following is the output for N = 15: Now, 3 out of 15 survived, so the estimated survivorship ratio is still 3/15, which is equal to 20 percent. Looks like this is a very nice product; the price is around 5 percent of the payout, while 20 percent is the estimated survivorship ratio. Just out of curiosity, run the simulation for N equal to 200. This should take about 30 minutes. The following is the output for N = 200: The results are shocking; now, only 12 out of 200 survive, and the ratio is only 6 percent! So to get a better picture, we should run the simulation for a larger N. The movie Whatever Works by Woody Allen (starring Larry David) is 92 minutes long; in simulation time, that is N = 541. For this N = 541, there are only 38 surviving trajectories, resulting in a survivorship ratio of 7 percent. What is the real expected survivorship ratio? Is it 20 percent, 6 percent, or 7 percent? We simply don't know at this point. Mathematicians warn us that the law of large numbers requires large numbers, where large is much more than 541, so it would be advisable to run this simulation for as large an N as time allows. Of course, getting a better computer also helps to do more N during the same time. Anyway, from this point of view, Hui's (1996) relatively fast converging DNT pricing formula gets some respect. 
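If only the survivorship ratio is of interest, the expensive repricing of the option at every time step in DNT_sim can be dropped, which makes a much larger N practical. The sketch below reuses the same geometric Brownian motion parameterization; DNT_survival is a helper name introduced here, not a function from the book:

```r
DNT_survival <- function(S0 = 0.9266, mu = 0, sigma = 0.06,
                         U = 0.96, L = 0.92, N = 1000) {
  dt <- 5 / (365 * 24 * 60)
  t  <- seq(0, 0.25, by = dt)
  survived <- logical(N)
  for (i in 1:N) {
    W <- c(0, cumsum(rnorm(length(t) - 1))) * sqrt(dt)
    S <- S0 * exp((mu - sigma^2 / 2) * t + sigma * W)
    # The DNT pays out only if the path never touches either barrier
    survived[i] <- min(S) > L && max(S) < U
  }
  mean(survived)
}

set.seed(214)
DNT_survival()   # estimated payout probability; increase N as time allows
```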
Summary We started this article by introducing exotic options. In a brief theoretical summary, we explained how exotics are linked together. There are many types of exotics. We showed one possible way of classification that is consistent with the fExoticOptions package. We showed how the Black-Scholes surface (a 3D chart that contains the price of a derivative dependent on time and the underlying price) can be constructed for any pricing function. Resources for Article: Further resources on this subject: What is Quantitative Finance? [article] Learning Option Pricing [article] Derivatives Pricing [article]


Learning Random Forest Using Mahout

Packt
05 Mar 2015
11 min read
In this article by Ashish Gupta, author of the book Learning Apache Mahout Classification, we will learn about Random forest, which is one of the most popular techniques in classification. It starts with a machine learning technique called decision tree. In this article, we will explore the following topics: Decision tree Random forest Using Mahout for Random forest (For more resources related to this topic, see here.) Decision tree A decision tree is used for classification and regression problems. In simple terms, it is a predictive model that uses binary rules to calculate the target variable. In a decision tree, we use an iterative process of splitting the data into partitions, then we split it further on branches. As in other classification model creation processes, we start with the training dataset in which target variables or class labels are defined. The algorithm tries to break all the records in training datasets into two parts based on one of the explanatory variables. The partitioning is then applied to each new partition, and this process is continued until no more partitioning can be done. The core of the algorithm is to find out the rule that determines the initial split. There are algorithms to create decision trees, such as Iterative Dichotomiser 3 (ID3), Classification and Regression Tree (CART), Chi-squared Automatic Interaction Detector (CHAID), and so on. A good explanation for ID3 can be found at http://www.cse.unsw.edu.au/~billw/cs9414/notes/ml/06prop/id3/id3.html. Forming the explanatory variables to choose the best splitter in a node, the algorithm considers each variable in turn. Every possible split is considered and tried, and the best split is the one that produces the largest decrease in diversity of the classification label within each partition. This is repeated for all variables, and the winner is chosen as the best splitter for that node. The process is continued in the next node until we reach a node where we can make the decision. We create a decision tree from a training dataset so it can suffer from the overfitting problem. This behavior creates a problem with real datasets. To improve this situation, a process called pruning is used. In this process, we remove the branches and leaves of the tree to improve the performance. Algorithms used to build the tree work best at the starting or root node since all the information is available there. Later on, with each split, data is less and towards the end of the tree, a particular node can show patterns that are related to the set of data which is used to split. These patterns create problems when we use them to predict the real dataset. Pruning methods let the tree grow and remove the smaller branches that fail to generalize. Now take an example to understand the decision tree. Consider we have a iris flower dataset. This dataset is hugely popular in the machine learning field. It was introduced by Sir Ronald Fisher. It contains 50 samples from each of three species of iris flower (Iris setosa, Iris virginica, and Iris versicolor). The four explanatory variables are the length and width of the sepals and petals in centimeters, and the target variable is the class to which the flower belongs. As you can see in the preceding diagram, all the groups were earlier considered as Sentosa species and then the explanatory variable and petal length were further used to divide the groups. At each step, the calculation for misclassified items was also done, which shows how many items were wrongly classified. 
Moreover, the petal width variable was taken into account. Usually, items at leaf nodes are correctly classified. Random forest The Random forest algorithm was developed by Leo Breiman and Adele Cutler. Random forests grow many classification trees. They are an ensemble learning method for classification and regression that constructs a number of decision trees at training time and also outputs the class that is the mode of the classes outputted by individual trees. Single decision trees show the bias–variance tradeoff. So they usually have high variance or high bias. The following are the parameters in the algorithm: Bias: This is an error caused by an erroneous assumption in the learning algorithm Variance: This is an error that ranges from sensitivity to small fluctuations in the training set Random forests attempt to mitigate this problem by averaging to find a natural balance between two extremes. A Random forest works on the idea of bagging, which is to average noisy and unbiased models to create a model with low variance. A Random forest algorithm works as a large collection of decorrelated decision trees. To understand the idea of a Random forest algorithm, let's work with an example. Consider we have a training dataset that has lots of features (explanatory variables) and target variables or classes: We create a sample set from the given dataset: A different set of random features were taken into account to create the random sub-dataset. Now, from these sub-datasets, different decision trees will be created. So actually we have created a forest of the different decision trees. Using these different trees, we will create a ranking system for all the classifiers. To predict the class of a new unknown item, we will use all the decision trees and separately find out which class these trees are predicting. See the following diagram for a better understanding of this concept: Different decision trees to predict the class of an unknown item In this particular case, we have four different decision trees. We predict the class of an unknown dataset with each of the trees. As per the preceding figure, the first decision tree provides class 2 as the predicted class, the second decision tree predicts class 5, the third decision tree predicts class 5, and the fourth decision tree predicts class 3. Now, a Random forest will vote for each class. So we have one vote each for class 2 and class 3 and two votes for class 5. Therefore, it has decided that for the new unknown dataset, the predicted class is class 5. So the class that gets a higher vote is decided for the new dataset. A Random forest has a lot of benefits in classification and a few of them are mentioned in the following list: Combination of learning models increases the accuracy of the classification Runs effectively on large datasets as well The generated forest can be saved and used for other datasets as well Can handle a large amount of explanatory variables Now that we have understood the Random forest theoretically, let's move on to Mahout and use the Random forest algorithm, which is available in Apache Mahout. Using Mahout for Random forest Mahout has implementation for the Random forest algorithm. It is very easy to understand and use. So let's get started. Dataset We will use the NSL-KDD dataset. Since 1999, KDD'99 has been the most widely used dataset for the evaluation of anomaly detection methods. This dataset is prepared by S. J. Stolfo and is built based on the data captured in the DARPA'98 IDS evaluation program (R. P. 
Lippmann, D. J. Fried, I. Graf, J. W. Haines, K. R. Kendall, D. McClung, D. Weber, S. E. Webster, D. Wyschogrod, R. K. Cunningham, and M. A. Zissman, "Evaluating intrusion detection systems: The 1998 darpa off-line intrusion detection evaluation," discex, vol. 02, p. 1012, 2000). DARPA'98 is about 4 GB of compressed raw (binary) tcp dump data of 7 weeks of network traffic, which can be processed into about 5 million connection records, each with about 100 bytes. The two weeks of test data have around 2 million connection records. The KDD training dataset consists of approximately 4,900,000 single connection vectors, each of which contains 41 features and is labeled as either normal or an attack, with exactly one specific attack type. NSL-KDD is a dataset suggested to solve some of the inherent problems of the KDD'99 dataset. You can download this dataset from http://nsl.cs.unb.ca/NSL-KDD/. We will download the KDDTrain+_20Percent.ARFF and KDDTest+.ARFF datasets. In KDDTrain+_20Percent.ARFF and KDDTest+.ARFF, remove the first 44 lines (that is, all lines starting with @attribute). If this is not done, we will not be able to generate a descriptor file. Steps to use the Random forest algorithm in Mahout The steps to implement the Random forest algorithm in Apache Mahout are as follows: Transfer the test and training datasets to hdfs using the following commands: hadoop fs -mkdir /user/hue/KDDTrainhadoop fs -mkdir /user/hue/KDDTesthadoop fs –put /tmp/KDDTrain+_20Percent.arff /user/hue/KDDTrainhadoop fs –put /tmp/KDDTest+.arff /user/hue/KDDTest Generate the descriptor file. Before you build a Random forest model based on the training data in KDDTrain+.arff, a descriptor file is required. This is because all information in the training dataset needs to be labeled. From the labeled dataset, the algorithm can understand which one is numerical and categorical. Use the following command to generate descriptor file: hadoop jar $MAHOUT_HOME/core/target/mahout-core-xyz.job.jarorg.apache.mahout.classifier.df.tools.Describe-p /user/hue/KDDTrain/KDDTrain+_20Percent.arff-f /user/hue/KDDTrain/KDDTrain+.info-d N 3 C 2 N C 4 N C 8 N 2 C 19 N L Jar: Mahout core jar (xyz stands for version). If you have directly installed Mahout, it can be found under the /usr/lib/mahout folder. The main class Describe is used here and it takes three parameters: The p path for the data to be described. The f location for the generated descriptor file. d is the information for the attribute on the data. N 3 C 2 N C 4 N C 8 N 2 C 19 N L defines that the dataset is starting with a numeric (N), followed by three categorical attributes, and so on. In the last, L defines the label. The output of the previous command is shown in the following screenshot: Build the Random forest using the following command: hadoop jar $MAHOUT_HOME/examples/target/mahout-examples-xyz-job.jar org.apache.mahout.classifier.df.mapreduce.BuildForest-Dmapred.max.split.size=1874231 -d /user/hue/KDDTrain/KDDTrain+_20Percent.arff-ds /user/hue/KDDTrain/KDDTrain+.info-sl 5 -p -t 100 –o /user/hue/ nsl-forest Jar: Mahout example jar (xyz stands for version). If you have directly installed Mahout, it can be found under the /usr/lib/mahout folder. The main class build forest is used to build the forest with other arguments, which are shown in the following list: Dmapred.max.split.size indicates to Hadoop the maximum size of each partition. d stands for the data path. ds stands for the location of the descriptor file. 
sl is a variable to select randomly at each tree node. Here, each tree is built using five randomly selected attributes per node. p uses partial data implementation. t stands for the number of trees to grow. Here, the commands build 100 trees using partial implementation. o stands for the output path that will contain the decision forest. In the end, the process will show the following result: Use this model to classify the new dataset: hadoop jar $MAHOUT_HOME/examples/target/mahout-examples-xyz-job.jar org.apache.mahout.classifier.df.mapreduce.TestForest-i /user/hue/KDDTest/KDDTest+.arff-ds /user/hue/KDDTrain/KDDTrain+.info -m /user/hue/nsl-forest -a –mr-o /user/hue/predictions Jar: Mahout example jar (xyz stands for version). If you have directly installed Mahout, it can be found under the /usr/lib/mahout folder. The class to test the forest has the following parameters: I indicates the path for the test data ds stands for the location of the descriptor file m stands for the location of the generated forest from the previous command a informs to run the analyzer to compute the confusion matrix mr informs Hadoop to distribute the classification o stands for the location to store the predictions in The job provides the following confusion matrix: So, from the confusion matrix, it is clear that 9,396 instances were correctly classified and 315 normal instances were incorrectly classified as anomalies. And the accuracy percentage is 77.7635 (correctly classified instances by the model / classified instances). The output file in the prediction folder contains the list where 0 and 1. 0 defines the normal dataset and 1 defines the anomaly. Summary In this article, we discussed the Random forest algorithm. We started our discussion by understanding the decision tree and continued with an understanding of the Random forest. We took up the NSL-KDD dataset, which is used to build predictive systems for cyber security. We used Mahout to build the Random forest tree, and used it with the test dataset and generated the confusion matrix and other statistics for the output. Resources for Article: Further resources on this subject: Implementing the Naïve Bayes classifier in Mahout [article] About Cassandra [article] Tuning Solr JVM and Container [article]
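As a footnote to the Random forest discussion above, the voting step in the class-prediction example (four trees predicting classes 2, 5, 5, and 3) is simple enough to show in a few lines. This is a toy illustration in plain Java, not Mahout API code:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MajorityVote {
    // Return the class label that receives the most votes from the individual trees
    static int vote(int[] treePredictions) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int p : treePredictions) {
            counts.merge(p, 1, Integer::sum);
        }
        return Collections.max(counts.entrySet(),
                Map.Entry.comparingByValue()).getKey();
    }

    public static void main(String[] args) {
        System.out.println(vote(new int[]{2, 5, 5, 3}));   // prints 5
    }
}
```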
Hadoop and MapReduce

Packt
05 Mar 2015
43 min read
In this article by the author, Thilina Gunarathne, of the book, Hadoop MapReduce v2 Cookbook - Second Edition, we will learn about Hadoop and MapReduce. We are living in the era of big data, where the exponential growth of phenomena such as the web, social networking, and smartphones is producing petabytes of data on a daily basis. Gaining insights from analyzing these very large amounts of data has become a must-have competitive advantage for many industries. However, the size and the possibly unstructured nature of these data sources make it impossible to use traditional solutions such as relational databases to store and analyze these datasets. (For more resources related to this topic, see here.) Storing, processing, and analyzing petabytes of data in a meaningful and timely manner require many compute nodes with thousands of disks and thousands of processors, together with the ability to efficiently communicate massive amounts of data among them. Such a scale makes failures such as disk failures, compute node failures, and network failures a common occurrence, making fault tolerance a very important aspect of such systems. Other common challenges that arise include the significant cost of resources, handling communication latencies, handling heterogeneous compute resources, synchronization across nodes, and load balancing. As you can infer, developing and maintaining distributed parallel applications to process massive amounts of data while handling all these issues is not an easy task. This is where Apache Hadoop comes to our rescue. Google is one of the first organizations to face the problem of processing massive amounts of data. Google built a framework for large-scale data processing borrowing the map and reduce paradigms from the functional programming world and named it MapReduce. At the foundation of Google MapReduce was the Google File System, which is a high-throughput parallel filesystem that enables the reliable storage of massive amounts of data using commodity computers. Seminal research publications that introduced the Google MapReduce and Google File System concepts can be found at http://research.google.com/archive/mapreduce.html and http://research.google.com/archive/gfs.html. Apache Hadoop MapReduce is the most widely known and widely used open source implementation of the Google MapReduce paradigm. Apache Hadoop Distributed File System (HDFS) provides an open source implementation of the Google File System concept. Apache Hadoop MapReduce, HDFS, and YARN provide a scalable, fault-tolerant, distributed platform for storage and processing of very large datasets across clusters of commodity computers. Unlike traditional High Performance Computing (HPC) clusters, Hadoop uses the same set of compute nodes for data storage as well as to perform the computations, allowing Hadoop to improve the performance of large-scale computations by collocating computations with the storage. Also, the hardware cost of a Hadoop cluster is orders of magnitude cheaper than HPC clusters and database appliances due to the usage of commodity hardware and commodity interconnects. Together, Hadoop-based frameworks have become the de facto standard for storing and processing big data. Hadoop Distributed File System – HDFS HDFS is a block-structured distributed filesystem that is designed to store petabytes of data reliably on compute clusters made out of commodity hardware.
HDFS overlays on top of the existing filesystem of the compute nodes and stores files by breaking them into coarser grained blocks (for example, 128 MB). HDFS performs better with large files. HDFS distributes the data blocks of large files across to all the nodes of the cluster to facilitate the very high parallel aggregate read bandwidth when processing the data. HDFS also stores redundant copies of these data blocks in multiple nodes to ensure reliability and fault tolerance. Data processing frameworks such as MapReduce exploit these distributed sets of data blocks and the redundancy to maximize the data local processing of large datasets, where most of the data blocks would get processed locally in the same physical node as they are stored. HDFS consists of NameNode and DataNode services providing the basis for the distributed filesystem. NameNode stores, manages, and serves the metadata of the filesystem. NameNode does not store any real data blocks. DataNode is a per node service that manages the actual data block storage in the DataNodes. When retrieving data, client applications first contact the NameNode to get the list of locations the requested data resides in and then contact the DataNodes directly to retrieve the actual data. The following diagram depicts a high-level overview of the structure of HDFS: Hadoop v2 brings in several performance, scalability, and reliability improvements to HDFS. One of the most important among those is the High Availability (HA) support for the HDFS NameNode, which provides manual and automatic failover capabilities for the HDFS NameNode service. This solves the widely known NameNode single point of failure weakness of HDFS. Automatic NameNode high availability of Hadoop v2 uses Apache ZooKeeper for failure detection and for active NameNode election. Another important new feature is the support for HDFS federation. HDFS federation enables the usage of multiple independent HDFS namespaces in a single HDFS cluster. These namespaces would be managed by independent NameNodes, but share the DataNodes of the cluster to store the data. The HDFS federation feature improves the horizontal scalability of HDFS by allowing us to distribute the workload of NameNodes. Other important improvements of HDFS in Hadoop v2 include the support for HDFS snapshots, heterogeneous storage hierarchy support (Hadoop 2.3 or higher), in-memory data caching support (Hadoop 2.3 or higher), and many performance improvements. Almost all the Hadoop ecosystem data processing technologies utilize HDFS as the primary data storage. HDFS can be considered as the most important component of the Hadoop ecosystem due to its central nature in the Hadoop architecture. Hadoop YARN YARN (Yet Another Resource Negotiator) is the major new improvement introduced in Hadoop v2. YARN is a resource management system that allows multiple distributed processing frameworks to effectively share the compute resources of a Hadoop cluster and to utilize the data stored in HDFS. YARN is a central component in the Hadoop v2 ecosystem and provides a common platform for many different types of distributed applications. The batch processing based MapReduce framework was the only natively supported data processing framework in Hadoop v1. 
While MapReduce works well for analyzing large amounts of data, MapReduce by itself is not sufficient enough to support the growing number of other distributed processing use cases such as real-time data computations, graph computations, iterative computations, and real-time data queries. The goal of YARN is to allow users to utilize multiple distributed application frameworks that provide such capabilities side by side sharing a single cluster and the HDFS filesystem. Some examples of the current YARN applications include the MapReduce framework, Tez high performance processing framework, Spark processing engine, and the Storm real-time stream processing framework. The following diagram depicts the high-level architecture of the YARN ecosystem: The YARN ResourceManager process is the central resource scheduler that manages and allocates resources to the different applications (also known as jobs) submitted to the cluster. YARN NodeManager is a per node process that manages the resources of a single compute node. Scheduler component of the ResourceManager allocates resources in response to the resource requests made by the applications, taking into consideration the cluster capacity and the other scheduling policies that can be specified through the YARN policy plugin framework. YARN has a concept called containers, which is the unit of resource allocation. Each allocated container has the rights to a certain amount of CPU and memory in a particular compute node. Applications can request resources from YARN by specifying the required number of containers and the CPU and memory required by each container. ApplicationMaster is a per-application process that coordinates the computations for a single application. The first step of executing a YARN application is to deploy the ApplicationMaster. After an application is submitted by a YARN client, the ResourceManager allocates a container and deploys the ApplicationMaster for that application. Once deployed, the ApplicationMaster is responsible for requesting and negotiating the necessary resource containers from the ResourceManager. Once the resources are allocated by the ResourceManager, ApplicationMaster coordinates with the NodeManagers to launch and monitor the application containers in the allocated resources. The shifting of application coordination responsibilities to the ApplicationMaster reduces the burden on the ResourceManager and allows it to focus solely on managing the cluster resources. Also having separate ApplicationMasters for each submitted application improves the scalability of the cluster as opposed to having a single process bottleneck to coordinate all the application instances. The following diagram depicts the interactions between various YARN components, when a MapReduce application is submitted to the cluster: While YARN supports many different distributed application execution frameworks, our focus in this article is mostly on traditional MapReduce and related technologies. Hadoop MapReduce Hadoop MapReduce is a data processing framework that can be utilized to process massive amounts of data stored in HDFS. As we mentioned earlier, distributed processing of a massive amount of data in a reliable and efficient manner is not an easy task. Hadoop MapReduce aims to make it easy for users by providing a clean abstraction for programmers by providing automatic parallelization of the programs and by providing framework managed fault tolerance support. MapReduce programming model consists of Map and Reduce functions. 
The Map function receives each record of the input data (lines of a file, rows of a database, and so on) as key-value pairs and outputs key-value pairs as the result. By design, each Map function invocation is independent of each other allowing the framework to use divide and conquer to execute the computation in parallel. This also allows duplicate executions or re-executions of the Map tasks in case of failures or load imbalances without affecting the results of the computation. Typically, Hadoop creates a single Map task instance for each HDFS data block of the input data. The number of Map function invocations inside a Map task instance is equal to the number of data records in the input data block of the particular Map task instance. Hadoop MapReduce groups the output key-value records of all the Map tasks of a computation by the key and distributes them to the Reduce tasks. This distribution and transmission of data to the Reduce tasks is called the Shuffle phase of the MapReduce computation. Input data to each Reduce task would also be sorted and grouped by the key. The Reduce function gets invoked for each key and the group of values of that key (reduce <key, list_of_values>) in the sorted order of the keys. In a typical MapReduce program, users only have to implement the Map and Reduce functions and Hadoop takes care of scheduling and executing them in parallel. Hadoop will rerun any failed tasks and also provide measures to mitigate any unbalanced computations. Have a look at the following diagram for a better understanding of the MapReduce data and computational flows: In Hadoop 1.x, the MapReduce (MR1) components consisted of the JobTracker process, which ran on a master node managing the cluster and coordinating the jobs, and TaskTrackers, which ran on each compute node launching and coordinating the tasks executing in that node. Neither of these processes exist in Hadoop 2.x MapReduce (MR2). In MR2, the job coordinating responsibility of JobTracker is handled by an ApplicationMaster that will get deployed on-demand through YARN. The cluster management and job scheduling responsibilities of JobTracker are handled in MR2 by the YARN ResourceManager. JobHistoryServer has taken over the responsibility of providing information about the completed MR2 jobs. YARN NodeManagers provide the functionality that is somewhat similar to MR1 TaskTrackers by managing resources and launching containers (which in the case of MapReduce 2 houses Map or Reduce tasks) in the compute nodes. Hadoop installation modes Hadoop v2 provides three installation choices: Local mode: The local mode allows us to run MapReduce computation using just the unzipped Hadoop distribution. This nondistributed mode executes all parts of Hadoop MapReduce within a single Java process and uses the local filesystem as the storage. The local mode is very useful for testing/debugging the MapReduce applications locally. Pseudo distributed mode: Using this mode, we can run Hadoop on a single machine emulating a distributed cluster. This mode runs the different services of Hadoop as different Java processes, but within a single machine. This mode is good to let you play and experiment with Hadoop. Distributed mode: This is the real distributed mode that supports clusters that span from a few nodes to thousands of nodes. 
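To make the Map and Reduce abstractions described earlier concrete, and to give you something that can be compiled and run in the local or pseudo-distributed modes listed above, here is a minimal word-count sketch. The class names, the whitespace tokenization, and the command-line argument layout are illustrative assumptions rather than code from the book, and error handling is omitted for brevity:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountSketch {

  // Map: emit <word, 1> for every token of every input line
  public static class TokenizeMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Reduce: the framework has already grouped the values by key (shuffle),
  // so we only need to sum the counts for each word
  public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) {
        sum += v.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count sketch");
    job.setJarByClass(WordCountSketch.class);
    job.setMapperClass(TokenizeMapper.class);
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Packaged into a JAR, a sketch like this would typically be launched with a command of the form hadoop jar wordcount.jar WordCountSketch <input path> <output path>; the framework takes care of scheduling the Map and Reduce tasks, the shuffle, and any re-executions of failed tasks.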
For production clusters, we recommend using one of the many packaged Hadoop distributions as opposed to installing Hadoop from scratch using the Hadoop release binaries, unless you have a specific use case that requires a vanilla Hadoop installation. Refer to the Setting up Hadoop ecosystem in a distributed cluster environment using a Hadoop distribution recipe for more information on Hadoop distributions. Setting up Hadoop ecosystem in a distributed cluster environment using a Hadoop distribution The Hadoop YARN ecosystem now contains many useful components providing a wide range of data processing, storing, and querying functionalities for the data stored in HDFS. However, manually installing and configuring all of these components to work together correctly using individual release artifacts is quite a challenging task. Other challenges of such an approach include the monitoring and maintenance of the cluster and the multiple Hadoop components. Luckily, there exist several commercial software vendors that provide well integrated packaged Hadoop distributions to make it much easier to provision and maintain a Hadoop YARN ecosystem in our clusters. These distributions often come with easy GUI-based installers that guide you through the whole installation process and allow you to select and install the components that you require in your Hadoop cluster. They also provide tools to easily monitor the cluster and to perform maintenance operations. For regular production clusters, we recommend using a packaged Hadoop distribution from one of the well-known vendors to make your Hadoop journey much easier. Some of these commercial Hadoop distributions (or editions of the distribution) have licenses that allow us to use them free of charge with optional paid support agreements. Hortonworks Data Platform (HDP) is one such well-known Hadoop YARN distribution that is available free of charge. All the components of HDP are available as free and open source software. You can download HDP from http://hortonworks.com/hdp/downloads/. Refer to the installation guides available in the download page for instructions on the installation. Cloudera CDH is another well-known Hadoop YARN distribution. The Express edition of CDH is available free of charge. Some components of the Cloudera distribution are proprietary and available only for paying clients. You can download Cloudera Express from http://www.cloudera.com/content/cloudera/en/products-and-services/cloudera-express.html. Refer to the installation guides available on the download page for instructions on the installation. Hortonworks HDP, Cloudera CDH, and some of the other vendors provide fully configured quick start virtual machine images that you can download and run on your local machine using a virtualization software product. These virtual machines are an excellent resource to learn and try the different Hadoop components as well as for evaluation purposes before deciding on a Hadoop distribution for your cluster. Apache Bigtop is an open source project that aims to provide packaging and integration/interoperability testing for the various Hadoop ecosystem components. Bigtop also provides a vendor neutral packaged Hadoop distribution. While it is not as sophisticated as the commercial distributions, Bigtop is easier to install and maintain than using release binaries of each of the Hadoop components. In this recipe, we provide steps to use Apache Bigtop to install Hadoop ecosystem in your local machine. 
Benchmarking Hadoop MapReduce using TeraSort
Hadoop TeraSort is a well-known benchmark that aims to sort 1 TB of data as fast as possible using Hadoop MapReduce. The TeraSort benchmark stresses almost every part of the Hadoop MapReduce framework as well as the HDFS filesystem, making it an ideal choice to fine-tune the configuration of a Hadoop cluster. The original TeraSort benchmark sorts 10 billion 100-byte records, making the total data size 1 TB. However, we can specify the number of records, making it possible to configure the total size of data.
Getting ready
You must set up and deploy HDFS and Hadoop v2 YARN MapReduce prior to running these benchmarks, and locate the hadoop-mapreduce-examples-*.jar file in your Hadoop installation.
How to do it...
The following steps will show you how to run the TeraSort benchmark on the Hadoop cluster:
The first step of the TeraSort benchmark is the data generation. You can use the teragen command to generate the input data for the TeraSort benchmark. The first parameter of teragen is the number of records and the second parameter is the HDFS directory to generate the data in. The following command generates 1 GB of data consisting of 10 million records in the tera-in directory in HDFS. Change the location of the hadoop-mapreduce-examples-*.jar file in the following commands according to your Hadoop installation:
$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar teragen 10000000 tera-in
It's a good idea to specify the number of Map tasks for the teragen computation to speed up the data generation. This can be done by specifying the -Dmapred.map.tasks parameter. Also, you can increase the HDFS block size for the generated data so that the Map tasks of the TeraSort computation would be coarser grained (the number of Map tasks for a Hadoop computation typically equals the number of input data blocks). This can be done by specifying the -Ddfs.block.size parameter.
$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar teragen -Ddfs.block.size=536870912 -Dmapred.map.tasks=256 10000000 tera-in
The second step of the TeraSort benchmark is the execution of the TeraSort MapReduce computation on the data generated in step 1, using the following command. The first parameter of the terasort command is the input HDFS data directory, and the second parameter is the output HDFS data directory.
$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar terasort tera-in tera-out
It's a good idea to specify the number of Reduce tasks for the TeraSort computation to speed up the Reducer part of the computation. This can be done by specifying the -Dmapred.reduce.tasks parameter as follows:
$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar terasort -Dmapred.reduce.tasks=32 tera-in tera-out
The last step of the TeraSort benchmark is the validation of the results. This can be done using the teravalidate application as follows. The first parameter is the directory with the sorted data and the second parameter is the directory to store the report containing the results.
$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar teravalidate tera-out tera-validate
How it works...
TeraSort uses the sorting capability of the MapReduce framework together with a custom range Partitioner to divide the Map output among the Reduce tasks, ensuring the globally sorted order.
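To illustrate the range-partitioning idea mentioned above, the following is a minimal sketch of a custom Partitioner. This is not TeraSort's actual implementation, which samples the input to compute balanced split points; the sketch simply routes keys to reducers by their first character so that concatenating the reducer outputs in order yields a globally sorted result. The assumption of printable ASCII keys is purely illustrative.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes keys to reducers by key range: reducer 0 receives the smallest
// keys and reducer N-1 the largest, so the reducer outputs, read in
// partition order, form one globally sorted sequence.
public class SimpleRangePartitioner extends Partitioner<Text, Text> {

  @Override
  public int getPartition(Text key, Text value, int numReduceTasks) {
    if (numReduceTasks <= 1 || key.getLength() == 0) {
      return 0;
    }
    // charAt returns the code point at the given position; we assume
    // printable ASCII (0-127) keys for this illustration.
    int firstChar = key.charAt(0);
    int bucket = (firstChar * numReduceTasks) / 128;
    return Math.min(numReduceTasks - 1, Math.max(0, bucket));
  }
}

A partitioner like this would be attached to a job with job.setPartitionerClass(SimpleRangePartitioner.class); the real TeraSort replaces the fixed character ranges with split points derived from a sample of the input so that each reducer receives a roughly equal share of the data.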
Optimizing Hadoop YARN and MapReduce configurations for cluster deployments In this recipe, we explore some of the important configuration options of Hadoop YARN and Hadoop MapReduce. Commercial Hadoop distributions typically provide a GUI-based approach to specify Hadoop configurations. YARN allocates resource containers to the applications based on the resource requests made by the applications and the available resource capacity of the cluster. A resource request by an application would consist of the number of containers required and the resource requirement of each container. Currently, most container resource requirements are specified using the amount of memory. Hence, our focus in this recipe will be mainly on configuring the memory allocation of a YARN cluster. Getting ready Set up a Hadoop cluster by following the earlier recipes. How to do it... The following instructions will show you how to configure the memory allocation in a YARN cluster. The number of tasks per node is derived using this configuration: The following property specifies the amount of memory (RAM) that can be used by YARN containers in a worker node. It's advisable to set this slightly less than the amount of physical RAM present in the node, leaving some memory for the OS and other non-Hadoop processes. Add or modify the following lines in the yarn-site.xml file: <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>100240</value> </property> The following property specifies the minimum amount of memory (RAM) that can be allocated to a YARN container in a worker node. Add or modify the following lines in the yarn-site.xml file to configure this property. If we assume that all the YARN resource-requests request containers with only the minimum amount of memory, the maximum number of concurrent resource containers that can be executed in a node equals (YARN memory per node specified in step 1)/(YARN minimum allocation configured below). Based on this relationship, we can use the value of the following property to achieve the desired number of resource containers per node.The number of resource containers per node is recommended to be less than or equal to the minimum of (2*number CPU cores) or (2* number of disks). <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>3072</value> </property> Restart the YARN ResourceManager and NodeManager services by running sbin/stop-yarn.sh and sbin/start-yarn.sh from the HADOOP_HOME directory. The following instructions will show you how to configure the memory requirements of the MapReduce applications. The following properties define the maximum amount of memory (RAM) that will be available to each Map and Reduce task. These memory values will be used when MapReduce applications request resources from YARN for Map and Reduce task containers. Add the following lines to the mapred-site.xml file: <property> <name>mapreduce.map.memory.mb</name> <value>3072</value> </property> <property> <name>mapreduce.reduce.memory.mb</name> <value>6144</value> </property> The following properties define the JVM heap size of the Map and Reduce tasks respectively. Set these values to be slightly less than the corresponding values in step 4, so that they won't exceed the resource limits of the YARN containers. Add the following lines to the mapred-site.xml file: <property> <name>mapreduce.map.java.opts</name> <value>-Xmx2560m</value> </property> <property> <name>mapreduce.reduce.java.opts</name> <value>-Xmx5120m</value> </property> How it works... 
We can control Hadoop configurations through the following four configuration files. Hadoop reloads the configurations from these configuration files after a cluster restart:
core-site.xml: Contains the configurations common to the whole Hadoop distribution
hdfs-site.xml: Contains configurations for HDFS
mapred-site.xml: Contains configurations for MapReduce
yarn-site.xml: Contains configurations for the YARN ResourceManager and NodeManager processes
Each configuration file has name-value pairs expressed in XML format, defining the configurations of different aspects of Hadoop. The following is an example of a property in a configuration file. The <configuration> tag is the top-level parent XML container and <property> tags, which define individual properties, are specified as child tags inside the <configuration> tag:
<configuration>
  <property>
    <name>mapreduce.reduce.shuffle.parallelcopies</name>
    <value>20</value>
  </property>
  ...
</configuration>
Some configurations can be configured on a per-job basis using the job.getConfiguration().set(name, value) method from the Hadoop MapReduce job driver code.
There's more...
There are many similar important configuration properties defined in Hadoop. The following are some of them:
conf/core-site.xml:
fs.inmemory.size.mb (default value: 200): Amount of memory, in MB, allocated to the in-memory filesystem that is used to merge map outputs at the reducers
io.file.buffer.size (default value: 131072): Size of the read/write buffer used by sequence files
conf/mapred-site.xml:
mapreduce.reduce.shuffle.parallelcopies (default value: 20): Maximum number of parallel copies the reduce step will execute to fetch output from many parallel jobs
mapreduce.task.io.sort.factor (default value: 50): Maximum number of streams merged while sorting files
mapreduce.task.io.sort.mb (default value: 200): Memory limit, in MB, while sorting data
conf/hdfs-site.xml:
dfs.blocksize (default value: 134217728): HDFS block size
dfs.namenode.handler.count (default value: 200): Number of server threads to handle RPC calls in NameNodes
You can find a list of deprecated properties in the latest version of Hadoop and the new replacement properties for them at http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html. The following documents provide the list of properties, their default values, and the descriptions of each of the configuration files mentioned earlier:
Common configuration: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/core-default.xml
HDFS configuration: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
YARN configuration: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
MapReduce configuration: http://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
Unit testing Hadoop MapReduce applications using MRUnit
MRUnit is a JUnit-based Java library that allows us to unit test Hadoop MapReduce programs. This makes it easy to develop as well as to maintain Hadoop MapReduce code bases. MRUnit supports testing Mappers and Reducers separately as well as testing MapReduce computations as a whole. In this recipe, we'll be exploring all three testing scenarios.
Getting ready
We use Gradle as the build tool for our sample code base.
How to do it...
The following steps show you how to perform unit testing of a Mapper using MRUnit: In the setUp method of the test class, initialize an MRUnit MapDriver instance with the Mapper class you want to test. In this example, we are going to test the Mapper of the WordCount MapReduce application we discussed in earlier recipes: public class WordCountWithToolsTest {   MapDriver<Object, Text, Text, IntWritable> mapDriver;   @Before public void setUp() {    WordCountWithTools.TokenizerMapper mapper =       new WordCountWithTools.TokenizerMapper();    mapDriver = MapDriver.newMapDriver(mapper); } …… } Write a test function to test the Mapper logic. Provide the test input to the Mapper using the MapDriver.withInput method. Then, provide the expected result of the Mapper execution using the MapDriver.withOutput method. Now, invoke the test using the MapDriver.runTest method. The MapDriver.withAll and MapDriver.withAllOutput methods allow us to provide a list of test inputs and a list of expected outputs, rather than adding them individually. @Test public void testWordCountMapper() throws IOException {    IntWritable inKey = new IntWritable(0);    mapDriver.withInput(inKey, new Text("Test Quick"));    ….    mapDriver.withOutput(new Text("Test"),new     IntWritable(1));    mapDriver.withOutput(new Text("Quick"),new     IntWritable(1));    …    mapDriver.runTest(); } The following step shows you how to perform unit testing of a Reducer using MRUnit. Similar to step 1 and 2, initialize a ReduceDriver by providing the Reducer class under test and then configure the ReduceDriver with the test input and the expected output. The input to the reduce function should conform to a key with a list of values. Also, in this test, we use the ReduceDriver.withAllOutput method to provide a list of expected outputs. public class WordCountWithToolsTest { ReduceDriver<Text,IntWritable,Text,IntWritable>   reduceDriver;   @Before public void setUp() {    WordCountWithTools.IntSumReducer reducer =       new WordCountWithTools.IntSumReducer();    reduceDriver = ReduceDriver.newReduceDriver(reducer); }   @Test public void testWordCountReduce() throws IOException {    ArrayList<IntWritable> reduceInList =       new ArrayList<IntWritable>();    reduceInList.add(new IntWritable(1));    reduceInList.add(new IntWritable(2));      reduceDriver.withInput(new Text("Quick"),     reduceInList);    ...    ArrayList<Pair<Text, IntWritable>> reduceOutList =       new ArrayList<Pair<Text,IntWritable>>();    reduceOutList.add(new Pair<Text, IntWritable>     (new Text("Quick"),new IntWritable(3)));    ...    reduceDriver.withAllOutput(reduceOutList);    reduceDriver.runTest(); } } The following steps show you how to perform unit testing on a whole MapReduce computation using MRUnit. In this step, initialize a MapReduceDriver by providing the Mapper class and Reducer class of the MapReduce program that you want to test. Then, configure the MapReduceDriver with the test input data and the expected output data. When executed, this test will execute the MapReduce execution flow starting from the Map input stage to the Reduce output stage. It's possible to provide a combiner implementation to this test as well. public class WordCountWithToolsTest { …… MapReduceDriver<Object, Text, Text, IntWritable, Text,IntWritable> mapReduceDriver; @Before public void setUp() { .... mapReduceDriver = MapReduceDriver. 
newMapReduceDriver(mapper, reducer); } @Test public void testWordCountMapReduce() throws IOException { IntWritable inKey = new IntWritable(0); mapReduceDriver.withInput(inKey, new Text ("Test Quick")); …… ArrayList<Pair<Text, IntWritable>> reduceOutList = new ArrayList<Pair<Text,IntWritable>>(); reduceOutList.add(new Pair<Text, IntWritable> (new Text("Quick"),new IntWritable(2))); …… mapReduceDriver.withAllOutput(reduceOutList); mapReduceDriver.runTest(); } } The Gradle build script (or any other Java build mechanism) can be configured to execute these unit tests with every build. We can add the MRUnit dependency to the Gradle build file as follows: dependencies { testCompile group: 'org.apache.mrunit', name: 'mrunit',   version: '1.1.+',classifier: 'hadoop2' …… } Use the following Gradle command to execute only the WordCountWithToolsTest unit test. This command executes any test class that matches the pattern **/WordCountWith*.class: $ gradle –Dtest.single=WordCountWith test :chapter3:compileJava UP-TO-DATE :chapter3:processResources UP-TO-DATE :chapter3:classes UP-TO-DATE :chapter3:compileTestJava UP-TO-DATE :chapter3:processTestResources UP-TO-DATE :chapter3:testClasses UP-TO-DATE :chapter3:test BUILD SUCCESSFUL Total time: 27.193 secs You can also execute MRUnit-based unit tests in your IDE. You can use the gradle eclipse or gradle idea commands to generate the project files for the Eclipse and IDEA IDE respectively. Generating an inverted index using Hadoop MapReduce Simple text searching systems rely on inverted index to look up the set of documents that contain a given word or a term. In this recipe, we implement a simple inverted index building application that computes a list of terms in the documents, the set of documents that contains each term, and the term frequency in each of the documents. Retrieval of results from an inverted index can be as simple as returning the set of documents that contains the given terms or can involve much more complex operations such as returning the set of documents ordered based on a particular ranking. Getting ready You must have Apache Hadoop v2 configured and installed to follow this recipe. Gradle is needed for the compiling and building of the source code. How to do it... In the following steps, we use a MapReduce program to build an inverted index for a text dataset: Create a directory in HDFS and upload a text dataset. This dataset should consist of one or more text files. $ hdfs dfs -mkdir input_dir $ hdfs dfs -put *.txt input_dir You can download the text versions of the Project Gutenberg books by following the instructions given at http://www.gutenberg.org/wiki/Gutenberg:Information_About_Robot_Access_to_our_Pages. Make sure to provide the filetypes query parameter of the download request as txt. Unzip the downloaded files. You can use the unzipped text files as the text dataset for this recipe. Compile the source by running the gradle build command from the chapter 8 folder of the source repository. Run the inverted indexing MapReduce job using the following command.Provide the HDFS directory where you uploaded the input data in step 2 as the first argument and provide an HDFS path to store the output as the second argument: $ hadoop jar hcb-c8-samples.jar chapter8.invertindex.TextOutInvertedIndexMapReduce input_dir output_dir Check the output directory for the results by running the following command. 
The output of this program will consist of the term followed by a comma-separated list of filename and frequency: $ hdfs dfs -cat output_dir/* ARE three.txt:1,one.txt:1,four.txt:1,two.txt:1, AS three.txt:2,one.txt:2,four.txt:2,two.txt:2, AUGUSTA three.txt:1, About three.txt:1,two.txt:1, Abroad three.txt:2, We used the text outputting inverted indexing MapReduce program in step 3 for the clarity of understanding the algorithm. Run the program by substituting the command in step 3 with the following command: $ hadoop jar hcb-c8-samples.jar chapter8.invertindex.InvertedIndexMapReduce input_dir seq_output_dir How it works... The Map Function receives a chunk of an input document as the input and outputs the term and <docid, 1> pair for each word. In the Map function, we first replace all the non-alphanumeric characters from the input text value before tokenizing it as follows: public void map(Object key, Text value, ……… { String valString = value.toString().replaceAll("[^a-zA-Z0-9]+"," "); StringTokenizer itr = new StringTokenizer(valString); StringTokenizer(value.toString()); FileSplit fileSplit = (FileSplit) context.getInputSplit(); String fileName = fileSplit.getPath().getName(); while (itr.hasMoreTokens()) { term.set(itr.nextToken()); docFrequency.set(fileName, 1); context.write(term, docFrequency); } } We use the getInputSplit() method of MapContext to obtain a reference to InputSplit assigned to the current Map task. The InputSplits class for this computation are instances of FileSplit due to the usage of a FileInputFormat based InputFormat. Then we use the getPath() method of FileSplit to obtain the path of the file containing the current split and extract the filename from it. We use this extracted filename as the document ID when constructing the inverted index. The Reduce function receives IDs and frequencies of all the documents that contain the term (Key) as the input. The Reduce function then outputs the term and a list of document IDs and the number of occurrences of the term in each document as the output: public void reduce(Text key, Iterable values,Context context) …………{ HashMap<Text, IntWritable> map = new HashMap<Text, IntWritable>(); for (TermFrequencyWritable val : values) { Text docID = new Text(val.getDocumentID()); int freq = val.getFreq().get(); if (map.get(docID) != null) { map.put(docID, new IntWritable(map.get(docID).get() + freq)); } else { map.put(docID, new IntWritable(freq)); } } MapWritable outputMap = new MapWritable(); outputMap.putAll(map); context.write(key, outputMap); } In the preceding model, we output a record for each word, generating a large amount of intermediate data between Map tasks and Reduce tasks. We use the following combiner to aggregate the terms emitted by the Map tasks, reducing the amount of Intermediate data that needs to be transferred between Map and Reduce tasks: public void reduce(Text key, Iterable values …… { int count = 0; String id = ""; for (TermFrequencyWritable val : values) { count++; if (count == 1) { id = val.getDocumentID().toString(); } } TermFrequencyWritable writable = new TermFrequencyWritable(); writable.set(id, count); context.write(key, writable); } In the driver program, we set the Mapper, Reducer, and the Combiner classes. Also, we specify both Output Value and the MapOutput Value properties as we use different value types for the Map tasks and the reduce tasks. 
… job.setMapperClass(IndexingMapper.class); job.setReducerClass(IndexingReducer.class); job.setCombinerClass(IndexingCombiner.class); … job.setMapOutputValueClass(TermFrequencyWritable.class); job.setOutputValueClass(MapWritable.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); There's more... We can improve this indexing program by performing optimizations such as filtering stop words, substituting words with word stems, storing more information about the context of the word, and so on, making indexing a much more complex problem. Luckily, there exist several open source indexing frameworks that we can use for indexing purposes. The later recipes of this article will explore indexing using Apache Solr and Elasticsearch, which are based on the Apache Lucene indexing engine. The upcoming section introduces the usage of MapFileOutputFormat to store InvertedIndex in an indexed random accessible manner. Outputting a random accessible indexed InvertedIndex Apache Hadoop supports a file format called MapFile that can be used to store an index into the data stored in SequenceFiles. MapFile is very useful when we need to random access records stored in a large SequenceFile. You can use the MapFileOutputFormat format to output MapFiles, which would consist of a SequenceFile containing the actual data and another file containing the index into the SequenceFile. The chapter8/invertindex/MapFileOutInvertedIndexMR.java MapReduce program in the source folder of chapter8 utilizes MapFiles to store a secondary index into our inverted index. You can execute that program by using the following command. The third parameter (sample_lookup_term) should be a word that is present in your input dataset: $ hadoop jar hcb-c8-samples.jar      chapter8.invertindex.MapFileOutInvertedIndexMR      input_dir indexed_output_dir sample_lookup_term If you check indexed_output_dir, you will be able to see folders named as part-r-xxxxx with each containing a data and an index file. We can load these indexes to MapFileOutputFormat and perform random lookups for the data. An example of a simple lookup using this method is given in the MapFileOutInvertedIndexMR.java program as follows: MapFile.Reader[] indexReaders = MapFileOutputFormat.getReaders(new Path(args[1]), getConf());MapWritable value = new MapWritable();Text lookupKey = new Text(args[2]);// Performing the lookup for the values if the lookupKeyWritable map = MapFileOutputFormat.getEntry(indexReaders,new HashPartitioner<Text, MapWritable>(), lookupKey, value); In order to use this feature, you need to make sure to disable Hadoop from writing a _SUCCESS file in the output folder by setting the following property. The presence of the _SUCCESS file might cause an error when using MapFileOutputFormat to lookup the values in the index: job.getConfiguration().setBoolean     ("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); Data preprocessing using Hadoop streaming and Python Data preprocessing is an important and often required component in data analytics. Data preprocessing becomes even more important when consuming unstructured text data generated from multiple different sources. Data preprocessing steps include operations such as cleaning the data, extracting important features from data, removing duplicate items from the datasets, converting data formats, and many more. Hadoop MapReduce provides an ideal environment to perform these tasks in parallel when processing massive datasets. 
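Many such cleanup steps can be expressed as a simple Mapper with the number of reduce tasks set to zero in the driver. The following is a minimal sketch of a data-cleansing Mapper written in Java; the class name and the particular cleanup rule (lowercasing the text and stripping non-alphanumeric characters) are illustrative assumptions, not code from the accompanying samples.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: lowercases each input line, replaces any
// non-alphanumeric characters with spaces, and emits the cleaned line
// keyed by its byte offset in the input file.
public class CleanupMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

  private final Text cleaned = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString().toLowerCase().replaceAll("[^a-z0-9\\s]", " ");
    cleaned.set(line.trim());
    context.write(key, cleaned);
  }
}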
Apart from using Java MapReduce programs or Pig scripts or Hive scripts to preprocess the data, Hadoop also contains several other tools and features that are useful in performing these data preprocessing operations. One such feature is InputFormats, which provides us with the ability to support custom data formats by implementing custom InputFormats. Another feature is the Hadoop Streaming support, which allows us to use our favorite scripting languages to perform the actual data cleansing and extraction, while Hadoop will parallelize the computation to hundreds of compute and storage resources. In this recipe, we are going to use Hadoop Streaming with a Python script-based Mapper to perform data extraction and format conversion.
Getting ready
Check whether Python is already installed on the Hadoop worker nodes. If not, install Python on all the Hadoop worker nodes.
How to do it...
The following steps show how to clean and extract data from the 20news dataset and store the data as a tab-separated file:
Download and extract the 20news dataset from http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz:
$ wget http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz
$ tar -xzf 20news-19997.tar.gz
Upload the extracted data to HDFS. In order to save compute time and resources, you can use only a subset of the dataset:
$ hdfs dfs -mkdir 20news-all
$ hdfs dfs -put <extracted_folder> 20news-all
Extract the resource package and locate the MailPreProcessor.py Python script.
Locate the hadoop-streaming.jar JAR file of the Hadoop installation in your machine. Run the following Hadoop Streaming command using that JAR. /usr/lib/hadoop-mapreduce/ is the hadoop-streaming JAR file's location for the BigTop-based Hadoop installations:
$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input 20news-all/*/* -output 20news-cleaned -mapper MailPreProcessor.py -file MailPreProcessor.py
Inspect the results using the following command:
$ hdfs dfs -cat 20news-cleaned/part-* | more
How it works...
Hadoop uses the default TextInputFormat as the input specification for the previous computation. Usage of the TextInputFormat generates a Map task for each file in the input dataset and generates a Map input record for each line. Hadoop Streaming provides the input to the Map application through the standard input:
line = sys.stdin.readline();
while line:
    ....
    if (doneHeaders):
        list.append( line )
    elif line.find( "Message-ID:" ) != -1:
        messageID = line[ len("Message-ID:"):]
    ....
    elif line == "":
        doneHeaders = True
    line = sys.stdin.readline();
The preceding Python code reads the input lines from the standard input until it reaches the end of the file. We parse the headers of the newsgroup file till we encounter the empty line that demarcates the headers from the message contents. The message content will be read into a list line by line:
value = ' '.join( list )
value = fromAddress + "\t" ... "\t" + value
print '%s\t%s' % (messageID, value)
The preceding code segment merges the message content into a single string and constructs the output value of the streaming application as a tab-delimited set of selected headers, followed by the message content. The output key is the Message-ID header extracted from the input file. The output is written to the standard output by using a tab to delimit the key and the value.
There's more...
We can generate the output of the preceding computation in the Hadoop SequenceFile format by specifying SequenceFileOutputFormat as the OutputFormat of the streaming computations: $ hadoop jar /usr/lib/Hadoop-mapreduce/hadoop-streaming.jar -input 20news-all/*/* -output 20news-cleaned -mapper MailPreProcessor.py -file MailPreProcessor.py -outputformat          org.apache.hadoop.mapred.SequenceFileOutputFormat -file MailPreProcessor.py It is a good practice to store the data as SequenceFiles (or other Hadoop binary file formats such as Avro) after the first pass of the input data because SequenceFiles takes up less space and supports compression. You can use hdfs dfs -text <path_to_sequencefile> to output the contents of a SequenceFile to text: $ hdfs dfs –text 20news-seq/part-* | more However, for the preceding command to work, any Writable classes that are used in the SequenceFile should be available in the Hadoop classpath. Loading large datasets to an Apache HBase data store – importtsv and bulkload The Apache HBase data store is very useful when storing large-scale data in a semi-structured manner, so that it can be used for further processing using Hadoop MapReduce programs or to provide a random access data storage for client applications. In this recipe, we are going to import a large text dataset to HBase using the importtsv and bulkload tools. Getting ready Install and deploy Apache HBase in your Hadoop cluster. Make sure Python is installed in your Hadoop compute nodes. How to do it… The following steps show you how to load the TSV (tab-separated value) converted 20news dataset in to an HBase table: Follow the Data preprocessing using Hadoop streaming and Python recipe to perform the preprocessing of data for this recipe. We assume that the output of the following step 4 of that recipe is stored in an HDFS folder named "20news-cleaned": $ hadoop jar    /usr/lib/hadoop-mapreduce/hadoop-streaming.jar    -input 20news-all/*/*    -output 20news-cleaned    -mapper MailPreProcessor.py -file MailPreProcessor.py Start the HBase shell: $ hbase shell Create a table named 20news-data by executing the following command in the HBase shell. Older versions of the importtsv (used in the next step) command can handle only a single column family. Hence, we are using only a single column family when creating the HBase table: hbase(main):001:0> create '20news-data','h' Execute the following command to import the preprocessed data to the HBase table created earlier: $ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,h:from,h:group,h:subj,h:msg 20news-data 20news-cleaned Start the HBase Shell and use the count and scan commands of the HBase shell to verify the contents of the table: hbase(main):010:0> count '20news-data'           12xxx row(s) in 0.0250 seconds hbase(main):010:0> scan '20news-data', {LIMIT => 10} ROW                                       COLUMN+CELL                                                                           <[email protected] column=h:c1,       timestamp=1354028803355, value= [email protected]   (Chris Katopis)> <[email protected] column=h:c2,     timestamp=1354028803355, value= sci.electronics   ...... 
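Besides the HBase shell, you can verify the imported rows programmatically. The following is a minimal sketch that uses the HBase Java client to fetch a single row from the 20news-data table; the row key string is a placeholder that you would replace with one of the Message-ID values actually present in your dataset.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyImport {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // '20news-data' is the table created earlier; the row key below is a
    // placeholder for one of the imported Message-ID values.
    HTable table = new HTable(conf, "20news-data");
    try {
      Get get = new Get(Bytes.toBytes("<message-id-from-your-dataset>"));
      Result result = table.get(get);
      byte[] from = result.getValue(Bytes.toBytes("h"), Bytes.toBytes("from"));
      System.out.println("from = " + (from == null ? "<missing>" : Bytes.toString(from)));
    } finally {
      table.close();
    }
  }
}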
The following are the steps to load the 20news dataset to an HBase table using the bulkload feature:
Follow steps 1 to 3, but create the table with a different name:
hbase(main):001:0> create '20news-bulk','h'
Use the following command to generate the HBase bulkload datafiles:
$ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,h:from,h:group,h:subj,h:msg -Dimporttsv.bulk.output=20news-bulk-source 20news-bulk 20news-cleaned
List the files to verify that the bulkload datafiles are generated:
$ hadoop fs -ls 20news-bulk-source
......
drwxr-xr-x   - thilina supergroup          0 2014-04-27 10:06 /user/thilina/20news-bulk-source/h
$ hadoop fs -ls 20news-bulk-source/h
-rw-r--r--   1 thilina supergroup      19110 2014-04-27 10:06 /user/thilina/20news-bulk-source/h/4796511868534757870
The following command loads the data to the HBase table by moving the output files to the correct location:
$ hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 20news-bulk-source 20news-bulk
......
14/04/27 10:10:00 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://127.0.0.1:9000/user/thilina/20news-bulk-source/h/4796511868534757870 first= <[email protected]> last= <stephens.736002130@ngis>
......
Start the HBase shell and use the count and scan commands of the HBase shell to verify the contents of the table:
hbase(main):010:0> count '20news-bulk'
hbase(main):010:0> scan '20news-bulk', {LIMIT => 10}
How it works...
The MailPreProcessor.py Python script extracts a selected set of data fields from the newsgroup message and outputs them as a tab-separated dataset:
value = fromAddress + "\t" + newsgroup + "\t" + subject + "\t" + value
print '%s\t%s' % (messageID, value)
We import the tab-separated dataset generated by the Streaming MapReduce computations to HBase using the importtsv tool. The importtsv tool requires the data to have no tab characters other than the tab characters that separate the data fields. Hence, we remove any tab characters that may be present in the input data by using the following snippet of the Python script:
line = line.strip()
line = re.sub('\t', ' ', line)
The importtsv tool supports the loading of data into HBase directly using Put operations as well as by generating the HBase internal HFiles. The following command loads the data to HBase directly using the Put operations. Our generated dataset contains a Key and four fields in the values. We specify the data field to table column name mapping for the dataset using the -Dimporttsv.columns parameter. This mapping consists of listing the respective table column names in the order of the tab-separated data fields in the input dataset:
$ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=<data field to table column mappings> <HBase tablename> <HDFS input directory>
We can use the following command to generate HBase HFiles for the dataset. These HFiles can be directly loaded to HBase without going through the HBase APIs, thereby reducing the amount of CPU and network resources needed:
$ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=<field to column mappings> -Dimporttsv.bulk.output=<path for hfile output> <HBase tablename> <HDFS input directory>
These generated HFiles can be loaded into HBase tables by simply moving the files to the right location.
This moving can be performed by using the completebulkload command:
$ hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <HDFS path for hfiles> <table name>
There's more...
You can also use the importtsv tool with datasets that use a data-field separator character other than the tab character by specifying the '-Dimporttsv.separator' parameter. The following is an example of using a comma as the separator character to import a comma-separated dataset into an HBase table:
$ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv '-Dimporttsv.separator=,' -Dimporttsv.columns=<data field to table column mappings> <HBase tablename> <HDFS input directory>
Look out for Bad Lines in the MapReduce job console output or in the Hadoop monitoring console. One reason for Bad Lines is the presence of unwanted delimiter characters. The Python script we used in the data-cleaning step removes any extra tabs in the message:
14/03/27 00:38:10 INFO mapred.JobClient:   ImportTsv
14/03/27 00:38:10 INFO mapred.JobClient:     Bad Lines=2
Data de-duplication using HBase
HBase supports the storing of multiple versions of column values for each record. When querying, HBase returns the latest version of the values, unless we specifically mention a time period. This feature of HBase can be used to perform automatic de-duplication by making sure we use the same RowKey for duplicate values. In our 20news example, we use MessageID as the RowKey for the records, ensuring that duplicate messages will appear as different versions of the same data record. HBase allows us to configure the maximum or minimum number of versions per column family. Setting the maximum number of versions to a low value will reduce the data usage by discarding the old versions. Refer to http://hbase.apache.org/book/schema.versions.html for more information on setting the maximum or minimum number of versions.
Summary
In this article, we have learned about getting started with Hadoop, benchmarking Hadoop MapReduce, optimizing Hadoop YARN, unit testing, generating an inverted index, data preprocessing, and loading large datasets to an Apache HBase data store.
Resources for Article:
Further resources on this subject:
Hive in Hadoop [article]
Evolution of Hadoop [article]
Learning Data Analytics with R and Hadoop [article]
Writing Consumers

Packt
04 Mar 2015
20 min read
This article by Nishant Garg, the author of the book Learning Apache Kafka Second Edition, focuses on the details of Writing Consumers. Consumers are the applications that consume the messages published by Kafka producers and process the data extracted from them. Like producers, consumers can also be different in nature, such as applications doing real-time or near real-time analysis, applications with NoSQL or data warehousing solutions, backend services, consumers for Hadoop, or other subscriber-based solutions. These consumers can also be implemented in different languages such as Java, C, and Python. (For more resources related to this topic, see here.) In this article, we will focus on the following topics: The Kafka Consumer API Java-based Kafka consumers Java-based Kafka consumers consuming partitioned messages At the end of the article, we will explore some of the important properties that can be set for a Kafka consumer. So, let's start. The preceding diagram explains the high-level working of the Kafka consumer when consuming the messages. The consumer subscribes to the message consumption from a specific topic on the Kafka broker. The consumer then issues a fetch request to the lead broker to consume the message partition by specifying the message offset (the beginning position of the message offset). Therefore, the Kafka consumer works in the pull model and always pulls all available messages after its current position in the Kafka log (the Kafka internal data representation). While subscribing, the consumer connects to any of the live nodes and requests metadata about the leaders for the partitions of a topic. This allows the consumer to communicate directly with the lead broker receiving the messages. Kafka topics are divided into a set of ordered partitions and each partition is consumed by one consumer only. Once a partition is consumed, the consumer changes the message offset to the next partition to be consumed. This represents the states about what has been consumed and also provides the flexibility of deliberately rewinding back to an old offset and re-consuming the partition. In the next few sections, we will discuss the API provided by Kafka for writing Java-based custom consumers. All the Kafka classes referred to in this article are actually written in Scala. Kafka consumer APIs Kafka provides two types of API for Java consumers: High-level API Low-level API The high-level consumer API The high-level consumer API is used when only data is needed and the handling of message offsets is not required. This API hides broker details from the consumer and allows effortless communication with the Kafka cluster by providing an abstraction over the low-level implementation. The high-level consumer stores the last offset (the position within the message partition where the consumer left off consuming the message), read from a specific partition in Zookeeper. This offset is stored based on the consumer group name provided to Kafka at the beginning of the process. The consumer group name is unique and global across the Kafka cluster and any new consumers with an in-use consumer group name may cause ambiguous behavior in the system. When a new process is started with the existing consumer group name, Kafka triggers a rebalance between the new and existing process threads for the consumer group. After the rebalance, some messages that are intended for a new process may go to an old process, causing unexpected results. 
To avoid this ambiguous behavior, any existing consumers should be shut down before starting new consumers for an existing consumer group name. The following classes are used to write Java-based basic consumers using the high-level consumer API for a Kafka cluster:

- ConsumerConnector: Kafka provides the ConsumerConnector interface, which is implemented by the ZookeeperConsumerConnector class (kafka.javaapi.consumer.ZookeeperConsumerConnector). This class is responsible for all the interaction a consumer has with ZooKeeper.
- KafkaStream: Objects of the kafka.consumer.KafkaStream class are returned by the createMessageStreams call from the ConsumerConnector implementation. A list of KafkaStream objects is returned for each topic, and each stream can further create an iterator over the messages it carries. The following is the Scala-based class declaration:

  class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
                         consumerTimeoutMs: Int,
                         private val keyDecoder: Decoder[K],
                         private val valueDecoder: Decoder[V],
                         val clientId: String)

  Here, the parameters K and V specify the types of the partition key and message value, respectively. In the create call from the ConsumerConnector class, clients can specify the number of desired streams, where each stream object is used for single-threaded processing. These stream objects may represent the merging of multiple unique partitions.
- ConsumerConfig: The kafka.consumer.ConsumerConfig class encapsulates the property values required for establishing the connection with ZooKeeper, such as the ZooKeeper URL, ZooKeeper session timeout, and ZooKeeper sync time. It also contains the property values required by the consumer, such as the group ID.

A high-level API-based working consumer example is discussed after the next section.

The low-level consumer API

The high-level API does not allow consumers to control their interactions with brokers. Also known as the "simple consumer API", the low-level consumer API is stateless and provides fine-grained control over the communication between the Kafka broker and the consumer. It allows consumers to set the message offset with every request raised to the broker and maintains the metadata at the consumer's end. This API can be used by both online and offline consumers such as Hadoop. These types of consumers can also perform multiple reads of the same message or manage transactions to ensure a message is consumed only once.

Compared to the high-level consumer API, developers need to put in extra effort to gain this low-level control within consumers: keeping track of offsets, figuring out the lead broker for the topic and partition, handling lead broker changes, and so on. In the low-level consumer API, consumers first query the live broker to find out the details about the lead broker. Information about the live broker can be passed to the consumers either using a properties file or from the command line. The topicsMetadata() method of the kafka.javaapi.TopicMetadataResponse class is used to find out metadata about the topic of interest from the lead broker. For message partition reading, the kafka.api.OffsetRequest class defines two constants, EarliestTime and LatestTime, to find the beginning of the data in the logs and the stream of new messages.
These constants also help consumers track which messages have already been read. The main class used within the low-level consumer API is the SimpleConsumer class (kafka.javaapi.consumer.SimpleConsumer). The simple consumer class provides a connection to the lead broker for fetching messages from the topic, as well as methods to get the topic metadata and the list of offsets. A few more important classes for building the different request objects are FetchRequest (kafka.api.FetchRequest), OffsetRequest (kafka.javaapi.OffsetRequest), OffsetFetchRequest (kafka.javaapi.OffsetFetchRequest), OffsetCommitRequest (kafka.javaapi.OffsetCommitRequest), and TopicMetadataRequest (kafka.javaapi.TopicMetadataRequest).

All the examples in this article are based on the high-level consumer API. For examples based on the low-level consumer API, refer to https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.

Simple Java consumers

Now we will start writing a single-threaded simple Java consumer developed using the high-level consumer API for consuming the messages from a topic. This SimpleHLConsumer class is used to fetch messages from a specific topic and consume them, assuming that there is a single partition within the topic.

Importing classes

As a first step, we need to import the following classes:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

Defining properties

As the next step, we need to define properties for making a connection with ZooKeeper and pass these properties to the Kafka consumer using the following code:

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "testgroup");
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
new ConsumerConfig(props);

Now let us look at the major properties mentioned in the code:

- zookeeper.connect: This property specifies the ZooKeeper <node:port> connection detail that is used to find the running ZooKeeper instance in the cluster. In the Kafka cluster, ZooKeeper is used to store offsets of messages consumed for a specific topic and partition by this consumer group.
- group.id: This property specifies the name of the consumer group shared by all the consumers within the group. This is also the name used by ZooKeeper to store the offsets.
- zookeeper.session.timeout.ms: This property specifies the ZooKeeper session timeout in milliseconds and represents the amount of time Kafka will wait for ZooKeeper to respond to a request before giving up and continuing to consume messages.
- zookeeper.sync.time.ms: This property specifies the ZooKeeper sync time in milliseconds between the ZooKeeper leader and its followers.
- auto.commit.interval.ms: This property defines the frequency in milliseconds at which consumed offsets get committed to ZooKeeper.

Reading messages from a topic and printing them

As a final step, we need to read the messages using the following code:

Map<String, Integer> topicMap = new HashMap<String, Integer>();
// 1 represents the single thread
topicMap.put(topic, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap =
    consumer.createMessageStreams(topicMap);

// Get the list of message streams for each topic, using the default decoder.
List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic);

for (final KafkaStream<byte[], byte[]> stream : streamList) {
  ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
  while (consumerIte.hasNext())
    System.out.println("Message from Single Topic :: "
        + new String(consumerIte.next().message()));
}

So the complete program will look like the following code:

package kafka.examples.ch5;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SimpleHLConsumer {

  private final ConsumerConnector consumer;
  private final String topic;

  public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
    consumer = kafka.consumer.Consumer
        .createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
    this.topic = topic;
  }

  private static ConsumerConfig createConsumerConfig(String zookeeper,
      String groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", groupId);
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(props);
  }

  public void testConsumer() {
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    // Define single thread for topic
    topicMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap =
        consumer.createMessageStreams(topicMap);
    List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap
        .get(topic);
    for (final KafkaStream<byte[], byte[]> stream : streamList) {
      ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
      while (consumerIte.hasNext())
        System.out.println("Message from Single Topic :: "
            + new String(consumerIte.next().message()));
    }
    if (consumer != null)
      consumer.shutdown();
  }

  public static void main(String[] args) {
    String zooKeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer(
        zooKeeper, groupId, topic);
    simpleHLConsumer.testConsumer();
  }
}

Before running this, make sure you have created the topic kafkatopic from the command line:

[root@localhost kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafkatopic

Before compiling and running a Java-based Kafka program in the console, make sure you download the slf4j-1.7.7.tar.gz file from http://www.slf4j.org/download.html and copy slf4j-log4j12-1.7.7.jar contained within slf4j-1.7.7.tar.gz to the /opt/kafka_2.9.2-0.8.1.1/libs directory.
Also add all the libraries available in /opt/kafka_2.9.2-0.8.1.1/libs to the classpath using the following commands:

[root@localhost kafka_2.9.2-0.8.1.1]# export KAFKA_LIB=/opt/kafka_2.9.2-0.8.1.1/libs
[root@localhost kafka_2.9.2-0.8.1.1]# export CLASSPATH=.:$KAFKA_LIB/jopt-simple-3.2.jar:$KAFKA_LIB/kafka_2.9.2-0.8.1.1.jar:$KAFKA_LIB/log4j-1.2.15.jar:$KAFKA_LIB/metrics-core-2.2.0.jar:$KAFKA_LIB/scala-library-2.9.2.jar:$KAFKA_LIB/slf4j-api-1.7.2.jar:$KAFKA_LIB/slf4j-log4j12-1.7.7.jar:$KAFKA_LIB/snappy-java-1.0.5.jar:$KAFKA_LIB/zkclient-0.3.jar:$KAFKA_LIB/zookeeper-3.3.4.jar

Multithreaded Java consumers

The previous example is a very basic example of a consumer that consumes messages from a single broker with no explicit partitioning of messages within the topic. Let's jump to the next level and write another program that consumes messages from multiple partitions, connecting to single or multiple topics.

A multithreaded, high-level, consumer-API-based design is usually based on the number of partitions in the topic and follows a one-to-one mapping between threads and the partitions within the topic. For example, if four partitions are defined for a topic, as a best practice only four threads should be initiated within the consumer application to read the data; otherwise, some conflicting behavior may occur, such as threads never receiving a message or a thread receiving messages from multiple partitions. Also, receiving messages from multiple partitions does not guarantee that the messages will be placed in order. For example, a thread may receive two messages from the first partition and three from the second partition, then three more from the first partition, followed by more from the first partition again, even if the second partition has data available.

Let's move further on.

Importing classes

As a first step, we need to import the following classes:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

Defining properties

As the next step, we need to define properties for making a connection with ZooKeeper and pass these properties to the Kafka consumer using the following code:

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "testgroup");
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
new ConsumerConfig(props);

The preceding properties have already been discussed in the previous example. For more details on Kafka consumer properties, refer to the last section of this article.
Reading the messages from threads and printing them

The only difference from the previous section is that we first create a thread pool and get the Kafka streams associated with each thread in the pool, as shown in the following code:

// Define thread count for each topic
topicMap.put(topic, new Integer(threadCount));

// Here we have used a single topic, but we can also add
// multiple topics to the topic count map
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap =
    consumer.createMessageStreams(topicMap);

List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic);

// Launching the thread pool
executor = Executors.newFixedThreadPool(threadCount);

The complete program listing for the multithreaded Kafka consumer based on the Kafka high-level consumer API is as follows:

package kafka.examples.ch5;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MultiThreadHLConsumer {

  private ExecutorService executor;
  private final ConsumerConnector consumer;
  private final String topic;

  public MultiThreadHLConsumer(String zookeeper, String groupId, String topic) {
    consumer = kafka.consumer.Consumer
        .createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
    this.topic = topic;
  }

  private static ConsumerConfig createConsumerConfig(String zookeeper,
      String groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", groupId);
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(props);
  }

  public void shutdown() {
    if (consumer != null)
      consumer.shutdown();
    if (executor != null)
      executor.shutdown();
  }

  public void testMultiThreadConsumer(int threadCount) {
    Map<String, Integer> topicMap = new HashMap<String, Integer>();

    // Define thread count for each topic
    topicMap.put(topic, new Integer(threadCount));

    // Here we have used a single topic, but we can also add
    // multiple topics to the topic count map
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap =
        consumer.createMessageStreams(topicMap);

    List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap
        .get(topic);

    // Launching the thread pool
    executor = Executors.newFixedThreadPool(threadCount);

    // Creating an object for message consumption
    int count = 0;
    for (final KafkaStream<byte[], byte[]> stream : streamList) {
      final int threadNumber = count;
      executor.submit(new Runnable() {
        public void run() {
          ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
          while (consumerIte.hasNext())
            System.out.println("Thread Number " + threadNumber + ": "
                + new String(consumerIte.next().message()));
          System.out.println("Shutting down Thread Number: " + threadNumber);
        }
      });
      count++;
    }
  }

  public static void main(String[] args) {
    String zooKeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    int threadCount = Integer.parseInt(args[3]);
    MultiThreadHLConsumer multiThreadHLConsumer =
        new MultiThreadHLConsumer(zooKeeper, groupId, topic);
    multiThreadHLConsumer.testMultiThreadConsumer(threadCount);
    try {
      Thread.sleep(10000);
    } catch (InterruptedException ie) {
    }
    multiThreadHLConsumer.shutdown();
  }
}

Compile the preceding program, and before running it, read the following tip.

Before we run this program, we need to make sure our cluster is running as a multi-broker cluster (comprising either single or multiple nodes). Once your multi-broker cluster is up, create a topic with four partitions and set the replication factor to 2 before running this program, using the following command:

[root@localhost kafka-0.8]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafkatopic --partitions 4 --replication-factor 2

The Kafka consumer property list

The following is a list of a few important properties that can be configured for high-level, consumer-API-based Kafka consumers. The Scala class kafka.consumer.ConsumerConfig provides implementation-level details for consumer configurations. For a complete list, visit http://kafka.apache.org/documentation.html#consumerconfigs.

- group.id (no default): This property defines a unique identity for the set of consumers within the same consumer group.
- consumer.id (default: null): This property is specified for the Kafka consumer and generated automatically if not defined.
- zookeeper.connect (no default): This property specifies the ZooKeeper connection string, <hostname:port/chroot/path>. Kafka uses ZooKeeper to store offsets of messages consumed for a specific topic and partition by the consumer group. /chroot/path defines the data location in a global ZooKeeper namespace.
- client.id (default: ${group.id}): The client.id value is specified by the Kafka client with each request and is used to identify the client making the requests.
- zookeeper.session.timeout.ms (default: 6000): This property defines the time (in milliseconds) a Kafka consumer waits for a ZooKeeper pulse before it is declared dead and a rebalance is initiated.
- zookeeper.connection.timeout.ms (default: 6000): This value defines the maximum waiting time (in milliseconds) for the client to establish a connection with ZooKeeper.
- zookeeper.sync.time.ms (default: 2000): This property defines the time (in milliseconds) it takes to sync a ZooKeeper follower with the ZooKeeper leader.
- auto.commit.enable (default: true): This property enables a periodic commit to ZooKeeper of the message offsets already fetched by the consumer. In the event of consumer failures, these committed offsets are used as the starting position by the new consumers.
- auto.commit.interval.ms (default: 60 * 1000): This property defines the frequency (in milliseconds) at which the consumed offsets get committed to ZooKeeper.
- auto.offset.reset (default: largest): This property defines what to do when no initial offset is available in ZooKeeper or the offset is out of range. Possible values are largest (reset to the largest offset), smallest (reset to the smallest offset), and anything else (throw an exception).
- consumer.timeout.ms (default: -1): This property throws a timeout exception to the consumer if no message is available for consumption after the specified interval.

Summary

In this article, we have learned how to write basic consumers and learned about some advanced levels of Java consumers that consume messages from partitions.
Resources for Article: Further resources on this subject: Introducing Kafka? [article] Introduction To Apache Zookeeper [article] Creating Apache Jmeter™ Test Workbench [article]

Deployment Scenarios

Packt
04 Mar 2015
10 min read
This article by Andrea Gazzarini, author of the book Apache Solr Essentials, contains information on the various ways in which you can deploy Solr, including key features and pros and cons for each scenario. Solr has a wide range of deployment alternatives, from monolithic to distributed indexes and from standalone to clustered instances. We will organize this article by deployment scenario, with a growing level of complexity. This article will cover the following topics:

- Sharding
- Replication: master, slave, and repeaters

Standalone instance

All the examples use a standalone instance of Solr, that is, one or more cores managed by a Solr deployment hosted in a standalone servlet container (for example, Jetty, Tomcat, and so on). This kind of deployment is useful for development because, as you learned, it is very easy to start and debug. Besides, it can also be suitable for a production context if you don't have strict non-functional requirements and have a small or medium amount of data. I have used a standalone instance to provide autocomplete services for small and medium intranet systems. Anyway, the main features of this kind of deployment are simplicity and maintainability; one simple node acts as both an indexer and a searcher. The following diagram depicts a standalone instance with two cores.

Shards

When a monolithic index becomes too large for a single node, or when additions, deletions, or queries take too long to execute, the index can be split into multiple pieces called shards. The previous sentence highlights a logical and theoretical evolution path of a Solr index; however, the following advice is (in general) valid for all the scenarios we will describe: it is strongly recommended that you perform a preliminary analysis of your data and its estimated growth factor in order to decide from the beginning on the right configuration that suits your requirements. Although it is possible to split an existing index into shards (https://lucene.apache.org/core/4_10_3/misc/org/apache/lucene/index/PKIndexSplitter.html), things definitely become easier if you start directly with a distributed index (if you need it, of course).

The index is split so that each shard contains a disjoint subset of the entire index, and Solr will query and merge results across those shards. The following diagram illustrates a Solr deployment with 3 nodes; this deployment consists of two cores (C1 and C2) divided into three shards (S1, S2, and S3).

When using shards, only query requests are distributed. This means that it's up to the indexer to add and distribute the data across nodes, and to subsequently forward a change request (that is, a delete, replace, or commit) for a given document to the appropriate shard (the shard that owns the document). The Solr Wiki recommends a simple, hash-based algorithm to determine the shard where a given document should be indexed:

documentId.hashCode() % numServers

Using this approach is also useful in order to know in advance where to send delete or update requests for a given document. On the opposite side, a searcher client can send a query request to any node, but it has to specify an additional shards parameter that declares the target shards to be queried.
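As a rough illustration of the hash-based routing just described, an indexing client could pick the target core with a small helper like the one below. The helper class, the shard URL list, and the sign handling applied to hashCode() are assumptions added for this sketch and are not part of Solr itself:

// Hypothetical routing helper (not part of Solr): maps a document ID to one
// of the known shard base URLs using the hash-based rule discussed above.
public class ShardRouter {

    private final String[] shardUrls;

    public ShardRouter(String[] shardUrls) {
        this.shardUrls = shardUrls;
    }

    public String shardFor(String documentId) {
        // Mask the sign bit so that a negative hashCode() never produces
        // a negative array index.
        int index = (documentId.hashCode() & Integer.MAX_VALUE) % shardUrls.length;
        return shardUrls[index];
    }
}

A client built around such a helper, constructed for example with new ShardRouter(new String[]{"http://localhost:8080/solr/c1", "http://localhost:8081/solr/c2"}), would send add, delete, or update requests for a document to the URL returned by shardFor(documentId). Query requests, on the other hand, can go to any node, as described next.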
In the following example, assuming that two shards are hosted in two servers listening on ports 8080 and 8081, the same request sent to either node will produce the same result:

http://localhost:8080/solr/c1/query?q=*:*&shards=localhost:8080/solr/c1,localhost:8081/solr/c2
http://localhost:8081/solr/c2/query?q=*:*&shards=localhost:8080/solr/c1,localhost:8081/solr/c2

When sending a query request, a client can optionally include a pseudofield associated with the [shard] transformer. In this case, each returned document will carry additional information indicating the owning shard. This is an example of such a request:

http://localhost:8080/solr/c1/query?q=*:*&shards=localhost:8080/solr/c1,localhost:8081/solr/c2&fl=*,src_shard:[shard]

Here is the corresponding response (note the pseudofield aliased as src_shard):

<result name="response" numFound="192" start="0">
  <doc>
    <str name="id">9920</str>
    <str name="brand">Fender</str>
    <str name="model">Jazz Bass</str>
    <arr name="artist">
      <str>Marcus Miller</str>
    </arr>
    <str name="series">Marcus Miller signature</str>
    <str name="src_shard">localhost:8080/solr/shard1</str>
  </doc>
  …
  <doc>
    <str name="id">4392</str>
    <str name="brand">Music Man</str>
    <str name="model">Sting Ray</str>
    <arr name="artist"><str>Tony Levin</str></arr>
    <str name="series">5 strings DeLuxe</str>
    <str name="src_shard">localhost:8081/solr/shard2</str>
  </doc>
</result>

The following are a few things to keep in mind when using this deployment scenario:

- The schema must have a uniqueKey field. This field must be declared as stored and indexed and is supposed to be unique across all shards.
- Inverse Document Frequency (IDF) calculations cannot be distributed; IDF is computed per shard.
- Joins between documents belonging to different shards are not supported.
- If a shard receives both index and query requests, the index may change during a query execution, thus compromising the outgoing results (for example, a matching document may have been deleted).

Master/slaves scenario

In a master/slaves scenario, there are two types of Solr servers: an indexer (the master) and one or more searchers (the slaves). The master is the server that manages the index. It receives update requests and applies those changes. A searcher, on the other hand, is a Solr server that exposes search services to external clients. The index, in terms of data files, is replicated from the indexer to the searchers over HTTP by means of a built-in RequestHandler that must be configured on both the indexer side and the searcher side (within the solrconfig.xml configuration file). On the indexer (master), a replication configuration looks like this:

<requestHandler name="/replication" class="solr.ReplicationHandler">
  <lst name="master">
    <str name="replicateAfter">startup</str>
    <str name="replicateAfter">optimize</str>
    <str name="confFiles">schema.xml,stopwords.txt</str>
  </lst>
</requestHandler>

The replication mechanism can be configured to be triggered after one of the following events:

- Commit: A commit has been applied
- Optimize: The index has been optimized
- Startup: The Solr instance has started

In the preceding example, we want the index to be replicated after the startup and optimize commands. Using the confFiles parameter, we can also indicate a set of configuration files (schema.xml and stopwords.txt, in the example) that must be replicated together with the index. Remember that changes to those files alone don't trigger any replication.
Only a change in the index, in conjunction with one of the events we defined in the replicateAfter parameter, will mark the index (and the configuration files) as replicable. On the searcher side, the configuration looks like the following:

<requestHandler name="/replication" class="solr.ReplicationHandler">
  <lst name="slave">
    <str name="masterUrl">http://<localhost>:<port>/solrmaster</str>
    <str name="pollInterval">00:00:10</str>
  </lst>
</requestHandler>

You can see that a searcher periodically polls the master (the pollInterval parameter) to check whether a newer version of the index is available. If it is, the searcher will start the replication mechanism by issuing a request to the master, which is completely unaware of the searchers. The replicability status of the index is actually indicated by a version number. If the searcher has the same version as the master, the index is the same. If the versions are different, a newer version of the index is available on the master, and replication can start.

Other than separating responsibilities, this deployment configuration allows us to have a so-called diamond architecture, consisting of one indexer and several searchers. When the replication is triggered, each searcher in the ring will receive a whole copy of the index. This allows the following:

- Load balancing of the incoming (query) requests.
- An increase in the availability of the whole system. In the event of a server crash, the other searchers will continue to serve the incoming requests.

The following diagram illustrates a master/slave deployment scenario with one indexer, three searchers, and two cores.

If the searchers are spread across several geographically dislocated data centers, an additional role called a repeater can be configured in each data center in order to rationalize the flow of replication traffic between nodes. A repeater is simply a node that acts as both a master and a slave: it is a slave of the main master and, at the same time, it acts as the master of the searchers within the same data center, as shown in the corresponding diagram.

Shards with replication

This scenario combines shards and replication in order to have a scalable system with high throughput and availability. There is one indexer and one or more searchers for each shard, allowing load balancing between (query) shard requests. The following diagram illustrates a scenario with two cores, three shards, one indexer, and (due to problems with available space) only one searcher for each shard.

The drawback of this approach is undoubtedly the overall growing complexity of the system, which requires more effort in terms of maintainability, manageability, and system administration. In addition, each searcher is an independent node, and we don't have a central administration console where a system administrator can get a quick overview of system health.

Summary

In this article, we described various ways in which you can deploy Solr. Each deployment scenario has specific features, advantages, and drawbacks that make it ideal for one context and a bad choice for another. A good thing is that the different scenarios are not strictly exclusive; they follow an incremental approach. In an ideal context, things should start immediately with the perfect scenario that fits your needs. However, unless your requirements are clear right from the start, you can begin with a simple configuration and then change it, depending on how your application evolves.
Resources for Article: Further resources on this subject: Tuning Solr JVM and Container [article] Boost Your search [article] In the Cloud [article]


Elasticsearch Administration

Packt
03 Mar 2015
28 min read
In this article by Rafał Kuć and Marek Rogoziński, authors of the book Mastering Elasticsearch, Second Edition, we will talk more about the Elasticsearch configuration and the new features introduced in Elasticsearch 1.0 and higher. By the end of this article, you will have learned about:

- Configuring the discovery and recovery modules
- Using the Cat API, which allows a human-readable insight into the cluster status
- The backup and restore functionality
- Federated search

Discovery and recovery modules

When starting your Elasticsearch node, one of the first things that Elasticsearch does is look for a master node that has the same cluster name and is visible in the network. If a master node is found, the starting node joins an already formed cluster. If no master is found, then the node itself is selected as a master (of course, if the configuration allows such behavior). The process of forming a cluster and finding nodes is called discovery. The module responsible for discovery has two main purposes: electing a master and discovering new nodes within a cluster.

After the cluster is formed, a process called recovery is started. During the recovery process, Elasticsearch reads the metadata and the indices from the gateway and prepares the shards that are stored there to be used. After the recovery of the primary shards is done, Elasticsearch should be ready for work and should continue with the recovery of all the replicas (if they are present). In this section, we will take a deeper look at these two modules and discuss the configuration possibilities Elasticsearch gives us and what the consequences of changing them are. Note that the information provided in the Discovery and recovery modules section is an extension of what we already wrote in Elasticsearch Server Second Edition, published by Packt Publishing.

Discovery configuration

As we have already mentioned multiple times, Elasticsearch was designed to work in a distributed environment. This is the main difference when comparing Elasticsearch to other open source search and analytics solutions available. With such assumptions, Elasticsearch is very easy to set up in a distributed environment, and we are not forced to set up additional software to make it work like this. By default, Elasticsearch assumes that the cluster is automatically formed by the nodes that declare the same cluster.name setting and can communicate with each other using multicast requests. This allows us to have several independent clusters in the same network. There are a few implementations of the discovery module that we can use, so let's see what the options are.

Zen discovery

Zen discovery is the default mechanism responsible for discovery in Elasticsearch and is available out of the box. The default Zen discovery configuration uses multicast to find other nodes. This is a very convenient solution: just start a new Elasticsearch node and everything works; the node will be joined to the cluster if it has the same cluster name and is visible to the other nodes in that cluster. This discovery method is perfectly suited for development, because you don't need to care about the configuration; however, it is not advised for production environments. Relying only on the cluster name is handy but can also lead to potential problems and mistakes, such as the accidental joining of nodes. Sometimes, multicast is not available for various reasons, or you don't want to use it for the reasons just mentioned.
In the case of bigger clusters, multicast discovery may generate too much unnecessary traffic, and this is another valid reason why it shouldn't be used in production. For these cases, Zen discovery allows us to use the unicast mode. When using unicast Zen discovery, a node that is not a part of the cluster will send a ping request to all the addresses specified in the configuration. By doing this, it informs all the specified nodes that it is ready to be a part of the cluster and can be either joined to an existing cluster or form a new one. Of course, after the node joins the cluster, it gets the cluster topology information, but the initial connection is only made to the specified list of hosts. Remember that even when using unicast Zen discovery, the Elasticsearch node still needs to have the same cluster name as the other nodes.

If you want to know more about the differences between the multicast and unicast ping methods, refer to these URLs: http://en.wikipedia.org/wiki/Multicast and http://en.wikipedia.org/wiki/Unicast. If you still want to use multicast Zen discovery, let's look at its configuration properties.

Multicast Zen discovery configuration

The multicast part of the Zen discovery module exposes the following settings:

- discovery.zen.ping.multicast.address (the default: all available interfaces): This is the interface used for the communication, given as an address or interface name.
- discovery.zen.ping.multicast.port (the default: 54328): This is the port used for communication.
- discovery.zen.ping.multicast.group (the default: 224.2.2.4): This is the multicast address to send messages to.
- discovery.zen.ping.multicast.buffer_size (the default: 2048): This is the size of the buffer used for multicast messages.
- discovery.zen.ping.multicast.ttl (the default: 3): This is the time for which a multicast message lives. Every time a packet crosses a router, the TTL is decreased. This allows you to limit the area in which the transmission can be received. Note that routers can have threshold values that are compared against the TTL, so the TTL value may not exactly match the number of routers a packet can cross.
- discovery.zen.ping.multicast.enabled (the default: true): Setting this property to false turns off multicast. You should disable multicast if you are planning to use the unicast discovery method.

The unicast Zen discovery configuration

The unicast part of Zen discovery provides the following configuration options:

- discovery.zen.ping.unicast.hosts: This is the initial list of nodes in the cluster. The list can be defined as a list or as an array of hosts. Every host can be given as a name (or an IP address) and can have a port or port range added. For example, the value of this property can look like this: ["master1", "master2:8181", "master3[80000-81000]"]. Basically, the hosts list for unicast discovery doesn't need to be a complete list of the Elasticsearch nodes in your cluster, because once the node is connected to one of the mentioned nodes, it will be informed about all the others that form the cluster.
- discovery.zen.ping.unicast.concurrent_connects (the default: 10): This is the maximum number of concurrent connections unicast discovery will use. If you have a lot of nodes that the initial connection should be made to, it is advised that you increase the default value.
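Putting the multicast and unicast settings together, a minimal elasticsearch.yml sketch for a node that relies purely on unicast discovery might look like the following; the cluster name, host names, and port are placeholder values chosen for this example, not values required by Elasticsearch:

cluster.name: mycluster
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ["master1:9300", "master2:9300", "master3:9300"]

With such a configuration, a starting node pings only the three listed hosts; once it joins the cluster through any one of them, it learns about the rest of the topology, as described above.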
Master node

One of the main purposes of discovery, apart from connecting to other nodes, is to choose a master node: a node that will take care of and manage all the other nodes. This process is called master election and is a part of the discovery module. No matter how many master eligible nodes there are, each cluster will only have a single master node active at a given time. If there is more than one master eligible node present in the cluster, one of them can be elected as the master when the original master fails and is removed from the cluster.

Configuring master and data nodes

By default, Elasticsearch allows every node to be both a master node and a data node. However, in certain situations, you may want to have worker nodes that only hold the data or process the queries, and master nodes that are only used to manage the cluster. One of these situations is handling a massive amount of data, where data nodes should be as performant as possible and there shouldn't be any delay in the master nodes' responses.

Configuring data-only nodes

To set a node to only hold data, we need to instruct Elasticsearch that we don't want such a node to be a master node. In order to do this, we add the following properties to the elasticsearch.yml configuration file:

node.master: false
node.data: true

Configuring master-only nodes

To set a node not to hold data and only to be a master node, we need to instruct Elasticsearch that we don't want such a node to hold data. In order to do that, we add the following properties to the elasticsearch.yml configuration file:

node.master: true
node.data: false

Configuring the query processing-only nodes

For large enough deployments, it is also wise to have nodes that are only responsible for aggregating query results from other nodes. Such nodes should be configured as nonmaster and nondata, so they should have the following properties in the elasticsearch.yml configuration file:

node.master: false
node.data: false

Please note that the node.master and the node.data properties are set to true by default, but we tend to include them for configuration clarity.

The master election configuration

We already wrote about the master election configuration in Elasticsearch Server Second Edition, but this topic is very important, so we decided to refresh our knowledge about it. Imagine that you have a cluster built of 10 nodes. Everything is working fine until, one day, your network fails and three of your nodes are disconnected from the cluster, but they can still see each other. Because of Zen discovery and the master election process, the nodes that got disconnected elect a new master, and you end up with two clusters with the same name and two master nodes. Such a situation is called a split-brain, and you must avoid it as much as possible. When a split-brain happens, you end up with two (or more) clusters that won't join each other until the network (or any other) problems are fixed. If you index your data during this time, you may end up with data loss and unrecoverable situations when the nodes get joined together after the network split.

In order to prevent split-brain situations, or at least minimize the possibility of their occurrence, Elasticsearch provides the discovery.zen.minimum_master_nodes property. This property defines the minimum number of master eligible nodes that should be connected to each other in order to form a cluster.
So now, let's get back to our cluster. If we set the discovery.zen.minimum_master_nodes property to 50 percent of the total nodes available plus one (which is six, in our case), we would end up with a single cluster. Why is that? Before the network failure, we would have 10 nodes, which is more than six, and these nodes would form a cluster. After the disconnection of the three nodes, we would still have the first cluster up and running. However, because only three nodes disconnected and three is less than six, these three nodes wouldn't be allowed to elect a new master and they would wait for reconnection with the original cluster.

Zen discovery fault detection and configuration

Elasticsearch runs two detection processes while it is working. The first process sends ping requests from the current master node to all the other nodes in the cluster to check whether they are operational. The second process is the reverse of that: each of the nodes sends ping requests to the master in order to verify that it is still up and running and performing its duties. However, if we have a slow network or our nodes are in different hosting locations, the default configuration may not be sufficient. Because of this, the Elasticsearch discovery module exposes three properties that we can change:

- discovery.zen.fd.ping_interval: This defaults to 1s and specifies how often the node will send ping requests to the target node.
- discovery.zen.fd.ping_timeout: This defaults to 30s and specifies how long the node will wait for the sent ping request to be responded to. If your nodes are 100 percent utilized or your network is slow, you may consider increasing this value.
- discovery.zen.fd.ping_retries: This defaults to 3 and specifies the number of ping request retries before the target node is considered not operational. You can increase this value if your network has a high number of lost packets (or you can fix your network).

There is one more thing that we would like to mention. The master node is the only node that can change the state of the cluster. To achieve a proper sequence of cluster state updates, the Elasticsearch master node processes cluster state update requests one at a time, makes the changes locally, and sends the request to all the other nodes so that they can synchronize their state. The master node waits a given amount of time for the nodes to respond; if that time passes or all the nodes have responded with their acknowledgment information, it proceeds with processing the next cluster state update request. To change how long the master node waits for the other nodes to respond, modify the default 30-second timeout by setting the discovery.zen.publish_timeout property. Increasing this value may be needed for huge clusters working in an overloaded network.

The Amazon EC2 discovery

Amazon, in addition to selling goods, offers a few popular services such as storage or computing power in a pay-as-you-go model. The so-called Amazon Elastic Compute Cloud (EC2) provides server instances and, of course, these can be used to install and run Elasticsearch clusters (among many other things, as they are normal Linux machines). This is convenient: you pay for the instances that are needed in order to handle the current traffic or to speed up calculations, and you shut down unnecessary instances when the traffic is lower. Elasticsearch works well on EC2, but due to the nature of the environment, some features may work slightly differently.
One of the features that works differently is discovery, because Amazon EC2 doesn't support multicast discovery. Of course, we can switch to unicast discovery, but sometimes we want to be able to discover nodes automatically and, with unicast, we need to provide at least the initial list of hosts. However, there is an alternative: we can use the Amazon EC2 plugin, a plugin that combines the multicast and unicast discovery methods using the Amazon EC2 API.

Make sure that, during the setup of the EC2 instances, you set up communication between them (on ports 9200 and 9300 by default). This is crucial in order to have the Elasticsearch nodes communicate with each other and is thus required for the cluster to function. Of course, this communication depends on the network.bind_host and network.publish_host (or network.host) settings.

The EC2 plugin installation

The installation of this plugin is as simple as for most plugins. In order to install it, we should run the following command:

bin/plugin install elasticsearch/elasticsearch-cloud-aws/2.4.0

The EC2 plugin's generic configuration

This plugin provides several configuration settings that we need to supply in order for the EC2 discovery to work:

- cloud.aws.access_key: The Amazon access key, one of the credential values you can find in the Amazon configuration panel
- cloud.aws.secret_key: The Amazon secret key, which, similar to the previously mentioned access key, can be found in the EC2 configuration panel

The last thing is to inform Elasticsearch that we want to use the new discovery type by setting the discovery.type property to the ec2 value, and to turn off multicast.

Optional EC2 discovery configuration options

The previously mentioned settings are sufficient to run the EC2 discovery, but in order to control the EC2 discovery plugin behavior, Elasticsearch exposes the following additional settings:

- cloud.aws.region: This is the region that will be used to connect with the Amazon EC2 web services. You should choose the region that is adequate for the region where your instances reside, for example, eu-west-1 for Ireland. The possible values are eu-west, sa-east, us-east, us-west-1, us-west-2, and ap-southeast-1.
- cloud.aws.ec2.endpoint: If you are using EC2 API services, instead of defining a region, you can provide an address of the AWS endpoint, for example, ec2.eu-west-1.amazonaws.com.
- cloud.aws.protocol: This is the protocol that should be used by the plugin to connect to the Amazon Web Services endpoints. By default, Elasticsearch will use the HTTPS protocol (which means setting the value of the property to https). We can also change this behavior and set the property to http for the plugin to use HTTP without encryption. We are also allowed to overwrite the cloud.aws.protocol setting for each service by using the cloud.aws.ec2.protocol and cloud.aws.s3.protocol properties (the possible values are the same: https and http).
- cloud.aws.proxy_host: Elasticsearch allows us to define a proxy that will be used to connect to the AWS endpoints. The cloud.aws.proxy_host property should be set to the address of the proxy that should be used.
- cloud.aws.proxy_port: The second property related to the AWS endpoints proxy allows us to specify the port on which the proxy is listening. The cloud.aws.proxy_port property should be set to the port on which the proxy listens.
- discovery.ec2.ping_timeout (the default: 3s): This is the time to wait for the response to a ping message sent to another node. After this time, the nonresponsive node will be considered dead and will be removed from the cluster. Increasing this value makes sense when dealing with network issues or when we have a lot of EC2 nodes.
The EC2 nodes scanning configuration

The last group of settings we want to mention allows us to configure a very important thing when building a cluster working inside the EC2 environment: the ability to filter the available Elasticsearch nodes in our Amazon Elastic Compute Cloud network. The Elasticsearch EC2 plugin exposes the following properties that can help us configure its behavior:

- discovery.ec2.host_type: This allows us to choose the host type that will be used to communicate with the other nodes in the cluster. The values we can use are private_ip (the default one; the private IP address will be used for communication), public_ip (the public IP address will be used for communication), private_dns (the private hostname will be used for communication), and public_dns (the public hostname will be used for communication).
- discovery.ec2.groups: This is a comma-separated list of security groups. Only nodes that fall within these groups can be discovered and included in the cluster.
- discovery.ec2.availability_zones: This is an array or comma-separated list of availability zones. Only nodes within the specified availability zones will be discovered and included in the cluster.
- discovery.ec2.any_group (this defaults to true): Setting this property to false will force the EC2 discovery plugin to discover only those nodes that reside in an Amazon instance that falls into all of the defined security groups. The default value requires only a single group to be matched.
- discovery.ec2.tag: This is a prefix for a group of EC2-related settings. When you launch your Amazon EC2 instances, you can define tags, which can describe the purpose of the instance, such as the customer name or environment type. You can then use these defined tags to limit the discovered nodes. Let's say you define a tag named environment with a value of qa. In the configuration, you can now specify discovery.ec2.tag.environment: qa, and only nodes running on instances with this tag will be considered for discovery.
- cloud.node.auto_attributes: When this is set to true, Elasticsearch will add EC2-related node attributes (such as the availability zone or group) to the node properties and will allow us to use them when adjusting the Elasticsearch shard allocation and configuring the shard placement.

Other discovery implementations

Zen discovery and EC2 discovery are not the only discovery types that are available. There are two more discovery types that are developed and maintained by the Elasticsearch team:

- Azure discovery: https://github.com/elasticsearch/elasticsearch-cloud-azure
- Google Compute Engine discovery: https://github.com/elasticsearch/elasticsearch-cloud-gce

In addition to these, there are a few discovery implementations provided by the community, such as the ZooKeeper discovery for older versions of Elasticsearch (https://github.com/sonian/elasticsearch-zookeeper).

The gateway and recovery configuration

The gateway module allows us to store all the data that is needed for Elasticsearch to work properly. This means that not only is the data in the Apache Lucene indices stored, but also all the metadata (for example, the index allocation settings), along with the mappings configuration for each index. Whenever the cluster state is changed, for example, when the allocation properties are changed, the cluster state will be persisted by using the gateway module.
When the cluster is started up, its state will be loaded using the gateway module and applied. One should remember that when configuring different nodes with different gateway types, indices will use the gateway type configuration present on the given node. If an index state should not be stored using the gateway module, one should explicitly set the index gateway type to none.

The gateway recovery process

Let's state explicitly that the recovery process is used by Elasticsearch to load the data stored with the use of the gateway module in order for Elasticsearch to work. Whenever a full cluster restart occurs, the gateway process kicks in to load all the relevant information we've mentioned: the metadata, the mappings, and of course, all the indices. When the recovery process starts, the primary shards are initialized first; then, depending on the replica state, the replicas are initialized using the gateway data, or the data is copied from the primary shards if the replicas are out of sync.

Elasticsearch allows us to configure when the cluster data should be recovered using the gateway module. We can tell Elasticsearch to wait for a certain number of master eligible or data nodes to be present in the cluster before starting the recovery process. However, one should remember that until the cluster is recovered, no operations performed on it will be allowed. This is done in order to avoid modification conflicts.

Configuration properties

Before we continue with the configuration, we would like to say one more thing. As you know, Elasticsearch nodes can play different roles: they can have the role of data nodes (the ones that hold data), they can have a master role, or they can be used only for request handling, which means not holding data and not being master eligible. Keeping all this in mind, let's now look at the gateway configuration properties that we are allowed to modify:

- gateway.recover_after_nodes: This is an integer number that specifies how many nodes should be present in the cluster for the recovery to happen. For example, when set to 5, at least 5 nodes (whether they are data or master eligible nodes) must be present for the recovery process to start.
- gateway.recover_after_data_nodes: This is an integer number that allows us to set how many data nodes should be present in the cluster for the recovery process to start.
- gateway.recover_after_master_nodes: This is another gateway configuration option that allows us to set how many master eligible nodes should be present in the cluster for the recovery to start.
- gateway.recover_after_time: This allows us to set how much time to wait before the recovery process starts after the conditions defined by the preceding properties are met. If we set this property to 5m, we tell Elasticsearch to start the recovery process 5 minutes after all the defined conditions are met. The default value for this property is 5m, starting from Elasticsearch 1.3.0.

Let's imagine that we have six nodes in our cluster, out of which four are data eligible. We also have an index built of three shards, which are spread across the cluster. The last two nodes are master eligible and don't hold data. What we would like is for the recovery process to be delayed for 3 minutes after the four data nodes are present.
Our gateway configuration could look like this:

gateway.recover_after_data_nodes: 4
gateway.recover_after_time: 3m

Expectations on nodes

In addition to the already mentioned properties, we can also specify properties that will force the recovery process of Elasticsearch. These properties are:

- gateway.expected_nodes: This is the number of nodes expected to be present in the cluster for the recovery to start immediately. If you don't need the recovery to be delayed, it is advised that you set this property to the number of nodes (or at least most of them) with which the cluster will be formed, because that will guarantee that the latest cluster state will be recovered.
- gateway.expected_data_nodes: This is the number of expected data eligible nodes to be present in the cluster for the recovery process to start immediately.
- gateway.expected_master_nodes: This is the number of expected master eligible nodes to be present in the cluster for the recovery process to start immediately.

Now, let's get back to our previous example. We know that when all six nodes are connected and are in the cluster, we want the recovery to start. So, in addition to the preceding configuration, we would add the following property:

gateway.expected_nodes: 6

So the whole configuration would look like this:

gateway.recover_after_data_nodes: 4
gateway.recover_after_time: 3m
gateway.expected_nodes: 6

The preceding configuration says that the recovery process will be delayed for 3 minutes once four data nodes join the cluster and will begin immediately after six nodes are in the cluster (no matter whether they are data nodes or master eligible nodes).

The local gateway

With the release of Elasticsearch 0.20 (and some of the 0.19 releases), all the gateway types apart from the default local gateway type were deprecated. It is advised that you do not use them, because they will be removed in future versions of Elasticsearch. This has not happened yet, but if you want to avoid full data reindexation, you should only use the local gateway type, and this is why we won't discuss all the other types.

The local gateway type uses the local storage available on a node to store the metadata, mappings, and indices. In order to use this gateway type and the local storage available on the node, there needs to be enough disk space to hold the data with no memory caching. The persistence to the local gateway is different from the other (currently present but deprecated) gateways: the writes to this gateway are done in a synchronous manner in order to ensure that no data will be lost during the write process. In order to set the type of gateway that should be used, one should use the gateway.type property, which is set to local by default.

There is one additional thing regarding the local gateway of Elasticsearch that we haven't talked about yet: dangling indices. When a node joins a cluster, all the shards and indices that are present on the node but not present in the cluster will be included in the cluster state. Such indices are called dangling indices, and we are allowed to choose how Elasticsearch should treat them. Elasticsearch exposes the gateway.local.auto_import_dangling property, which can take the value of yes (the default value, which results in importing all dangling indices into the cluster), close (which results in importing the dangling indices into the cluster state but keeps them closed by default), or no (which results in removing the dangling indices).
When setting the gateway.local.auto_import_dangling property to no, we can also set the gateway.local.dangling_timeout property (defaults to 2h) to specify how long Elasticsearch will wait before deleting the dangling indices. The dangling indices feature can be nice when we restart old Elasticsearch nodes, and we don't want old indices to be included in the cluster.

Low-level recovery configuration

We discussed that we can use the gateway to configure the behavior of the Elasticsearch recovery process, but in addition to that, Elasticsearch allows us to configure the recovery process itself. We decided that it would be good to mention those properties here, in the section dedicated to gateway and recovery.

Cluster-level recovery configuration

The recovery configuration is specified mostly on the cluster level and allows us to set general rules for the recovery module to work with. These settings are:

indices.recovery.concurrent_streams: This defaults to 3 and specifies the number of concurrent streams that are allowed to be opened in order to recover a shard from its source. The higher the value of this property, the more pressure will be put on the networking layer; however, the recovery may be faster, depending on your network usage and throughput.

indices.recovery.max_bytes_per_sec: By default, this is set to 20MB and specifies the maximum amount of data that can be transferred per second during shard recovery. In order to disable data transfer limiting, one should set this property to 0. Similar to the number of concurrent streams, this property allows us to control the network usage of the recovery process. Setting this property to higher values may result in higher network utilization and a faster recovery process.

indices.recovery.compress: This is set to true by default and allows us to define whether Elasticsearch should compress the data that is transferred during the recovery process. Setting this to false may lower the pressure on the CPU, but it will also result in more data being transferred over the network.

indices.recovery.file_chunk_size: This is the chunk size used to copy the shard data from the source shard. By default, it is set to 512KB and is compressed if the indices.recovery.compress property is set to true.

indices.recovery.translog_ops: This defaults to 1000 and specifies how many transaction log lines should be transferred between shards in a single request during the recovery process.

indices.recovery.translog_size: This is the chunk size used to copy the shard transaction log data from the source shard. By default, it is set to 512KB and is compressed if the indices.recovery.compress property is set to true.

In the versions prior to Elasticsearch 0.90.0, there was the indices.recovery.max_size_per_sec property that could be used, but it was deprecated, and it is suggested that you use the indices.recovery.max_bytes_per_sec property instead. However, if you are using an Elasticsearch version older than 0.90.0, it may be worth remembering this. All the previously mentioned settings can be updated using the Cluster Update API, or they can be set in the elasticsearch.yml file.

Index-level recovery settings

In addition to the values mentioned previously, there is a single property that can be set on a per-index basis. It can be set both in the elasticsearch.yml file and using the indices Update Settings API, and it is called index.recovery.initial_shards. 
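As a quick, hedged illustration of the previous paragraph, the dynamic cluster-level settings could be changed at runtime through the Cluster Update Settings API, while the index-level property can simply be placed in elasticsearch.yml. The endpoint is real, but the values used here are only examples and should be tuned for your own cluster:

curl -XPUT 'localhost:9200/_cluster/settings' -d '{
  "transient": {
    "indices.recovery.concurrent_streams": 5,
    "indices.recovery.max_bytes_per_sec": "40mb"
  }
}'

And in elasticsearch.yml, for example:

index.recovery.initial_shards: quorum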
In general, Elasticsearch will only recover a particular shard when there is a quorum of shards present and if that quorum can be allocated. A quorum is 50 percent of the shards for the given index plus one. By using the index.recovery.initial_shards property, we can change what Elasticsearch will take as a quorum. This property can be set to one of the following values:

quorum: 50 percent of the shards for the given index plus one need to be present and be allocable. This is the default value.

quorum-1: 50 percent of the shards for a given index need to be present and be allocable.

full: All of the shards for the given index need to be present and be allocable.

full-1: All but one of the shards for the given index need to be present and be allocable.

integer value: Any integer, such as 1, 2, or 5, specifies the number of shards that need to be present and allocable. For example, setting this value to 2 means that at least two shards need to be present and allocable for the recovery to start.

It is good to know about this property, but the default value will be sufficient for most deployments.

Summary

In this article, we focused more on the Elasticsearch configuration and new features that were introduced in Elasticsearch 1.0. We configured discovery and recovery, and we used the human-friendly Cat API. In addition to that, we used the backup and restore functionality, which allowed easy backup and recovery of our indices. Finally, we looked at what federated search is and how to search and index data to multiple clusters, while still using all the functionalities of Elasticsearch and being connected to a single node. If you want to dig deeper, buy the book Mastering Elasticsearch, Second Edition and read it in a simple step-by-step fashion to enhance your Elasticsearch knowledge further.

Resources for Article:
Further resources on this subject:
Downloading and Setting Up ElasticSearch [Article]
Indexing the Data [Article]
Driving Visual Analyses with Automobile Data (Python) [Article]

SciPy for Signal Processing

Packt
03 Mar 2015
14 min read
In this article by Sergio J. Rojas G. and Erik A Christensen, authors of the book Learning SciPy for Numerical and Scientific Computing - Second Edition, we will focus on the usage of some most commonly used routines that are included in SciPy modules—scipy.signal, scipy.ndimage, and scipy.fftpack, which are used for signal processing, multidimensional image processing, and computing Fourier transforms, respectively. We define a signal as data that measures either a time-varying or spatially varying phenomena. Sound or electrocardiograms are excellent examples of time-varying quantities, while images embody the quintessential spatially varying cases. Moving images are treated with the techniques of both types of signals, obviously. The field of signal processing treats four aspects of this kind of data: its acquisition, quality improvement, compression, and feature extraction. SciPy has many routines to treat effectively tasks in any of the four fields. All these are included in two low-level modules (scipy.signal being the main module, with an emphasis on time-varying data, and scipy.ndimage, for images). Many of the routines in these two modules are based on Discrete Fourier Transform of the data. SciPy has an extensive package of applications and definitions of these background algorithms, scipy.fftpack, which we will start covering first. (For more resources related to this topic, see here.) Discrete Fourier Transforms The Discrete Fourier Transform (DFT from now on) transforms any signal from its time/space domain into a related signal in the frequency domain. This allows us not only to be able to analyze the different frequencies of the data, but also for faster filtering operations, when used properly. It is possible to turn a signal in the frequency domain back to its time/spatial domain; thanks to the Inverse Fourier Transform. We will not go into detail of the mathematics behind these operators, since we assume familiarity at some level with this theory. We will focus on syntax and applications instead. The basic routines in the scipy.fftpack module compute the DFT and its inverse, for discrete signals in any dimension, which are fft and ifft (one dimension), fft2 and ifft2 (two dimensions), and fftn and ifftn (any number of dimensions). All of these routines assume that the data is complex valued. If we know beforehand that a particular dataset is actually real valued, and should offer real-valued frequencies, we use rfft and irfft instead, for a faster algorithm. All these routines are designed so that composition with their inverses always yields the identity. The syntax is the same in all cases, as follows: fft(x[, n, axis, overwrite_x]) The first parameter, x, is always the signal in any array-like form. Note that fft performs one-dimensional transforms. This means in particular, that if x happens to be two-dimensional, for example, fft will output another two-dimensional array where each row is the transform of each row of the original. We can change it to columns instead, with the optional parameter, axis. The rest of parameters are also optional; n indicates the length of the transform, and overwrite_x gets rid of the original data to save memory and resources. We usually play with the integer n when we need to pad the signal with zeros, or truncate it. For higher dimension, n is substituted by shape (a tuple), and axis by axes (another tuple). 
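Before we look at a two-dimensional example, a minimal sketch can illustrate the identity property just mentioned; the signal values here are arbitrary:

>>> import numpy
>>> from scipy.fftpack import fft, ifft
>>> x = numpy.array([1.0, 2.0, 1.0, -1.0, 1.5])
>>> y = fft(x)                        # transform to the frequency domain
>>> numpy.allclose(ifft(y), x)        # composing with the inverse recovers x
True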
To better understand the output, it is often useful to shift the zero frequencies to the center of the output arrays with fftshift. The inverse of this operation, ifftshift, is also included in the module. The following code shows some of these routines in action, when applied to a checkerboard image: >>> import numpy >>> from scipy.fftpack import fft,fft2, fftshift >>> import matplotlib.pyplot as plt >>> B=numpy.ones((4,4)); W=numpy.zeros((4,4)) >>> signal = numpy.bmat("B,W;W,B") >>> onedimfft = fft(signal,n=16) >>> twodimfft = fft2(signal,shape=(16,16)) >>> plt.figure() >>> plt.gray() >>> plt.subplot(121,aspect='equal') >>> plt.pcolormesh(onedimfft.real) >>> plt.colorbar(orientation='horizontal') >>> plt.subplot(122,aspect='equal') >>> plt.pcolormesh(fftshift(twodimfft.real)) >>> plt.colorbar(orientation='horizontal') >>> plt.show() Note how the first four rows of the one-dimensional transform are equal (and so are the last four), while the two-dimensional transform (once shifted) presents a peak at the origin, and nice symmetries in the frequency domain. In the following screenshot (obtained from the preceding code), the left-hand side image is fft and the right-hand side image is fft2 of a 2 x 2 checkerboard signal: The scipy.fftpack module also offers the Discrete Cosine Transform with its inverse (dct, idct) as well as many differential and pseudo-differential operators defined in terms of all these transforms: diff (for derivative/integral), hilbert and ihilbert (for the Hilbert transform), tilbert and itilbert (for the h-Tilbert transform of periodic sequences), and so on. Signal construction To aid in the construction of signals with predetermined properties, the scipy.signal module has a nice collection of the most frequent one-dimensional waveforms in the literature: chirp and sweep_poly (for the frequency-swept cosine generator), gausspulse (a Gaussian modulated sinusoid) and sawtooth and square (for the waveforms with those names). They all take as their main parameter a one-dimensional ndarray representing the times at which the signal is to be evaluated. Other parameters control the design of the signal, according to frequency or time constraints. Let's take a look into the following code snippet, which illustrates the use of these one dimensional waveforms that we just discussed: >>> import numpy >>> from scipy.signal import chirp, sawtooth, square, gausspulse >>> import matplotlib.pyplot as plt >>> t=numpy.linspace(-1,1,1000) >>> plt.subplot(221); plt.ylim([-2,2]) >>> plt.plot(t,chirp(t,f0=100,t1=0.5,f1=200))   # plot a chirp >>> plt.subplot(222); plt.ylim([-2,2]) >>> plt.plot(t,gausspulse(t,fc=10,bw=0.5))     # Gauss pulse >>> plt.subplot(223); plt.ylim([-2,2]) >>> t*=3*numpy.pi >>> plt.plot(t,sawtooth(t))                     # sawtooth >>> plt.subplot(224); plt.ylim([-2,2]) >>> plt.plot(t,square(t))                       # Square wave >>> plt.show() Generated by this code, the following diagram shows waveforms for chirp (upper-left), gausspulse (upper-right), sawtooth (lower-left), and square (lower-right): The usual method of creating signals is to import them from the file. This is possible by using purely NumPy routines, for example fromfile: fromfile(file, dtype=float, count=-1, sep='') The file argument may point to either a file or a string, the count argument is used to determine the number of items to read, and sep indicates what constitutes a separator in the original file/string. 
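As a small, hedged illustration of fromfile (the signal.dat filename is just a placeholder), we can dump an array to disk with tofile and read it back:

>>> import numpy
>>> data = numpy.sin(numpy.linspace(0, 2*numpy.pi, 100))
>>> data.tofile('signal.dat')                          # raw binary dump
>>> recovered = numpy.fromfile('signal.dat', dtype=float)
>>> numpy.allclose(recovered, data)
True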
For images, we have the versatile routine, imread in either the scipy.ndimage or scipy.misc module: imread(fname, flatten=False) The fname argument is a string containing the location of an image. The routine infers the type of file, and reads the data into an array, accordingly. In case the flatten argument is turned to True, the image is converted to gray scale. Note that, in order to work, the Python Imaging Library (PIL) needs to be installed. It is also possible to load .wav files for analysis, with the read and write routines from the wavfile submodule in the scipy.io module. For instance, given any audio file with this format, say audio.wav, the command, rate,data = scipy.io.wavfile.read("audio.wav"), assigns an integer value to the rate variable, indicating the sample rate of the file (in samples per second), and a NumPy ndarray to the data variable, containing the numerical values assigned to the different notes. If we wish to write some one-dimensional ndarray data into an audio file of this kind, with the sample rate given by the rate variable, we may do so by issuing the following command: >>> scipy.io.wavfile.write("filename.wav",rate,data) Filters A filter is an operation on signals that either removes features or extracts some component. SciPy has a very complete set of known filters, as well as the tools to allow construction of new ones. The complete list of filters in SciPy is long, and we encourage the reader to explore the help documents of the scipy.signal and scipy.ndimage modules for the complete picture. We will introduce in these pages, as an exposition, some of the most used filters in the treatment of audio or image processing. We start by creating a signal worth filtering: >>> from numpy import sin, cos, pi, linspace >>> f=lambda t: cos(pi*t) + 0.2*sin(5*pi*t+0.1) + 0.2*sin(30*pi*t)    + 0.1*sin(32*pi*t+0.1) + 0.1*sin(47* pi*t+0.8) >>> t=linspace(0,4,400); signal=f(t) We first test the classical smoothing filter of Wiener and Kolmogorov, wiener. We present in a plot, the original signal (in black) and the corresponding filtered data, with a choice of a Wiener window of the size 55 samples (in blue). Next, we compare the result of applying the median filter, medfilt, with a kernel of the same size as before (in red): >>> from scipy.signal import wiener, medfilt >>> import matplotlib.pylab as plt >>> plt.plot(t,signal,'k') >>> plt.plot(t,wiener(signal,mysize=55),'r',linewidth=3) >>> plt.plot(t,medfilt(signal,kernel_size=55),'b',linewidth=3) >>> plt.show() This gives us the following graph showing the comparison of smoothing filters (wiener is the one that has its starting point just below 0.5 and medfilt has its starting point just above 0.5): Most of the filters in the scipy.signal module can be adapted to work in arrays of any dimension. But in the particular case of images, we prefer to use the implementations in the scipy.ndimage module, since they are coded with these objects in mind. For instance, to perform a median filter on an image for smoothing, we use scipy.ndimage.median_filter. Let's see an example. 
We will start by loading Lena to the array and corrupting the image with Gaussian noise (zero mean and standard deviation of 16): >>> from scipy.stats import norm     # Gaussian distribution >>> import matplotlib.pyplot as plt >>> import scipy.misc >>> import scipy.ndimage >>> plt.gray() >>> lena=scipy.misc.lena().astype(float) >>> plt.subplot(221); >>> plt.imshow(lena) >>> lena+=norm(loc=0,scale=16).rvs(lena.shape) >>> plt.subplot(222); >>> plt.imshow(lena) >>> denoised_lena = scipy.ndimage.median_filter(lena,3) >>> plt.subplot(224); >>> plt.imshow(denoised_lena) The set of filters for images come in two flavors—statistical and morphological. For example, among the filters of statistical nature, we have the Sobel algorithm oriented to detection of edges (singularities along curves). Its syntax is as follows: sobel(image, axis=-1, output=None, mode='reflect', cval=0.0) The optional parameter, axis, indicates the dimension in which the computations are performed. By default, this is always the last axis (-1). The mode parameter, which is one of the strings 'reflect', 'constant', 'nearest', 'mirror', or 'wrap', indicates how to handle the border of the image, in case there is insufficient data to perform the computations there. In case the mode is 'constant', we may indicate the value to use in the border, with the cval parameter. Let's look into the following code snippet, which illustrates the use of the sobel filter: >>> from scipy.ndimage.filters import sobel >>> import numpy >>> lena=scipy.misc.lena() >>> sblX=sobel(lena,axis=0); sblY=sobel(lena,axis=1) >>> sbl=numpy.hypot(sblX,sblY) >>> plt.subplot(223); >>> plt.imshow(sbl) >>> plt.show() The following screenshot illustrates Lena (upper-left) and noisy Lena (upper-right) with the preceding two filters in action—edge map with sobel (lower-left) and median filter (lower-right): Morphology We also have the possibility of creating and applying filters to images based on mathematical morphology, both to binary and gray-scale images. The four basic morphological operations are opening (binary_opening), closing (binary_closing), dilation (binary_dilation), and erosion (binary_erosion). Note that the syntax for each of these filters is very simple, since we only need two ingredients—the signal to filter and the structuring element to perform the morphological operation. Let's take a look into the general syntax for these morphological operations: binary_operation(signal, structuring_element) We may use combinations of these four basic morphological operations to create more complex filters for removal of holes, hit-or-miss transforms (to find the location of specific patterns in binary images), denoising, edge detection, and many more. The SciPy module also allows for creating some common filters using the preceding syntax. 
For instance, for the location of the letter e in a text, we could use the following command instead: >>> binary_hit_or_miss(text, letterE) For comparative purposes, let's use this command in the following code snippet: >>> import numpy >>> import scipy.ndimage >>> import matplotlib.pylab as plt >>> from scipy.ndimage.morphology import binary_hit_or_miss >>> text = scipy.ndimage.imread('CHAP_05_input_textImage.png') >>> letterE = text[37:53,275:291] >>> HitorMiss = binary_hit_or_miss(text, structure1=letterE,    origin1=1) >>> eLocation = numpy.where(HitorMiss==True) >>> x=eLocation[1]; y=eLocation[0] >>> plt.imshow(text, cmap=plt.cm.gray, interpolation='nearest') >>> plt.autoscale(False) >>> plt.plot(x,y,'wo',markersize=10) >>> plt.axis('off') >>> plt.show() The output for the preceding lines of code is generated as follows: For gray-scale images, we may use a structuring element (structuring_element) or a footprint. The syntax is, therefore, a little different: grey_operation(signal, [structuring_element, footprint, size, ...]) If we desire to use a completely flat and rectangular structuring element (all ones), then it is enough to indicate the size as a tuple. For instance, to perform gray-scale dilation of a flat element of size (15,15) on our classical image of Lena, we issue the following command: >>> grey_dilation(lena, size=(15,15)) The last kind of morphological operations coded in the scipy.ndimage module perform distance and feature transforms. Distance transforms create a map that assigns to each pixel, the distance to the nearest object. Feature transforms provide with the index of the closest background element instead. These operations are used to decompose images into different labels. We may even choose different metrics such as Euclidean distance, chessboard distance, and taxicab distance. The syntax for the distance transform (distance_transform) using a brute force algorithm is as follows: distance_transform_bf(signal, metric='euclidean', sampling=None, return_distances=True, return_indices=False,                      distances=None, indices=None) We indicate the metric with the strings such as 'euclidean', 'taxicab', or 'chessboard'. If we desire to provide the feature transform instead, we switch return_distances to False and return_indices to True. Similar routines are available with more sophisticated algorithms—distance_transform_cdt (using chamfering for taxicab and chessboard distances). For Euclidean distance, we also have distance_transform_edt. All these use the same syntax. Summary In this article, we explored signal processing (any dimensional) including the treatment of signals in frequency space, by means of their Discrete Fourier Transforms. These correspond to the fftpack, signal, and ndimage modules. Resources for Article: Further resources on this subject: Signal Processing Techniques [article] SciPy for Computational Geometry [article] Move Further with NumPy Modules [article]

Performance Considerations

Packt
03 Mar 2015
13 min read
In this article by Dayong Du, the author of Apache Hive Essentials, we will look at the different performance considerations when using Hive. Although Hive is built to deal with big data, we still cannot ignore the importance of performance. Most of the time, a better Hive query can rely on the smart query optimizer to find the best execution strategy as well as the default setting best practice from vendor packages. However, as experienced users, we should learn more about the theory and practice of performance tuning in Hive, especially when working in a performance-based project or environment. We will start from utilities available in Hive to find potential issues causing poor performance. Then, we introduce the best practices of performance considerations in the areas of queries and job. (For more resources related to this topic, see here.) Performance utilities Hive provides the EXPLAIN and ANALYZE statements that can be used as utilities to check and identify the performance of queries. The EXPLAIN statement Hive provides an EXPLAIN command to return a query execution plan without running the query. We can use an EXPLAIN command for queries if we have a doubt or a concern about performance. The EXPLAIN command will help to see the difference between two or more queries for the same purpose. The syntax for EXPLAIN is as follows: EXPLAIN [EXTENDED|DEPENDENCY|AUTHORIZATION] hive_query The following keywords can be used: EXTENDED: This provides additional information for the operators in the plan, such as file pathname and abstract syntax tree. DEPENDENCY: This provides a JSON format output that contains a list of tables and partitions that the query depends on. It is available since HIVE 0.10.0. AUTHORIZATION: This lists all entities needed to be authorized including input and output to run the Hive query and authorization failures, if any. It is available since HIVE 0.14.0. A typical query plan contains the following three sections. We will also have a look at an example later: Abstract syntax tree (AST): Hive uses a pacer generator called ANTLR (see http://www.antlr.org/) to automatically generate a tree of syntax for HQL. We can usually ignore this most of the time. Stage dependencies: This lists all dependencies and number of stages used to run the query. Stage plans: It contains important information, such as operators and sort orders, for running the job. The following is what a typical query plan looks like. From the following example, we can see that the AST section is not shown since the EXTENDED keyword is not used with EXPLAIN. In the STAGE DEPENDENCIES section, both Stage-0 and Stage-1 are independent root stages. In the STAGE PLANS section, Stage-1 has one map and reduce referred to by Map Operator Tree and Reduce Operator Tree. Inside each Map/Reduce Operator Tree section, all operators corresponding to Hive query keywords as well as expressions and aggregations are listed. The Stage-0 stage does not have map and reduce. It is just a Fetch operation. jdbc:hive2://> EXPLAIN SELECT sex_age.sex, count(*). . . . . . .> FROM employee_partitioned. . . . . . 
.> WHERE year=2014 GROUP BY sex_age.sex LIMIT 2;+-----------------------------------------------------------------------------+| Explain |+-----------------------------------------------------------------------------+| STAGE DEPENDENCIES: || Stage-1 is a root stage || Stage-0 is a root stage || || STAGE PLANS: || Stage: Stage-1 || Map Reduce || Map Operator Tree: || TableScan || alias: employee_partitioned || Statistics: Num rows: 0 Data size: 227 Basic stats:PARTIAL || Column stats: NONE || Select Operator || expressions: sex_age (type: struct<sex:string,age:int>) || outputColumnNames: sex_age || Statistics: Num rows: 0 Data size: 227 Basic stats:PARTIAL || Column stats: NONE || Group By Operator || aggregations: count() || keys: sex_age.sex (type: string) || mode: hash || outputColumnNames: _col0, _col1 || Statistics: Num rows: 0 Data size: 227 Basic stats:PARTIAL || Column stats: NONE || Reduce Output Operator || key expressions: _col0 (type: string) || sort order: + || Map-reduce partition columns: _col0 (type: string) || Statistics: Num rows: 0 Data size: 227 Basic stats:PARTIAL|| Column stats: NONE || value expressions: _col1 (type: bigint) || Reduce Operator Tree: || Group By Operator || aggregations: count(VALUE._col0) || keys: KEY._col0 (type: string) || mode: mergepartial || outputColumnNames: _col0, _col1 || Statistics: Num rows: 0 Data size: 0 Basic stats: NONE || Column stats: NONE || Select Operator || expressions: _col0 (type: string), _col1 (type: bigint) || outputColumnNames: _col0, _col1 || Statistics: Num rows: 0 Data size: 0 Basic stats: NONE || Column stats: NONE || Limit || Number of rows: 2 || Statistics: Num rows: 0 Data size: 0 Basic stats: NONE || Column stats: NONE || File Output Operator || compressed: false || Statistics: Num rows: 0 Data size: 0 Basic stats: NONE || Column stats: NONE || table: || input format: org.apache.hadoop.mapred.TextInputFormat || output format:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat|| serde:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe|| || Stage: Stage-0 || Fetch Operator || limit: 2 |+-----------------------------------------------------------------------------+53 rows selected (0.26 seconds) The ANALYZE statement Hive statistics are a collection of data that describe more details, such as the number of rows, number of files, and raw data size, on the objects in the Hive database. Statistics is a metadata of Hive data. Hive supports statistics at the table, partition, and column level. These statistics serve as an input to the Hive Cost-Based Optimizer (CBO), which is an optimizer to pick the query plan with the lowest cost in terms of system resources required to complete the query. The statistics are gathered through the ANALYZE statement since Hive 0.10.0 on tables, partitions, and columns as given in the following examples: jdbc:hive2://> ANALYZE TABLE employee COMPUTE STATISTICS;No rows affected (27.979 seconds)jdbc:hive2://> ANALYZE TABLE employee_partitioned. . . . . . .> PARTITION(year=2014, month=12) COMPUTE STATISTICS;No rows affected (45.054 seconds)jdbc:hive2://> ANALYZE TABLE employee_id COMPUTE STATISTICS. . . . . . .> FOR COLUMNS employee_id;No rows affected (41.074 seconds) Once the statistics are built, we can check the statistics by the DESCRIBE EXTENDED/FORMATTED statement. 
From the table/partition output, we can find the statistics information inside the parameters, such as parameters:{numFiles=1, COLUMN_STATS_ACCURATE=true, transient_lastDdlTime=1417726247, numRows=4, totalSize=227, rawDataSize=223}). The following is an example: jdbc:hive2://> DESCRIBE EXTENDED employee_partitioned. . . . . . .> PARTITION(year=2014, month=12);jdbc:hive2://> DESCRIBE EXTENDED employee;…parameters:{numFiles=1, COLUMN_STATS_ACCURATE=true, transient_lastDdlTime=1417726247, numRows=4, totalSize=227, rawDataSize=223}).jdbc:hive2://> DESCRIBE FORMATTED employee.name;+--------+---------+---+---+---------+--------------+-----------+-----------+|col_name|data_type|min|max|num_nulls|distinct_count|avg_col_len|max_col_len|+--------+---------+---+---+---------+--------------+-----------+-----------+| name | string | | | 0 | 5 | 5.6 | 7 |+--------+---------+---+---+---------+--------------+-----------+-----------++---------+----------+-----------------+|num_trues|num_falses| comment |+---------+----------+-----------------+| | |from deserializer|+---------+----------+-----------------+3 rows selected (0.116 seconds) Hive statistics are persisted in the metastore to avoid computing them every time. For newly created tables and/or partitions, statistics are automatically computed by default if we enable the following setting: jdbc:hive2://> SET hive.stats.autogather=ture; Hive logs Logs provide useful information to find out how a Hive query/job runs. By checking the Hive logs, we can identify runtime problems and issues that may cause bad performance. There are two types of logs available in Hive: system log and job log. The system log contains the Hive running status and issues. It is configured in {HIVE_HOME}/conf/hive-log4j.properties. The following three lines for Hive log can be found: hive.root.logger=WARN,DRFAhive.log.dir=/tmp/${user.name}hive.log.file=hive.log To modify the status, we can either modify the preceding lines in hive-log4j.properties (applies to all users) or set from the Hive CLI (only applies to the current user and current session) as follows: hive --hiveconf hive.root.logger=DEBUG,console The job log contains Hive query information and is saved at the same place, /tmp/${user.name}, by default as one file for each Hive user session. We can override it in hive-site.xml with the hive.querylog.location property. If a Hive query generates MapReduce jobs, those logs can also be viewed through the Hadoop JobTracker Web UI. Job and query optimization Job and query optimization covers experience and skills to improve performance in the area of job-running mode, JVM reuse, job parallel running, and query optimizations in JOIN. Local mode Hadoop can run in standalone, pseudo-distributed, and fully distributed mode. Most of the time, we need to configure Hadoop to run in fully distributed mode. When the data to process is small, it is an overhead to start distributed data processing since the launching time of the fully distributed mode takes more time than the job processing time. 
Since Hive 0.7.0, Hive supports automatic conversion of a job to run in local mode with the following settings: jdbc:hive2://> SET hive.exec.mode.local.auto=true; --default falsejdbc:hive2://> SET hive.exec.mode.local.auto.inputbytes.max=50000000;jdbc:hive2://> SET hive.exec.mode.local.auto.input.files.max=5;--default 4 A job must satisfy the following conditions to run in the local mode: The total input size of the job is lower than hive.exec.mode.local.auto.inputbytes.max The total number of map tasks is less than hive.exec.mode.local.auto.input.files.max The total number of reduce tasks required is 1 or 0 JVM reuse By default, Hadoop launches a new JVM for each map or reduce job and runs the map or reduce task in parallel. When the map or reduce job is a lightweight job running only for a few seconds, the JVM startup process could be a significant overhead. The MapReduce framework (version 1 only, not Yarn) has an option to reuse JVM by sharing the JVM to run mapper/reducer serially instead of parallel. JVM reuse applies to map or reduce tasks in the same job. Tasks from different jobs will always run in a separate JVM. To enable the reuse, we can set the maximum number of tasks for a single job for JVM reuse using the mapred.job.reuse.jvm.num.tasks property. Its default value is 1: jdbc:hive2://> SET mapred.job.reuse.jvm.num.tasks=5; We can also set the value to –1 to indicate that all the tasks for a job will run in the same JVM. Parallel execution Hive queries commonly are translated into a number of stages that are executed by the default sequence. These stages are not always dependent on each other. Instead, they can run in parallel to save the overall job running time. We can enable this feature with the following settings: jdbc:hive2://> SET hive.exec.parallel=true; -- default falsejdbc:hive2://> SET hive.exec.parallel.thread.number=16;-- default 8, it defines the max number for running in parallel Parallel execution will increase the cluster utilization. If the utilization of a cluster is already very high, parallel execution will not help much in terms of overall performance. Join optimization Here, we'll briefly review the key settings for join improvement. Common join The common join is also called reduce side join. It is a basic join in Hive and works for most of the time. For common joins, we need to make sure the big table is on the right-most side or specified by hit, as follows: /*+ STREAMTABLE(stream_table_name) */. Map join Map join is used when one of the join tables is small enough to fit in the memory, so it is very fast but limited. Since Hive 0.7.0, Hive can convert map join automatically with the following settings: jdbc:hive2://> SET hive.auto.convert.join=true; --default falsejdbc:hive2://> SET hive.mapjoin.smalltable.filesize=600000000;--default 25Mjdbc:hive2://> SET hive.auto.convert.join.noconditionaltask=true;--default false. Set to true so that map join hint is not needed jdbc:hive2://> SET hive.auto.convert.join.noconditionaltask.size=10000000;--The default value controls the size of table to fit in memory Once autoconvert is enabled, Hive will automatically check if the smaller table file size is bigger than the value specified by hive.mapjoin.smalltable.filesize, and then Hive will convert the join to a common join. If the file size is smaller than this threshold, it will try to convert the common join into a map join. Once autoconvert join is enabled, there is no need to provide the map join hints in the query. 
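As a hedged sketch of what this looks like in practice (the employee_hr table and its columns are hypothetical), once autoconversion is enabled, a plain join is written without any hints and Hive decides at runtime whether the smaller side fits in memory:

jdbc:hive2://> SET hive.auto.convert.join=true;
jdbc:hive2://> SELECT e.name, h.start_date
. . . . . . .> FROM employee e
. . . . . . .> JOIN employee_hr h ON e.name = h.name;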
Bucket map join Bucket map join is a special type of map join applied on the bucket tables. To enable bucket map join, we need to enable the following settings: jdbc:hive2://> SET hive.auto.convert.join=true; --default falsejdbc:hive2://> SET hive.optimize.bucketmapjoin=true; --default false In bucket map join, all the join tables must be bucket tables and join on buckets columns. In addition, the buckets number in bigger tables must be a multiple of the bucket number in the small tables. Sort merge bucket (SMB) join SMB is the join performed on the bucket tables that have the same sorted, bucket, and join condition columns. It reads data from both bucket tables and performs common joins (map and reduce triggered) on the bucket tables. We need to enable the following properties to use SMB: jdbc:hive2://> SET hive.input.format=. . . . . . .> org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;jdbc:hive2://> SET hive.auto.convert.sortmerge.join=true;jdbc:hive2://> SET hive.optimize.bucketmapjoin=true;jdbc:hive2://> SET hive.optimize.bucketmapjoin.sortedmerge=true;jdbc:hive2://> SET hive.auto.convert.sortmerge.join.noconditionaltask=true; Sort merge bucket map (SMBM) join SMBM join is a special bucket join but triggers map-side join only. It can avoid caching all rows in the memory like map join does. To perform SMBM joins, the join tables must have the same bucket, sort, and join condition columns. To enable such joins, we need to enable the following settings: jdbc:hive2://> SET hive.auto.convert.join=true;jdbc:hive2://> SET hive.auto.convert.sortmerge.join=truejdbc:hive2://> SET hive.optimize.bucketmapjoin=true;jdbc:hive2://> SET hive.optimize.bucketmapjoin.sortedmerge=true;jdbc:hive2://> SET hive.auto.convert.sortmerge.join.noconditionaltask=true;jdbc:hive2://> SET hive.auto.convert.sortmerge.join.bigtable.selection.policy=org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ; Skew join When working with data that has a highly uneven distribution, the data skew could happen in such a way that a small number of compute nodes must handle the bulk of the computation. The following setting informs Hive to optimize properly if data skew happens: jdbc:hive2://> SET hive.optimize.skewjoin=true;--If there is data skew in join, set it to true. Default is false. jdbc:hive2://> SET hive.skewjoin.key=100000;--This is the default value. If the number of key is bigger than--this, the new keys will send to the other unused reducers. Skew data could happen on the GROUP BY data too. To optimize it, we need to do the following settings to enable skew data optimization in the GROUP BY result: SET hive.groupby.skewindata=true; Once configured, Hive will first trigger an additional MapReduce job whose map output will randomly distribute to the reducer to avoid data skew. For more information about Hive join optimization, please refer to the Apache Hive wiki available at https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization and https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization. Summary In this article, we first covered how to identify performance bottlenecks using the EXPLAIN and ANALYZE statements. Then, we discussed job and query optimization in Hive. Resources for Article: Further resources on this subject: Apache Maven and m2eclipse [Article] Apache Karaf – Provisioning and Clusters [Article] Introduction to Apache ZooKeeper [Article]

MapReduce functions

Packt
03 Mar 2015
11 min read
In this article by John Zablocki, author of the book Couchbase Essentials, you will be acquainted with MapReduce and how you'll use it to create secondary indexes for our documents. At its simplest, MapReduce is a programming pattern used to process large amounts of data that is typically distributed across several nodes in parallel. In the NoSQL world, MapReduce implementations may be found on many platforms from MongoDB to Hadoop, and of course, Couchbase. Even if you're new to the NoSQL landscape, it's quite possible that you've already worked with a form of MapReduce. The inspiration for MapReduce in distributed NoSQL systems was drawn from the functional programming concepts of map and reduce. While purely functional programming languages haven't quite reached mainstream status, languages such as Python, C#, and JavaScript all support map and reduce operations. (For more resources related to this topic, see here.)

Map functions

Consider the following Python snippet:

numbers = [1, 2, 3, 4, 5]
doubled = map(lambda n: n * 2, numbers)
# doubled == [2, 4, 6, 8, 10]

These two lines of code demonstrate a very simple use of a map() function. In the first line, the numbers variable is created as a list of integers. The second line applies a function to the list to create a new mapped list. In this case, the map() function is supplied as a Python lambda, which is just an inline, unnamed function. The body of the lambda multiplies each number by two. This map() function can be made slightly more complex by doubling only odd numbers, as shown in this code:

numbers = [1, 2, 3, 4, 5]

def double_odd(num):
    if num % 2 == 0:
        return num
    else:
        return num * 2

doubled = map(double_odd, numbers)
# doubled == [2, 2, 6, 4, 10]

Map functions are implemented differently in each language or platform that supports them, but all follow the same pattern. An iterable collection of objects is passed to a map function. Each item of the collection is then iterated over with the map function being applied to that iteration. The final result is a new collection where each of the original items is transformed by the map.

Reduce functions

Like maps, the reduce functions also work by applying a provided function to an iterable data structure. The key difference between the two is that the reduce function works to produce a single value from the input iterable. Using Python's built-in reduce() function, we can see how to produce a sum of integers, as follows:

numbers = [1, 2, 3, 4, 5]
sum = reduce(lambda x, y: x + y, numbers)
# sum == 15

You probably noticed that unlike our map operation, the reduce lambda has two parameters (x and y in this case). The argument passed to x will be the accumulated value of all applications of the function so far, and y will receive the next value to be added to the accumulation. Parenthetically, the order of operations can be seen as ((((1 + 2) + 3) + 4) + 5). Alternatively, the steps are shown in the following list:

x = 1, y = 2
x = 3, y = 3
x = 6, y = 4
x = 10, y = 5
x = 15

As this list demonstrates, the value of x is the cumulative sum of previous x and y values. As such, reduce functions are sometimes termed accumulate or fold functions. Regardless of their name, reduce functions serve the common purpose of combining pieces of a recursive data structure to produce a single value.

Couchbase MapReduce

Creating an index (or view) in Couchbase requires creating a map function written in JavaScript. 
When the view is created for the first time, the map function is applied to each document in the bucket containing the view. When you update a view, only new or modified documents are indexed. This behavior is known as incremental MapReduce. You can think of a basic map function in Couchbase as being similar to a SQL CREATE INDEX statement. Effectively, you are defining a column or a set of columns, to be indexed by the server. Of course, these are not columns, but rather properties of the documents to be indexed. Basic mapping To illustrate the process of creating a view, first imagine that we have a set of JSON documents as shown here: var books=[     { "id": 1, "title": "The Bourne Identity", "author": "Robert Ludlow"     },     { "id": 2, "title": "The Godfather", "author": "Mario Puzzo"     },     { "id": 3, "title": "Wiseguy", "author": "Nicholas Pileggi"     } ]; Each document contains title and author properties. In Couchbase, to query these documents by either title or author, we'd first need to write a map function. Without considering how map functions are written in Couchbase, we're able to understand the process with vanilla JavaScript: books.map(function(book) {   return book.author; }); In the preceding snippet, we're making use of the built-in JavaScript array's map() function. Similar to the Python snippets we saw earlier, JavaScript's map() function takes a function as a parameter and returns a new array with mapped objects. In this case, we'll have an array with each book's author, as follows: ["Robert Ludlow", "Mario Puzzo", "Nicholas Pileggi"] At this point, we have a mapped collection that will be the basis for our author index. However, we haven't provided a means for the index to be able to refer back to its original document. If we were using a relational database, we'd have effectively created an index on the Title column with no way to get back to the row that contained it. With a slight modification to our map function, we are able to provide the key (the id property) of the document as well in our index: books.map(function(book) {   return [book.author, book.id]; }); In this slightly modified version, we're including the ID with the output of each author. In this way, the index has its document's key stored with its title. [["The Bourne Identity", 1], ["The Godfather", 2], ["Wiseguy", 3]] We'll soon see how this structure more closely resembles the values stored in a Couchbase index. Basic reducing Not every Couchbase index requires a reduce component. In fact, we'll see that Couchbase already comes with built-in reduce functions that will provide you with most of the reduce behavior you need. However, before relying on only those functions, it's important to understand why you'd use a reduce function in the first place. Returning to the preceding example of the map, let's imagine we have a few more documents in our set, as follows: var books=[     { "id": 1, "title": "The Bourne Identity", "author": "Robert Ludlow"     },     { "id": 2, "title": "The Bourne Ultimatum", "author": "Robert Ludlow"     },     { "id": 3, "title": "The Godfather", "author": "Mario Puzzo"     },     { "id": 4, "title": "The Bourne Supremacy", "author": "Robert Ludlow"     },     { "id": 5, "title": "The Family", "author": "Mario Puzzo"     },  { "id": 6, "title": "Wiseguy", "author": "Nicholas Pileggi"     } ]; We'll still create our index using the same map function because it provides a way of accessing a book by its author. 
Now imagine that we want to know how many books an author has written, or (assuming we had more data) the average number of pages written by an author. These questions are not possible to answer with a map function alone. Each application of the map function knows nothing about the previous application. In other words, there is no way for you to compare or accumulate information about one author's book to another book by the same author. Fortunately, there is a solution to this problem. As you've probably guessed, it's the use of a reduce function. As a somewhat contrived example, consider this JavaScript: mapped = books.map(function (book) {     return ([book.id, book.author]); });   counts = {} reduced = mapped.reduce(function(prev, cur, idx, arr) { var key = cur[1];     if (! counts[key]) counts[key] = 0;     ++counts[key] }, null); This code doesn't quite accurately reflect the way you would count books with Couchbase but it illustrates the basic idea. You look for each occurrence of a key (author) and increment a counter when it is found. With Couchbase MapReduce, the mapped structure is supplied to the reduce() function in a better format. You won't need to keep track of items in a dictionary. Couchbase views At this point, you should have a general sense of what MapReduce is, where it came from, and how it will affect the creation of a Couchbase Server view. So without further ado, let's see how to write our first Couchbase view. In fact, there were two to choose from. The bucket we'll use is beer-sample. If you didn't install it, don't worry. You can add it by opening the Couchbase Console and navigating to the Settings tab. Here, you'll find the option to install the bucket, as shown next: First, you need to understand the document structures with which you're working. The following JSON object is a beer document (abbreviated for brevity): {  "name": "Sundog",  "type": "beer",  "brewery_id": "new_holland_brewing_company",  "description": "Sundog is an amber ale...",  "style": "American-Style Amber/Red Ale",  "category": "North American Ale" } As you can see, the beer documents have several properties. We're going to create an index to let us query these documents by name. In SQL, the query would look like this: SELECT Id FROM Beers WHERE Name = ? You might be wondering why the SQL example includes only the Id column in its projection. For now, just know that to query a document using a view with Couchbase, the property by which you're querying must be included in an index. To create that index, we'll write a map function. The simplest example of a map function to query beer documents by name is as follows: function(doc) {   emit(doc.name); } This body of the map function has only one line. It calls the built-in Couchbase emit() function. This function is used to signal that a value should be indexed. The output of this map function will be an array of names. The beer-sample bucket includes brewery data as well. These documents look like the following code (abbreviated for brevity): {   "name": "Thomas Hooker Brewing",   "city": "Bloomfield",   "state": "Connecticut",   "website": "http://www.hookerbeer.com/",   "type": "brewery" } If we reexamine our map function, we'll see an obvious problem; both the brewery and beer documents have a name property. When this map function is applied to the documents in the bucket, it will create an index with documents from either the brewery or beer documents. The problem is that Couchbase documents exist in a single container—the bucket. 
There is no namespace for a set of related documents. The solution has typically involved including a type or docType property on each document. The value of this property is used to distinguish one document from another. In the case of the beer-sample database, beer documents have type = "beer" and brewery documents have type = "brewery". Therefore, we are easily able to modify our map function to create an index only on beer documents: function(doc) {   if (doc.type == "beer") {     emit(doc.name);   } } The emit() function actually takes two arguments. The first, as we've seen, emits a value to be indexed. The second argument is an optional value and is used by the reduce function. Imagine that we want to count the number of beer types in a particular category. In SQL, we would write the following query: SELECT Category, COUNT(*) FROM Beers GROUP BY Category To achieve the same functionality with Couchbase Server, we'll need to use both map and reduce functions. First, let's write the map. It will create an index on the category property: function(doc) {   if (doc.type == "beer") {     emit(doc.category, 1);   } } The only real difference between our category index and our name index is that we're including an argument for the value parameter of the emit() function. What we'll do with that value is simply count them. This counting will be done in our reduce function: function(keys, values) {   return values.length; } In this example, the values parameter will be given to the reduce function as a list of all values associated with a particular key. In our case, for each beer category, there will be a list of ones (that is, [1, 1, 1, 1, 1, 1]). Couchbase also provides a built-in _count function. It can be used in place of the entire reduce function in the preceding example. Now that we've seen the basic requirements when creating an actual Couchbase view, it's time to add a view to our bucket. The easiest way to do so is to use the Couchbase Console. Summary In this article, you learned the purpose of secondary indexes in a key/value store. We dug deep into MapReduce, both in terms of its history in functional languages and as a tool for NoSQL and big data systems. Resources for Article: Further resources on this subject: Map Reduce? [article] Introduction to Mapreduce [article] Working with Apps Splunk [article]

Getting Started with PostgreSQL

Packt
03 Mar 2015
11 min read
In this article by Ibrar Ahmed, Asif Fayyaz, and Amjad Shahzad, authors of the book PostgreSQL Developer's Guide, we will come across the basic features and functions of PostgreSQL, such as writing queries using psql, data definition in tables, and data manipulation from tables. (For more resources related to this topic, see here.) PostgreSQL is widely considered to be one of the most stable database servers available today, with multiple features that include: A wide range of built-in types MVCC New SQL enhancements, including foreign keys, primary keys, and constraints Open source code, maintained by a team of developers Trigger and procedure support with multiple procedural languages Extensibility in the sense of adding new data types and the client language From the early releases of PostgreSQL (from version 6.0 that is), many changes have been made, with each new major version adding new and more advanced features. The current version is PostgreSQL 9.4 and is available from several sources and in various binary formats. Writing queries using psql Before proceeding, allow me to explain to you that throughout this article, we will use a warehouse database called warehouse_db. In this section, I will show you how you can create such a database, providing you with sample code for assistance. You will need to do the following: We are assuming here that you have successfully installed PostgreSQL and faced no issues. Now, you will need to connect with the default database that is created by the PostgreSQL installer. To do this, navigate to the default path of installation, which is /opt/PostgreSQL/9.4/bin from your command line, and execute the following command that will prompt for a postgres user password that you provided during the installation: /opt/PostgreSQL/9.4/bin$./psql -U postgres Password for user postgres: Using the following command, you can log in to the default database with the user postgres and you will be able to see the following on your command line: psql (9.4beta1) Type "help" for help postgres=# You can then create a new database called warehouse_db using the following statement in the terminal: postgres=# CREATE DATABASE warehouse_db; You can then connect with the warehouse_db database using the following command: postgres=# c warehouse_db You are now connected to the warehouse_db database as the user postgres, and you will have the following warehouse_db shell: warehouse_db=# Let's summarize what we have achieved so far. We are now able to connect with the default database postgres and created a warehouse_db database successfully. It's now time to actually write queries using psql and perform some Data Definition Language (DDL) and Data Manipulation Language (DML) operations, which we will cover in the following sections. In PostgreSQL, we can have multiple databases. Inside the databases, we can have multiple extensions and schemas. Inside each schema, we can have database objects such as tables, views, sequences, procedures, and functions. We are first going to create a schema named record and then we will create some tables in this schema. To create a schema named record in the warehouse_db database, use the following statement: warehouse_db=# CREATE SCHEMA record; Creating, altering, and truncating a table In this section, we will learn about creating a table, altering the table definition, and truncating the table. Creating tables Now, let's perform some DDL operations starting with creating tables. 
To create a table named warehouse_tbl, execute the following statements: warehouse_db=# CREATE TABLE warehouse_tbl ( warehouse_id INTEGER NOT NULL, warehouse_name TEXT NOT NULL, year_created INTEGER, street_address TEXT, city CHARACTER VARYING(100), state CHARACTER VARYING(2), zip CHARACTER VARYING(10), CONSTRAINT "PRIM_KEY" PRIMARY KEY (warehouse_id) ); The preceding statements created the table warehouse_tbl that has the primary key warehouse_id. Now, as you are familiar with the table creation syntax, let's create a sequence and use that in a table. You can create the hist_id_seq sequence using the following statement: warehouse_db=# CREATE SEQUENCE hist_id_seq; The preceding CREATE SEQUENCE command creates a new sequence number generator. This involves creating and initializing a new special single-row table with the name hist_id_seq. The user issuing the command will own the generator. You can now create the table that implements the hist_id_seq sequence using the following statement: warehouse_db=# CREATE TABLE history ( history_id INTEGER NOT NULL DEFAULT nextval('hist_id_seq'), date TIMESTAMP WITHOUT TIME ZONE, amount INTEGER, data TEXT, customer_id INTEGER, warehouse_id INTEGER, CONSTRAINT "PRM_KEY" PRIMARY KEY (history_id), CONSTRAINT "FORN_KEY" FOREIGN KEY (warehouse_id) REFERENCES warehouse_tbl(warehouse_id) ); The preceding query will create a history table in the warehouse_db database, and the history_id column uses the sequence as the default input value. In this section, we successfully learned how to create a table and also learned how to use a sequence inside the table creation syntax. Altering tables Now that we have learned how to create multiple tables, we can practice some ALTER TABLE commands by following this section. With the ALTER TABLE command, we can add, remove, or rename table columns. 
Firstly, with the help of the following example, we will be able to add the phone_no column in the previously created table warehouse_tbl: warehouse_db=# ALTER TABLE warehouse_tbl ADD COLUMN phone_no INTEGER; We can then verify that a column is added in the table by describing the table as follows: warehouse_db=# d warehouse_tbl            Table "public.warehouse_tbl"                  Column     |         Type         | Modifiers ----------------+------------------------+----------- warehouse_id  | integer               | not null warehouse_name | text                   | not null year_created   | integer               | street_address | text                   | city           | character varying(100) | state           | character varying(2)   | zip             | character varying(10) | phone_no       | integer               | Indexes: "PRIM_KEY" PRIMARY KEY, btree (warehouse_id) Referenced by: TABLE "history" CONSTRAINT "FORN_KEY"FOREIGN KEY  (warehouse_id) REFERENCES warehouse_tbl(warehouse_id) TABLE  "history" CONSTRAINT "FORN_KEY" FOREIGN KEY (warehouse_id)  REFERENCES warehouse_tbl(warehouse_id) To drop a column from a table, we can use the following statement: warehouse_db=# ALTER TABLE warehouse_tbl DROP COLUMN phone_no; We can then finally verify that the column has been removed from the table by describing the table again as follows: warehouse_db=# d warehouse_tbl            Table "public.warehouse_tbl"                  Column     |         Type         | Modifiers ----------------+------------------------+----------- warehouse_id   | integer               | not null warehouse_name | text                   | not null year_created   | integer               | street_address | text                   | city           | character varying(100) | state           | character varying(2)   | zip             | character varying(10) | Indexes: "PRIM_KEY" PRIMARY KEY, btree (warehouse_id) Referenced by: TABLE "history" CONSTRAINT "FORN_KEY" FOREIGN KEY  (warehouse_id) REFERENCES warehouse_tbl(warehouse_id) TABLE  "history" CONSTRAINT "FORN_KEY" FOREIGN KEY (warehouse_id)  REFERENCES warehouse_tbl(warehouse_id) Truncating tables The TRUNCATE command is used to remove all rows from a table without providing any criteria. In the case of the DELETE command, the user has to provide the delete criteria using the WHERE clause. To truncate data from the table, we can use the following statement: warehouse_db=# TRUNCATE TABLE warehouse_tbl; We can then verify that the warehouse_tbl table has been truncated by performing a SELECT COUNT(*) query on it using the following statement: warehouse_db=# SELECT COUNT(*) FROM warehouse_tbl; count -------      0 (1 row) Inserting, updating, and deleting data from tables In this section, we will play around with data and learn how to insert, update, and delete data from a table. Inserting data So far, we have learned how to create and alter a table. Now it's time to play around with some data. 
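One more point worth noting before we move on: the history table created earlier references warehouse_tbl through a foreign key, and PostgreSQL will normally refuse to truncate a table that other tables reference in this way. In such a case, either both tables can be listed in one statement or the CASCADE option can be used; the following lines are only a sketch:

warehouse_db=# TRUNCATE TABLE warehouse_tbl, history;
warehouse_db=# TRUNCATE TABLE warehouse_tbl CASCADE;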
Let's start by inserting records in the warehouse_tbl table using the following command snippet:

warehouse_db=# INSERT INTO warehouse_tbl
  (warehouse_id, warehouse_name, year_created, street_address, city, state, zip)
  VALUES (1, 'Mark Corp', 2009, '207-F Main Service Road East', 'New London', 'CT', '4321');

We can then verify that the record has been inserted by performing a SELECT query on the warehouse_tbl table as follows:

warehouse_db=# SELECT warehouse_id, warehouse_name, street_address FROM warehouse_tbl;
 warehouse_id | warehouse_name |        street_address
--------------+----------------+------------------------------
            1 | Mark Corp      | 207-F Main Service Road East
(1 row)

Updating data

Once we have inserted data in our table, we should know how to update it. This can be done using the following statement:

warehouse_db=# UPDATE warehouse_tbl SET year_created=2010 WHERE year_created=2009;

To verify that the record has been updated, let's perform a SELECT query on the warehouse_tbl table as follows:

warehouse_db=# SELECT warehouse_id, year_created FROM warehouse_tbl;
 warehouse_id | year_created
--------------+--------------
            1 |         2010
(1 row)

Deleting data

To delete data from a table, we can use the DELETE command. Let's add a few records to the table and then delete data on the basis of certain conditions:

warehouse_db=# INSERT INTO warehouse_tbl
  (warehouse_id, warehouse_name, year_created, street_address, city, state, zip)
  VALUES (2, 'Bill & Co', 2014, 'Lilly Road', 'New London', 'CT', '4321');

warehouse_db=# INSERT INTO warehouse_tbl
  (warehouse_id, warehouse_name, year_created, street_address, city, state, zip)
  VALUES (3, 'West point', 2013, 'Down Town', 'New London', 'CT', '4321');

We can then delete data from the warehouse_tbl table, where warehouse_name is Bill & Co, by executing the following statement:

warehouse_db=# DELETE FROM warehouse_tbl WHERE warehouse_name='Bill & Co';

To verify that the record has been deleted, we will execute the following SELECT query:

warehouse_db=# SELECT warehouse_id, warehouse_name FROM warehouse_tbl WHERE warehouse_name='Bill & Co';
 warehouse_id | warehouse_name
--------------+----------------
(0 rows)

The DELETE command is used to drop a row from a table, whereas the DROP command is used to drop a complete table. The TRUNCATE command is used to empty the whole table.

Summary

In this article, we learned how to utilize the SQL language for a collection of everyday DBMS exercises in an easy-to-use, practical way. We also figured out how to build a complete database using DDL (CREATE, ALTER, and TRUNCATE) and DML (INSERT, UPDATE, and DELETE) statements.

Resources for Article: Further resources on this subject: Indexes [Article] Improving proximity filtering with KNN [Article] Using Unrestricted Languages [Article]
PostgreSQL as an Extensible RDBMS

Packt
03 Mar 2015
18 min read
This article by Usama Dar, the author of the book PostgreSQL Server Programming - Second Edition, explains the process of creating a new operator, overloading it, optimizing it, creating index access methods, and much more. PostgreSQL is an extensible database. I hope you've learned this much by now. It is extensible by virtue of the design that it has. As discussed before, PostgreSQL uses a catalog-driven design. In fact, PostgreSQL is more catalog-driven than most of the traditional relational databases. The key benefit here is that the catalogs can be changed or added to, in order to modify or extend the database functionality. PostgreSQL also supports dynamic loading, that is, a user-written code can be provided as a shared library, and PostgreSQL will load it as required. (For more resources related to this topic, see here.) Extensibility is critical for many businesses, which have needs that are specific to that business or industry. Sometimes, the tools provided by the traditional database systems do not fulfill those needs. People in those businesses know best how to solve their particular problems, but they are not experts in database internals. It is often not possible for them to cook up their own database kernel or modify the core or customize it according to their needs. A truly extensible database will then allow you to do the following: Solve domain-specific problems in a seamless way, like a native solution Build complete features without modifying the core database engine Extend the database without interrupting availability PostgreSQL not only allows you to do all of the preceding things, but also does these, and more with utmost ease. In terms of extensibility, you can do the following things in a PostgreSQL database: Create your own data types Create your own functions Create your own aggregates Create your own operators Create your own index access methods (operator classes) Create your own server programming language Create foreign data wrappers (SQL/MED) and foreign tables What can't be extended? Although PostgreSQL is an extensible platform, there are certain things that you can't do or change without explicitly doing a fork, as follows: You can't change or plug in a new storage engine. If you are coming from the MySQL world, this might annoy you a little. However, PostgreSQL's storage engine is tightly coupled with its executor and the rest of the system, which has its own benefits. You can't plug in your own planner/parser. One can argue for and against the ability to do that, but at the moment, the planner, parser, optimizer, and so on are baked into the system and there is no possibility of replacing them. There has been some talk on this topic, and if you are of the curious kind, you can read some of the discussion at http://bit.ly/1yRMkK7. We will now briefly discuss some more of the extensibility capabilities of PostgreSQL. We will not dive deep into the topics, but we will point you to the appropriate link where more information can be found. Creating a new operator Now, let's take look at how we can add a new operator in PostgreSQL. Adding new operators is not too different from adding new functions. In fact, an operator is syntactically just a different way to use an existing function. For example, the + operator calls a built-in function called numeric_add and passes it the two arguments. When you define a new operator, you must define the data types that the operator expects as arguments and define which function is to be called. 
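The operator examples that follow reuse a fib function (a Fibonacci calculator) that the book defines in an earlier chapter and that is not shown in this excerpt. As a rough idea of what such a function could look like, here is a minimal PL/pgSQL sketch; this is an assumption for illustration only, not necessarily the author's exact definition:

CREATE OR REPLACE FUNCTION fib(n integer)
RETURNS integer AS $$
DECLARE
  a integer := 0;  -- fib(0)
  b integer := 1;  -- fib(1)
  t integer;
BEGIN
  -- Iterate n times; after the loop, a holds fib(n), so fib(12) returns 144
  FOR i IN 1..n LOOP
    t := a + b;
    a := b;
    b := t;
  END LOOP;
  RETURN a;
END;
$$ LANGUAGE 'plpgsql' IMMUTABLE;

Any function with a matching signature would work just as well; the point is simply that the operator defined next delegates its work to it.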
Let's take a look at how to define a simple operator. You have to use the CREATE OPERATOR command to create an operator. Let's use that function to create a new Fibonacci operator, ##, which will have an integer on its left-hand side: CREATE OPERATOR ## (PROCEDURE=fib, LEFTARG=integer); Now, you can use this operator in your SQL to calculate a Fibonacci number: testdb=# SELECT 12##;?column?----------144(1 row) Note that we defined that the operator will have an integer on the left-hand side. If you try to put a value on the right-hand side of the operator, you will get an error: postgres=# SELECT ##12;ERROR: operator does not exist: ## integer at character 8HINT: No operator matches the given name and argument type(s). Youmight need to add explicit type casts.STATEMENT: select ##12;ERROR: operator does not exist: ## integerLINE 1: select ##12;^HINT: No operator matches the given name and argument type(s). Youmight need to add explicit type casts. Overloading an operator Operators can be overloaded in the same way as functions. This means, that an operator can have the same name as an existing operator but with a different set of argument types. More than one operator can have the same name, but two operators can't share the same name if they accept the same types and positions of the arguments. As long as there is a function that accepts the same kind and number of arguments that an operator defines, it can be overloaded. Let's override the ## operator we defined in the last example, and also add the ability to provide an integer on the right-hand side of the operator: CREATE OPERATOR ## (PROCEDURE=fib, RIGHTARG=integer); Now, running the same SQL, which resulted in an error last time, should succeed, as shown here: testdb=# SELECT ##12;?column?----------144(1 row) You can drop the operator using the DROP OPERATOR command. You can read more about creating and overloading new operators in the PostgreSQL documentation at http://www.postgresql.org/docs/current/static/sql-createoperator.html and http://www.postgresql.org/docs/current/static/xoper.html. There are several optional clauses in the operator definition that can optimize the execution time of the operators by providing information about operator behavior. For example, you can specify the commutator and the negator of an operator that help the planner use the operators in index scans. You can read more about these optional clauses at http://www.postgresql.org/docs/current/static/xoper-optimization.html. Since this article is just an introduction to the additional extensibility capabilities of PostgreSQL, we will just introduce a couple of optimization options; any serious production quality operator definitions should include these optimization clauses, if applicable. Optimizing operators The optional clauses tell the PostgreSQL server about how the operators behave. These options can result in considerable speedups in the execution of queries that use the operator. However, if you provide these options incorrectly, it can result in a slowdown of the queries. Let's take a look at two optimization clauses called commutator and negator. COMMUTATOR This clause defines the commuter of the operator. An operator A is a commutator of operator B if it fulfils the following condition: x A y = y B x. It is important to provide this information for the operators that will be used in indexes and joins. As an example, the commutator for > is <, and the commutator of = is = itself. 
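Before looking at those clauses, a quick sanity check (a sketch, assuming the fib function and the two ## definitions above) confirms that the postfix and prefix forms now coexist:

testdb=# SELECT (12##), (##12);

Both values should come back as 144, matching the results shown earlier.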
This helps the optimizer to flip the operator in order to use an index. For example, consider the following query: SELECT * FROM employee WHERE new_salary > salary; If the index is defined on the salary column, then PostgreSQL can rewrite the preceding query as shown: SELECT * from employee WHERE salary < new_salary This allows PostgreSQL to use a range scan on the index column salary. For a user-defined operator, the optimizer can only do this flip around if the commutator of a user-defined operator is defined: CREATE OPERATOR > (LEFTARG=integer, RIGHTARG=integer, PROCEDURE=comp, COMMUTATOR = <) NEGATOR The negator clause defines the negator of the operator. For example, <> is a negator of =. Consider the following query: SELECT * FROM employee WHERE NOT (dept = 10); Since <> is defined as a negator of =, the optimizer can simplify the preceding query as follows: SELECT * FROM employee WHERE dept <> 10; You can even verify that using the EXPLAIN command: postgres=# EXPLAIN SELECT * FROM employee WHERE NOTdept = 'WATER MGMNT';QUERY PLAN---------------------------------------------------------Foreign Scan on employee (cost=0.00..1.10 rows=1 width=160)Filter: ((dept)::text <> 'WATER MGMNT'::text)Foreign File: /Users/usamadar/testdata.csvForeign File Size: 197(4 rows) Creating index access methods Let's discuss how to index new data types or user-defined types and operators. In PostgreSQL, an index is more of a framework that can be extended or customized for using different strategies. In order to create new index access methods, we have to create an operator class. Let's take a look at a simple example. Let's consider a scenario where you have to store some special data such as an ID or a social security number in the database. The number may contain non-numeric characters, so it is defined as a text type: CREATE TABLE test_ssn (ssn text);INSERT INTO test_ssn VALUES ('222-11-020878');INSERT INTO test_ssn VALUES ('111-11-020978'); Let's assume that the correct order for this data is such that it should be sorted on the last six digits and not the ASCII value of the string. The fact that these numbers need a unique sort order presents a challenge when it comes to indexing the data. This is where PostgreSQL operator classes are useful. An operator allows a user to create a custom indexing strategy. Creating an indexing strategy is about creating your own operators and using them alongside a normal B-tree. Let's start by writing a function that changes the order of digits in the value and also gets rid of the non-numeric characters in the string to be able to compare them better: CREATE OR REPLACE FUNCTION fix_ssn(text)RETURNS text AS $$BEGINRETURN substring($1,8) || replace(substring($1,1,7),'-','');END;$$LANGUAGE 'plpgsql' IMMUTABLE; Let's run the function and verify that it works: testdb=# SELECT fix_ssn(ssn) FROM test_ssn;fix_ssn-------------0208782221102097811111(2 rows) Before an index can be used with a new strategy, we may have to define some more functions depending on the type of index. 
In our case, we are planning to use a simple B-tree, so we need a comparison function: CREATE OR REPLACE FUNCTION ssn_compareTo(text, text)RETURNS int AS$$BEGINIF fix_ssn($1) < fix_ssn($2)THENRETURN -1;ELSIF fix_ssn($1) > fix_ssn($2)THENRETURN +1;ELSERETURN 0;END IF;END;$$ LANGUAGE 'plpgsql' IMMUTABLE; It's now time to create our operator class: CREATE OPERATOR CLASS ssn_opsFOR TYPE text USING btreeASOPERATOR 1 < ,OPERATOR 2 <= ,OPERATOR 3 = ,OPERATOR 4 >= ,OPERATOR 5 > ,FUNCTION 1 ssn_compareTo(text, text); You can also overload the comparison operators if you need to compare the values in a special way, and use the functions in the compareTo function as well as provide them in the CREATE OPERATOR CLASS command. We will now create our first index using our brand new operator class: CREATE INDEX idx_ssn ON test_ssn (ssn ssn_ops); We can check whether the optimizer is willing to use our special index, as follows: testdb=# SET enable_seqscan=off;testdb=# EXPLAIN SELECT * FROM test_ssn WHERE ssn = '02087822211';QUERY PLAN------------------------------------------------------------------Index Only Scan using idx_ssn on test_ssn (cost=0.13..8.14 rows=1width=32)Index Cond: (ssn = '02087822211'::text)(2 rows) Therefore, we can confirm that the optimizer is able to use our new index. You can read about index access methods in the PostgreSQL documentation at http://www.postgresql.org/docs/current/static/xindex.html. Creating user-defined aggregates User-defined aggregate functions are probably a unique PostgreSQL feature, yet they are quite obscure and perhaps not many people know how to create them. However, once you are able to create this function, you will wonder how you have lived for so long without using this feature. This functionality can be incredibly useful, because it allows you to perform custom aggregates inside the database, instead of querying all the data from the client and doing a custom aggregate in your application code, that is, the number of hits on your website per minute from a specific country. PostgreSQL has a very simple process for defining aggregates. Aggregates can be defined using any functions and in any languages that are installed in the database. Here are the basic steps to building an aggregate function in PostgreSQL: Define a start function that will take in the values of a result set; this function can be defined in any PL language you want. Define an end function that will do something with the final output of the start function. This can be in any PL language you want. Define the aggregate using the CREATE AGGREGATE command, providing the start and end functions you just created. Let's steal an example from the PostgreSQL wiki at http://wiki.postgresql.org/wiki/Aggregate_Median. In this example, we will calculate the statistical median of a set of data. For this purpose, we will define start and end aggregate functions. Let's define the end function first, which takes an array as a parameter and calculates the median. 
We are assuming here that our start function will pass an array to the following end function: CREATE FUNCTION _final_median(anyarray) RETURNS float8 AS $$WITH q AS(SELECT valFROM unnest($1) valWHERE VAL IS NOT NULLORDER BY 1),cnt AS(SELECT COUNT(*) AS c FROM q)SELECT AVG(val)::float8FROM(SELECT val FROM qLIMIT 2 - MOD((SELECT c FROM cnt), 2)OFFSET GREATEST(CEIL((SELECT c FROM cnt) / 2.0) - 1,0)) q2;$$ LANGUAGE sql IMMUTABLE; Now, we create the aggregate as shown in the following code: CREATE AGGREGATE median(anyelement) (SFUNC=array_append,STYPE=anyarray,FINALFUNC=_final_median,INITCOND='{}'); The array_append start function is already defined in PostgreSQL. This function appends an element to the end of an array. In our example, the start function takes all the column values and creates an intermediate array. This array is passed on to the end function, which calculates the median. Now, let's create a table and some test data to run our function: testdb=# CREATE TABLE median_test(t integer);CREATE TABLEtestdb=# INSERT INTO median_test SELECT generate_series(1,10);INSERT 0 10 The generate_series function is a set returning function that generates a series of values, from start to stop with a step size of one. Now, we are all set to test the function: testdb=# SELECT median(t) FROM median_test;median--------5.5(1 row) The mechanics of the preceding example are quite easy to understand. When you run the aggregate, the start function is used to append all the table data from column t into an array using the append_array PostgreSQL built-in. This array is passed on to the final function, _final_median, which calculates the median of the array and returns the result in the same data type as the input parameter. This process is done transparently to the user of the function who simply has a convenient aggregate function available to them. You can read more about the user-defined aggregates in the PostgreSQL documentation in much more detail at http://www.postgresql.org/docs/current/static/xaggr.html. Using foreign data wrappers PostgreSQL foreign data wrappers (FDW) are an implementation of SQL Management of External Data (SQL/MED), which is a standard added to SQL in 2013. FDWs are drivers that allow PostgreSQL database users to read and write data to other external data sources, such as other relational databases, NoSQL data sources, files, JSON, LDAP, and even Twitter. You can query the foreign data sources using SQL and create joins across different systems or even across different data sources. There are several different types of data wrappers developed by different developers and not all of them are production quality. You can see a select list of wrappers on the PostgreSQL wiki at http://wiki.postgresql.org/wiki/Foreign_data_wrappers. Another list of FDWs can be found on PGXN at http://pgxn.org/tag/fdw/. Let's take look at a small example of using file_fdw to access data in a CSV file. First, you need to install the file_fdw extension. If you compiled PostgreSQL from the source, you will need to install the file_fdw contrib module that is distributed with the source. You can do this by going into the contrib/file_fdw folder and running make and make install. If you used an installer or a package for your platform, this module might have been installed automatically. 
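If you are not sure whether the module is present in your installation, one quick check (a small sketch) is to query the pg_available_extensions catalog view before trying to create the extension:

postgres=# SELECT name, default_version, installed_version
           FROM pg_available_extensions
           WHERE name = 'file_fdw';

A row with an empty installed_version means the module is available but the extension has not yet been created in the current database.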
Once the file_fdw module is installed, you will need to create the extension in the database:

postgres=# CREATE EXTENSION file_fdw;
CREATE EXTENSION

Let's now create a sample CSV file that uses the pipe, |, as a separator and contains some employee data:

$ cat testdata.csv
AARON, ELVIA J|WATER RATE TAKER|WATER MGMNT|81000.00|73862.00
AARON, JEFFERY M|POLICE OFFICER|POLICE|74628.00|74628.00
AARON, KIMBERLEI R|CHIEF CONTRACT EXPEDITER|FLEET MANAGEMNT|77280.00|70174.00

Now, we should create a foreign server, which is pretty much a formality here because the file is on the same server. A foreign server normally contains the connection information that a foreign data wrapper uses to access an external data resource. The server needs to be unique within the database:

CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;

The next step is to create a foreign table that encapsulates our CSV file:

CREATE FOREIGN TABLE employee (
  emp_name VARCHAR,
  job_title VARCHAR,
  dept VARCHAR,
  salary NUMERIC,
  sal_after_tax NUMERIC
) SERVER file_server
OPTIONS (format 'csv', header 'false', filename '/home/pgbook/14/testdata.csv', delimiter '|', null '');

The CREATE FOREIGN TABLE command creates a foreign table, and the specifications of the file are provided in the OPTIONS section of the preceding code. You provide the format and indicate whether the first line of the file is a header (header 'false'); in our case, there is no file header. We then provide the name and path of the file and the delimiter used in the file, which in our case is the pipe symbol |. In this example, we also specify that null values should be represented as an empty string. Let's run a SQL command on our foreign table:

postgres=# SELECT * FROM employee;
-[ RECORD 1 ]-+-------------------------
emp_name      | AARON, ELVIA J
job_title     | WATER RATE TAKER
dept          | WATER MGMNT
salary        | 81000.00
sal_after_tax | 73862.00
-[ RECORD 2 ]-+-------------------------
emp_name      | AARON, JEFFERY M
job_title     | POLICE OFFICER
dept          | POLICE
salary        | 74628.00
sal_after_tax | 74628.00
-[ RECORD 3 ]-+-------------------------
emp_name      | AARON, KIMBERLEI R
job_title     | CHIEF CONTRACT EXPEDITER
dept          | FLEET MANAGEMNT
salary        | 77280.00
sal_after_tax | 70174.00

Great, it looks like our data has been successfully loaded from the file. You can also use the \d meta command to see the structure of the employee table:

postgres=# \d employee
        Foreign table "public.employee"
    Column     |       Type        | Modifiers | FDW Options
---------------+-------------------+-----------+-------------
 emp_name      | character varying |           |
 job_title     | character varying |           |
 dept          | character varying |           |
 salary        | numeric           |           |
 sal_after_tax | numeric           |           |
Server: file_server
FDW Options: (format 'csv', header 'false', filename '/home/pg_book/14/testdata.csv', delimiter '|', "null" '')

You can run EXPLAIN on the query to understand what is going on when you run a query on the foreign table:

postgres=# EXPLAIN SELECT * FROM employee WHERE salary > 5000;
                         QUERY PLAN
---------------------------------------------------------
 Foreign Scan on employee  (cost=0.00..1.10 rows=1 width=160)
   Filter: (salary > 5000::numeric)
   Foreign File: /home/pgbook/14/testdata.csv
   Foreign File Size: 197
(4 rows)

The ALTER FOREIGN TABLE command can be used to modify these options (a short sketch follows below). More information about file_fdw is available at http://www.postgresql.org/docs/current/static/file-fdw.html. You can take a look at the CREATE SERVER and CREATE FOREIGN TABLE commands in the PostgreSQL documentation for more information on the many options available.
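For instance, if the file were later regenerated with a comma separator, or moved to a different path, a sketch of the change would look like this (both new values are purely illustrative, and employee is the foreign table defined above):

postgres=# ALTER FOREIGN TABLE employee OPTIONS (SET delimiter ',');
postgres=# ALTER FOREIGN TABLE employee OPTIONS (SET filename '/home/pgbook/14/testdata_new.csv');

SET replaces the value of an existing option; ADD and DROP are also available for options that have not been set yet or are no longer needed.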
Each of the foreign data wrappers comes with its own documentation about how to use the wrapper. Make sure that an extension is stable enough before it is used in production. The PostgreSQL core development group does not support most of the FDW extensions. If you want to create your own data wrappers, you can find the documentation at http://www.postgresql.org/docs/current/static/fdwhandler.html, which is an excellent starting point. The best way to learn, however, is to read the code of other available extensions.

Summary

In this article, we looked at some of the ways in which PostgreSQL can be extended. These include the ability to add new operators and new index access methods, and to create your own aggregates. You can access foreign data sources, such as other databases, files, and web services, using PostgreSQL foreign data wrappers. These wrappers are provided as extensions and should be used with caution, as most of them are not officially supported. Even though PostgreSQL is very extensible, you can't plug in a new storage engine or change the parser/planner and executor interfaces. These components are very tightly coupled with each other and are, therefore, highly optimized and mature.

Resources for Article: Further resources on this subject: Load balancing MSSQL [Article] Advanced SOQL Statements [Article] Running a PostgreSQL Database Server [Article]
Introducing Splunk

Packt
03 Mar 2015
14 min read
In this article by Betsy Page Sigman, author of the book Splunk Essentials, Splunk, whose "name was inspired by the process of exploring caves, or splunking, helps analysts, operators, programmers, and many others explore data from their organizations by obtaining, analyzing, and reporting on it. This multinational company, cofounded by Michael Baum, Rob Das, and Erik Swan, has a core product called "Splunk Enterprise. This manages searches, inserts, deletes, and filters, and analyzes big data that is generated by machines, as well as other types of data. "They also have a free version that has most of the capabilities of Splunk Enterprise and is an excellent learning tool. (For more resources related to this topic, see here.) Understanding events, event types, and fields in Splunk An understanding of events and event types is important before going further. Events In Splunk, an event is not just one of" the many local user meetings that are set up between developers to help each other out (although those can be very useful), "but also refers to a record of one activity that is recorded in a log file. Each event usually has: A timestamp indicating the date and exact time the event was created Information about what happened on the system that is being tracked Event types An event type is a way to allow "users to categorize similar events. It is field-defined by the user. You can define an event type in several ways, and the easiest way is by using the SplunkWeb interface. One common reason for setting up an event type is to examine why a system has failed. Logins are often problematic for systems, and a search for failed logins can help pinpoint problems. For an interesting example of how to save "a search on failed logins as an event type, visit http://docs.splunk.com/Documentation/Splunk/6.1.3/Knowledge/ClassifyAndGroupSimilarEvents#Save_a_search_as_a_new_event_type. Why are events and event types so important in Splunk? Because without events, there would be nothing to search, of course. And event types allow us to make meaningful searches easily and quickly according to our needs, as we'll see later. Sourcetypes Sourcetypes are also "important to understand, as they help define the rules for an event. A sourcetype is one of the default fields that Splunk assigns to data as it comes into the system. It determines what type of data it is so that Splunk can format it appropriately as it indexes it. This also allows the user who wants to search the "data to easily categorize it. Some of the common sourcetypes are listed as follows: access_combined, for "NCSA combined format HTTP web server logs apache_error, for standard "Apache web server error logs cisco_syslog, for the "standard syslog produced by Cisco network devices (including PIX firewalls, routers, and ACS), usually via remote syslog to a central log host websphere_core, a core file" export from WebSphere (Source: http://docs.splunk.com/Documentation/Splunk/latest/Data/Whysourcetypesmatter) Fields Each event in Splunk is" associated with a number of fields. The core fields of host, course, sourcetype, and timestamp are key to Splunk. These fields are extracted from events at multiple points in the data processing pipeline that Splunk uses, and each of these fields includes a name and a value. The name describes the field (such as the userid) and the value says what that field's value is (susansmith, for example). Some of these fields are default fields that are given because of where the event came from or what it is. 
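To make this concrete, a search that filters on a field name/value pair might look like the following sketch, which combines the userid example just mentioned with one of the common sourcetypes listed earlier (both names are purely illustrative and assume such a field has been extracted for that sourcetype):

sourcetype=access_combined userid=susansmith

This would return only the events whose userid field has the value susansmith.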
When data is processed by Splunk, and when it is indexed or searched, it uses these fields. For indexing, the default fields added include those of host, source, and sourcetype. When searching, Splunk is able to select from a bevy of fields that can either be defined by the user or are very basic, such as action results in a purchase (for a website event). Fields are essential for doing the basic work of Splunk – that is, indexing and searching. Getting data into Splunk It's time to spring into action" now and input some data into Splunk. Adding data is "simple, easy, and quick. In this section, we will use some data and tutorials created by Splunk to learn how to add data: Firstly, to obtain your data, visit the tutorial data at http://docs.splunk.com/Documentation/Splunk/6.1.5/SearchTutorial/GetthetutorialdataintoSplunk that is readily available on Splunk. Here, download the folder tutorialdata.zip. Note that this will be a fresh dataset that has been collected over the last 7 days. Download it but don't extract the data from it just yet. You then need to log in to Splunk, using admin as the username and then by using your password. Once logged in, you will notice that toward the upper-right corner of your screen is the button Add Data, as shown in the following screenshot. Click "on this button: Button to Add Data Once you have "clicked on this button, you'll see a screen" similar to the "following screenshot: Add Data to Splunk by Choosing a Data Type or Data Source Notice here the "different types of data that you can select, as "well as the different data sources. Since the data we're going to use is a file, under "Or Choose a Data Source, click on From files and directories. Once you have clicked on this, you can then click on the radio button next to Skip preview, as indicated in the following screenshot, since you don't need to preview the data" now. You then need to click on "Continue: Preview data You can download the tutorial files at: http://docs.splunk.com/Documentation/Splunk/6.1.5/SearchTutorial/GetthetutorialdataintoSplunk As shown in the next screenshot, click on Upload and index a file, find the tutorialdata.zip file you just downloaded (it is probably in your Downloads folder), and then click on More settings, filling it in as shown in the following screenshot. (Note that you will need to select Segment in path under Host and type 1 under Segment Number.) Click on Save when you are done: Can specify source, additional settings, and source type Following this, you "should see a screen similar to the following" screenshot. Click on Start Searching, we will look at the data now: You should see this if your data has been successfully indexed into Splunk. You will now" see a screen similar to the following" screenshot. Notice that the number of events you have will be different, as will the time of the earliest event. At this point, click on Data Summary: The Search screen You should see the Data Summary screen like in the following screenshot. However, note that the Hosts shown here will not be the same as the ones you get. Take a quick look at what is on the Sources tab and the Sourcetypes tab. Then find the most recent data (in this case 127.0.0.1) and click on it. Data Summary, where you can see Hosts, Sources, and Sourcetypes After" clicking on the most recent data, which in "this case is bps-T341s, look at the events contained there. Later, when we use streaming data, we can see how the events at the top of this list change rapidly. 
Here, you will see a listing of events, similar to those shown in the "following screenshot: Events lists for the host value You can click on the Splunk logo in the upper-left corner "of the web page to return to the home page. Under Administrator at the "top-right of the page, click on Logout. Searching Twitter data We will start here by doing a simple search of our Twitter index, which is automatically created by the app once you have enabled Twitter input (as explained previously). In our earlier searches, we used the default index (which the tutorial data was downloaded to), so we didn't have to specify the index we wanted to use. Here, we will use just the Twitter index, so we need to specify that in the search. A simple search Imagine that we wanted to search for tweets containing the word coffee. We could use the code presented here and place it in the search bar: index=twitter text=*coffee* The preceding code searches only your Twitter index and finds all the places where the word coffee is mentioned. You have to put asterisks there, otherwise you will only get the tweets with just "coffee". (Note that the text field is not case sensitive, so tweets with either "coffee" or "Coffee" will be included in the search results.) The asterisks are included before and after the text "coffee" because otherwise we would only get events where just "coffee" was tweeted – a rather rare occurrence, we expect. In fact, when we search our indexed Twitter data without the asterisks around coffee, we got no results. Examining the Twitter event Before going further, it is useful to stop and closely examine the events that are collected as part of the search. The sample tweet shown in the following screenshot shows the large number of fields that are part of each tweet. The > was clicked to expand the event: A Twitter event There are several items to look closely at here: _time: Splunk assigns a timestamp for every event. This is done in UTC (Coordinated Universal Time) time format. contributors: The value for this field is null, as are the values of many Twitter fields. Retweeted_status: Notice the {+} here; in the following event list, you will see there are a number of fields associated with this, which can be seen when the + is selected and the list is expanded. This is the case wherever you see a {+} in a list of fields: Various retweet fields In addition to those shown previously, there are many other fields associated with a tweet. The 140 character (maximum) text field that most people consider to be the tweet is actually a small part of the actual data collected. The implied AND If you want to search on more than one term, there is no need to add AND as it is already implied. If, for example, you want to search for all tweets that include both the text "coffee" and the text "morning", then use: index=twitter text=*coffee* text=*morning* If you don't specify text= for the second term and just put *morning*, Splunk assumes that you want to search for *morning* in any field. Therefore, you could get that word in another field in an event. This isn't very likely in this case, although coffee could conceivably be part of a user's name, such as "coffeelover". But if you were searching for other text strings, such as a computer term like log or error, such terms could be found in a number of fields. So specifying the field you are interested in would be very important. The need to specify OR Unlike AND, you must always specify the word OR. 
For example, to obtain all events that mention either coffee or morning, enter: index=twitter text=*coffee* OR text=*morning* Finding other words used Sometimes you might want to find out what other words are used in tweets about coffee. You can do that with the following search: index=twitter text=*coffee* | makemv text | mvexpand text | top 30 text This search first searches for the word "coffee" in a text field, then creates a multivalued field from the tweet, and then expands it so that each word is treated as a separate piece of text. Then it takes the top 30 words that it finds. You might be asking yourself how you would use this kind of information. This type of analysis would be of interest to a marketer, who might want to use words that appear to be associated with coffee in composing the script for an advertisement. The following screenshot shows the results that appear (1 of 2 pages). From this search, we can see that the words love, good, and cold might be words worth considering: Search of top 30 text fields found with *coffee* When you do a search like this, you will notice that there are a lot of filler words (a, to, for, and so on) that appear. You can do two things to remedy this. You can increase the limit for top words so that you can see more of the words that come up, or you can rerun the search using the following code. "Coffee" (with a capital C) is listed (on the unshown second page) separately here from "coffee". The reason for this is that while the search is not case sensitive (thus both "coffee" and "Coffee" are picked up when you search on "coffee"), the process of putting the text fields through the makemv and the mvexpand processes ends up distinguishing on the basis of case. We could rerun the search, excluding some of the filler words, using the code shown here: index=twitter text=*coffee* | makemv text | mvexpand text |search NOT text="RT" AND NOT text="a" AND NOT text="to" ANDNOT text="the" | top 30 text Using a lookup table Sometimes it is useful to use a lookup file to avoid having to use repetitive code. It would help us to have a list of all the small words that might be found often in a tweet just by the nature of each word's frequent use in language, so that we might eliminate them from our quest to find words that would be relevant for use in the creation of advertising. If we had a file of such small words, we could use a command indicating not to use any of these more common, irrelevant words when listing the top 30 words associated with our search topic of interest. Thus, for our search for words associated with the text "coffee", we would be interested in words like " dark", "flavorful", and "strong", but not words like "a", "the", and "then". We can do this using a lookup command. There are three types of lookup commands, which are presented in the following table: Command Description lookup Matches a value of one field with a value of another, based on a .csv file with the two fields. Consider a lookup table named lutable that contains fields for machine_name and owner. Consider what happens when the following code snippet is used after a preceding search (indicated by . . . |): . . . | lookup lutable owner Splunk will use the lookup table to match the owner's name with its machine_name and add the machine_name to each event. inputlookup All fields in the .csv file are returned as results. If the following code snippet is used, both machine_name and owner would be searched: . . . 
| inputlookup lutable outputlookup This code outputs search results to a lookup table. The following code outputs results from the preceding research directly into a table it creates: . . . | outputlookup newtable.csv saves The command we will use here is inputlookup, because we want to reference a .csv file we can create that will include words that we want to filter out as we seek to find possible advertising words associated with coffee. Let's call the .csv file filtered_words.csv, and give it just a single text field, containing words like "is", "the", and "then". Let's rewrite the search to look like the following code: index=twitter text=*coffee*| makemv text | mvexpand text| search NOT [inputlookup filtered_words | fields text ]| top 30 text Using the preceding code, Splunk will search our Twitter index for *coffee*, and then expand the text field so that individual words are separated out. Then it will look for words that do NOT match any of the words in our filtered_words.csv file, and finally output the top 30 most frequently found words among those. As you can see, the lookup table can be very useful. To learn more about Splunk lookup tables, go to http://docs.splunk.com/Documentation/Splunk/6.1.5/SearchReference/Lookup. Summary In this article, we have learned more about how to use Splunk to create reports, dashboards. Splunk Enterprise Software, or Splunk, is an extremely powerful tool for searching, exploring, and visualizing data of all types. Splunk is becoming increasingly popular, as more and more businesses, both large and small, discover its ease and usefulness. Analysts, managers, students, and others can quickly learn how to use the data from their systems, networks, web traffic, and social media to make attractive and informative reports. This is a straightforward, practical, and quick introduction to Splunk that should have you making reports and gaining insights from your data in no time. Resources for Article: Further resources on this subject: Lookups [article] Working with Apps in Splunk [article] Loading data, creating an app, and adding dashboards and reports in Splunk [article]
A Quick Start Guide to Flume

Packt
02 Mar 2015
15 min read
In this article by Steve Hoffman, the author of the book, Apache Flume: Distributed Log Collection for Hadoop - Second Edition, we will learn about the basics that are required to be known before we start working with Apache Flume. This article will help you get started with Flume. So, let's start with the first step: downloading and configuring Flume. (For more resources related to this topic, see here.) Downloading Flume Let's download Flume from http://flume.apache.org/. Look for the download link in the side navigation. You'll see two compressed .tar archives available along with the checksum and GPG signature files used to verify the archives. Instructions to verify the download are on the website, so I won't cover them here. Checking the checksum file contents against the actual checksum verifies that the download was not corrupted. Checking the signature file validates that all the files you are downloading (including the checksum and signature) came from Apache and not some nefarious location. Do you really need to verify your downloads? In general, it is a good idea and it is recommended by Apache that you do so. If you choose not to, I won't tell. The binary distribution archive has bin in the name, and the source archive is marked with src. The source archive contains just the Flume source code. The binary distribution is much larger because it contains not only the Flume source and the compiled Flume components (jars, javadocs, and so on), but also all the dependent Java libraries. The binary package contains the same Maven POM file as the source archive, so you can always recompile the code even if you start with the binary distribution. Go ahead, download and verify the binary distribution to save us some time in getting started. Flume in Hadoop distributions Flume is available with some Hadoop distributions. The distributions supposedly provide bundles of Hadoop's core components and satellite projects (such as Flume) in a way that ensures things such as version compatibility and additional bug fixes are taken into account. These distributions aren't better or worse; they're just different. There are benefits to using a distribution. Someone else has already done the work of pulling together all the version-compatible components. Today, this is less of an issue since the Apache BigTop project started (http://bigtop.apache.org/). Nevertheless, having prebuilt standard OS packages, such as RPMs and DEBs, ease installation as well as provide startup/shutdown scripts. Each distribution has different levels of free and paid options, including paid professional services if you really get into a situation you just can't handle. There are downsides, of course. The version of Flume bundled in a distribution will often lag quite a bit behind the Apache releases. If there is a new or bleeding-edge feature you are interested in using, you'll either be waiting for your distribution's provider to backport it for you, or you'll be stuck patching it yourself. Furthermore, while the distribution providers do a fair amount of testing, such as any general-purpose platform, you will most likely encounter something that their testing didn't cover, in which case, you are still on the hook to come up with a workaround or dive into the code, fix it, and hopefully, submit that patch back to the open source community (where, at a future point, it'll make it into an update of your distribution or the next version). So, things move slower in a Hadoop distribution world. You can see that as good or bad. 
Usually, large companies don't like the instability of bleeding-edge technology or making changes often, as change can be the most common cause of unplanned outages. You'd be hard pressed to find such a company using the bleeding-edge Linux kernel rather than something like Red Hat Enterprise Linux (RHEL), CentOS, Ubuntu LTS, or any of the other distributions whose target is stability and compatibility. If you are a startup building the next Internet fad, you might need that bleeding-edge feature to get a leg up on the established competition. If you are considering a distribution, do the research and see what you are getting (or not getting) with each. Remember that each of these offerings is hoping that you'll eventually want and/or need their Enterprise offering, which usually doesn't come cheap. Do your homework. Here's a short, nondefinitive list of some of the more established players. For more information, refer to the following links: Cloudera: http://cloudera.com/ Hortonworks: http://hortonworks.com/ MapR: http://mapr.com/ An overview of the Flume configuration file Now that we've downloaded Flume, let's spend some time going over how to configure an agent. A Flume agent's default configuration provider uses a simple Java property file of key/value pairs that you pass as an argument to the agent upon startup. As you can configure more than one agent in a single file, you will need to additionally pass an agent identifier (called a name) so that it knows which configurations to use. In my examples where I'm only specifying one agent, I'm going to use the name agent. By default, the configuration property file is monitored for changes every 30 seconds. If a change is detected, Flume will attempt to reconfigure itself. In practice, many of the configuration settings cannot be changed after the agent has started. Save yourself some trouble and pass the undocumented --no-reload-conf argument when starting the agent (except in development situations perhaps). If you use the Cloudera distribution, the passing of this flag is currently not possible. I've opened a ticket to fix that at https://issues.cloudera.org/browse/DISTRO-648. If this is important to you, please vote it up. Each agent is configured, starting with three parameters: agent.sources=<list of sources>agent.channels=<list of channels>agent.sinks=<list of sinks> Each source, channel, and sink also has a unique name within the context of that agent. For example, if I'm going to transport my Apache access logs, I might define a channel named access. The configurations for this channel would all start with the agent.channels.access prefix. Each configuration item has a type property that tells Flume what kind of source, channel, or sink it is. In this case, we are going to use an in-memory channel whose type is memory. The complete configuration for the channel named access in the agent named agent would be: agent.channels.access.type=memory Any arguments to a source, channel, or sink are added as additional properties using the same prefix. The memory channel has a capacity parameter to indicate the maximum number of Flume events it can hold. Let's say we didn't want to use the default value of 100; our configuration would now look like this: agent.channels.access.type=memoryagent.channels.access.capacity=200 Finally, we need to add the access channel name to the agent.channels property so that the agent knows to load it: agent.channels=access Let's look at a complete example using the canonical "Hello, World!" example. 
Starting up with "Hello, World!" No technical article would be complete without a "Hello, World!" example. Here is the configuration file we'll be using: agent.sources=s1agent.channels=c1agent.sinks=k agent.sources.s1.type=netcatagent.sources.s1.channels=c1agent.sources.s1.bind=0.0.0.0agent.sources.s1.port=1234 agent.channels.c1.type=memory agent.sinks.k1.type=loggeragent.sinks.k1.channel=c1 Here, I've defined one agent (called agent) who has a source named s1, a channel named c1, and a sink named k1. The s1 source's type is netcat, which simply opens a socket listening for events (one line of text per event). It requires two parameters: a bind IP and a port number. In this example, we are using 0.0.0.0 for a bind address (the Java convention to specify listen on any address) and port 12345. The source configuration also has a parameter called channels (plural), which is the name of the channel(s) the source will append events to, in this case, c1. It is plural, because you can configure a source to write to more than one channel; we just aren't doing that in this simple example. The channel named c1 is a memory channel with a default configuration. The sink named k1 is of the logger type. This is a sink that is mostly used for debugging and testing. It will log all events at the INFO level using Log4j, which it receives from the configured channel, in this case, c1. Here, the channel keyword is singular because a sink can only be fed data from one channel. Using this configuration, let's run the agent and connect to it using the Linux netcat utility to send an event. First, explode the .tar archive of the binary distribution we downloaded earlier: $ tar -zxf apache-flume-1.5.2-bin.tar.gz$ cd apache-flume-1.5.2-bin Next, let's briefly look at the help. Run the flume-ng command with the help command: $ ./bin/flume-ng helpUsage: ./bin/flume-ng <command> [options]... commands:help                 display this help textagent                run a Flume agentavro-client           run an avro Flume clientversion               show Flume version info global options:--conf,-c <conf>     use configs in <conf> directory--classpath,-C <cp>   append to the classpath--dryrun,-d          do not actually start Flume, just print the command--plugins-path <dirs> colon-separated list of plugins.d directories. See the                       plugins.d section in the user guide for more details.                       Default: $FLUME_HOME/plugins.d-Dproperty=value     sets a Java system property value-Xproperty=value     sets a Java -X option agent options:--conf-file,-f <file> specify a config file (required)--name,-n <name>     the name of this agent (required)--help,-h             display help text avro-client options:--rpcProps,-P <file>   RPC client properties file with server connection params--host,-H <host>       hostname to which events will be sent--port,-p <port>       port of the avro source--dirname <dir>       directory to stream to avro source--filename,-F <file>   text file to stream to avro source (default: std input)--headerFile,-R <file> File containing event headers as key/value pairs on each new line--help,-h             display help text Either --rpcProps or both --host and --port must be specified. Note that if <conf> directory is specified, then it is always included first in the classpath. As you can see, there are two ways with which you can invoke the command (other than the simple help and version commands). We will be using the agent command. 
The use of avro-client will be covered later. The agent command has two required parameters: a configuration file to use and the agent name (in case your configuration contains multiple agents). Let's take our sample configuration and open an editor (vi in my case, but use whatever you like): $ vi conf/hw.conf Next, place the contents of the preceding configuration into the editor, save, and exit back to the shell. Now you can start the agent: $ ./bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console The -Dflume.root.logger property overrides the root logger in conf/log4j.properties to use the console appender. If we didn't override the root logger, everything would still work, but the output would go to the log/flume.log file instead of being based on the contents of the default configuration file. Of course, you can edit the conf/log4j.properties file and change the flume.root.logger property (or anything else you like). To change just the path or filename, you can set the flume.log.dir and flume.log.file properties in the configuration file or pass additional flags on the command line as follows: $ ./bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console -Dflume.log.dir=/tmp -Dflume.log.file=flume-agent.log You might ask why you need to specify the -c parameter, as the -f parameter contains the complete relative path to the configuration. The reason for this is that the Log4j configuration file should be included on the class path. If you left the -c parameter off the command, you'll see this error: Warning: No configuration directory set! Use --conf <dir> to override.log4j:WARN No appenders could be found for logger (org.apache.flume.lifecycle.LifecycleSupervisor).log4j:WARN Please initialize the log4j system properly.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info But you didn't do that so you should see these key log lines: 2014-10-05 15:39:06,109 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration foragents: [agent] This line tells you that your agent starts with the name agent. Usually you'd look for this line only to be sure you started the right configuration when you have multiple configurations defined in your configuration file. 2014-10-05 15:39:06,076 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloadingconfiguration file:conf/hw.conf This is another sanity check to make sure you are loading the correct file, in this case our hw.conf file. 2014-10-05 15:39:06,221 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)]Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:s1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@442fbe47 counterGroup:{ name:null counters:{} } }}channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} } Once all the configurations have been parsed, you will see this message, which shows you everything that was configured. You can see s1, c1, and k1, and which Java classes are actually doing the work. As you probably guessed, netcat is a convenience for org.apache.flume.source.NetcatSource. We could have used the class name if we wanted. 
In fact, if I had my own custom source written, I would use its class name for the source's type parameter. You cannot define your own short names without patching the Flume distribution. 2014-10-05 15:39:06,427 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] CreatedserverSocket:sun.nio.ch.ServerSocketChannelImpl[/0.0.0.0:12345] Here, we see that our source is now listening on port 12345 for the input. So, let's send some data to it. Finally, open a second terminal. We'll use the nc command (you can use Telnet or anything else similar) to send the Hello World string and press the Return (Enter) key to mark the end of the event: % nc localhost 12345Hello WorldOK The OK message came from the agent after we pressed the Return key, signifying that it accepted the line of text as a single Flume event. If you look at the agent log, you will see the following: 2014-10-05 15:44:11,215 (SinkRunner-PollingRunner-DefaultSinkProcessor)[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64Hello World } This log message shows you that the Flume event contains no headers (NetcatSource doesn't add any itself). The body is shown in hexadecimal along with a string representation (for us humans to read, in this case, our Hello World message). If I send the following line and then press the Enter key, you'll get an OK message: The quick brown fox jumped over the lazy dog. You'll see this in the agent's log: 2014-10-05 15:44:57,232 (SinkRunner-PollingRunner-DefaultSinkProcessor)[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)]Event: { headers:{} body: 54 68 65 20 71 75 69 63 6B 20 62 72 6F 77 6E 20The quick brown } The event appears to have been truncated. The logger sink, by design, limits the body content to 16 bytes to keep your screen from being filled with more than what you'd need in a debugging context. If you need to see the full contents for debugging, you should use a different sink, perhaps the file_roll sink, which would write to the local filesystem. Summary In this article, we covered how to download the Flume binary distribution. We created a simple configuration file that included one source writing to one channel, feeding one sink. The source listened on a socket for network clients to connect to and to send it event data. These events were written to an in-memory channel and then fed to a Log4j sink to become the output. We then connected to our listening agent using the Linux netcat utility and sent some string events to our Flume agent's source. Finally, we verified that our Log4j-based sink wrote the events out. Resources for Article: Further resources on this subject: About Cassandra [article] Introducing Kafka [article] Transformation [article]