Tuesday 30 December 2014

Tasks and Stages in Spark

1. Different with MapReduce, an application in Spark contains multiple jobs, 
since each action() in an application will correspond to a job.

2. Each job contains multiple stages. Each stage contains multiple tasks.
The last stage generates result. A stage is executed only when its parent stage is finished.

3. NarrowDependency vs. ShuffleDependency.
RDD's partitions depend on all partitions of its parent RDD.
The full dependency between RDD and its parent RDD is called NarrowDependency.

RDD's partitions depend on parts of partitions of its parent RDD.
The partial dependency between RDD and its parent RDD is called ShuffleDependency.

4. NarrowDependency constructs pipeline. The number of tasks is as the same as the number of partitions of its last RDD.
Data is computed only when it is needed and in the results generated position.
Compute some partitions of the left most RDD in a stage first.

5. The computing chain is built from back to front based on the data dependency.
A stage is formed by adding each NarrowDependency, and break into a new stage when encounters a ShuffleDependency. The last stage is id0, its parent stage is id1.
If a stage generates results, its tasks are ResultTasks(like reducer). Otherwise, its tasks are ShuffleMapTask(like mapper).

6. The number of tasks of a stage is determinate by the number of partition of this stage's last RDD.
In a stage, each RDD calls parentRDD.iter() in compute() to fetch the records of its parent RDD.




Data Stage Graph

Reference:
The contents of this article are from https://github.com/JerryLead/SparkInternals

Monday 29 December 2014

Cache v.s. Checkpoint in Spark

1. RDD.cache v.s Checkpoint

Cache commands indicate that spark needs to keep these rdd’s in memory. This will not cause the RDD to be instantly be cached, instead it will be cached the next time it is loaded into memory.

Checkpoint process:

Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed


(1) Cache materializes the RDD and keeps it in memory. But the lineage of RDD (that is, seq of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. 

However, checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This is allows long lineages to be truncated and the data to be saved reliably in HDFS (which is naturally fault tolerant by replication).

(2) A partition is directly cached in memory. However, checkpoint has to wait for the current job finishes, then start another job(finalRDD.doCheckpoint()) to finish the persistent.
This means a checkpointed RDD will be computed twice. Therefore, suggest to add rdd.cache() before rdd.checkpoint(). This will make the checkpoint read rdd from memory then write into disk.

2. RDD.persist v.s Checkpoint

Although RDD.persist can persist RDD on disk,but this partition is managed by blockManager.
Once driver program is finished, blockManager stops,the cached RDD is deleted from disk(the local folder of blockManager is deleted).
While Checkpoint persists RDD to local folder/HDFS,  this RDD can be reused by the other driver program.



Reference:
The contents of this article are from https://github.com/JerryLead/SparkInternals




Create and Read Cache in Spark

1. Create a cache


1. After rdd.cache() is called, rdd becomes persistRDD, its StorageLevel is MEMORY_ONLY.

2. When rdd.iterator() is called, a partition is computed. Retrieve a blockId from CacheManager.

3. Check whether this partition is checkpoinited in BlockManger. If yes, it means the task has been computed before, read the records of that partition from checkpoint directly.

4. Otherwise, calculate the partition, put all its records in an Elements, then cached by BlockManager. BlockManager stores Elements in LinkedHashMap[BlockId, Entry], which is managed by MemoryStore.


2. Read from cache



1. When compute a partition in a RDD, check whether it is cached from BlockManager.
If the partition is cached in local, call blockManager.getLocal() to read from memoryStore in local. Otherwise, call blockManager.getRemote() to read from other nodes.

2. After a partition is cached in a node, whose blockManager will send this info to Driver's blockMangerMasterActor's blockLocations. When the RDD is needed, call blockManagerMaster.getLocations(blockId) to lookup partition's info. Driver lookups blockLocations , then send the info to task.

3. Once task get the cached partition's location info, calls GetBlock(blockId) through connectionManager to the node holding the cached partition. The target node reads the partition from its blockManager's memoryStore, and send it to task.

Reference:
The contents of this article are from https://github.com/JerryLead/SparkInternals

Job Submission and Task Execution in Spark

Each action() in driver program generates a job. Submit jobs to DAGScheduler.
Each job has multiple stages, the last one generates results.
DAGScheduler will submit stages without parent stages first, and determines the number of tasks and submit tasks. Then, submit the stages with parent stages.

1. Job Submission


1. Generate job logic plan
Driver program calls transformation() to create computing chain(a serial of RDDs).
The compute() of each RDD defines how to generate the partitions.
getDependencies() generates the relationships between the partitions of RDDs.

2. Generate job physical plan
Each action() triggers a job.
Create stages in DAGScheduler.runJob().
Determine ShuffleMapTasks or ResultTasks in submitStage(), then pack tasks into TaskSet for taskScheduler.

3. Allocate Tasks
After sparkDeploySchedulerBackend receives taskSet, send serializedTask to work node's CoarseGrainedExecutorBackend Actor by DriverActor.

4. Job Receive
Executor wraps task into taskRunner, then start a idle thread to run the task.
Each CoarseGrainedExecutorBackend has only one executor object.


2. Task Execution


1.  After executor gets the serialized task, deserialize it, then run the task to get direct Result.

2. If the result is small, send back to driver directly. Otherwise, store the result in local memory and disk managed by BlockManager.  Instead sending the indirectResult to driver. Driver will fetch it through HTTP if needed.

3. After driver gets the task result, it tells taskScheduler this task is finished and analyze the result.

4. If result from ResultTask, call resultHandler to compute it in driver, e.g count().
If result is MapStatus of ShuffleMapTask, store it to mapOutputTrackerMaster for lookuping in reducer shuffle.

5. If the task is the last one in a stage, then submit the next stage.
If the stage is the last one in a job, tell DAGScheduler the job is finished.


Reference:
The contents of this article are from https://github.com/JerryLead/SparkInternals


Deployment Architecture in Spark



Deploy Graph

1. One Master node is in charge of multiple Worker nodes.

2. Worker nodes communicate with Master, and manage executors.
Each worker has multiple ExecutorBackend processes.
Each ExecutorBackend process has a Executor object, which has a task thread pool.

3. Driver is the process running the main() function of the application and create the SparkContext.

4. Each application has a driver and multiple executors.
All tasks in an executor belong to the same application.



Logic plan



Reference:
The contents of this artical are from https://github.com/JerryLead/SparkInternals

Sunday 28 December 2014

Accumulator variables in Spark

A shared variable, accumulators, provides a simple syntax for aggregating values from worker nodes back to the driver program. One of the most common uses of accumulators is to count events that occur during job execution for debugging purposes.
  • Accumulators are variables that can only be “added” to through an associative operation used to implement counters and sums, efficiently in parallel.
  • Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can extend for new types.
  • Only the driver program can read an accumulator’s value, using its value() method, not the tasks

val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value

Note that tasks on worker nodes cannot access the accumulator’s value()—from the point of view of these tasks, accumulators are write-only variables.

The type of counting shown here becomes especially handy when there are multiple values to keep track of, or when the same value needs to increase at multiple places in the parallel program

Reference:
http://spark.apache.org/docs/latest/programming-guide.html#transformations

MLlib Algorithms in Spark 1.2

1. classification: logistic regression, linear SVM,"
naïve Bayes, classification tree

2. regression: generalized linear models (GLMs),
regression tree

3. collaborative filtering: alternating least squares (ALS),
non-negative matrix factorization (NMF)

4. clustering: k-means

5. decomposition: SVD, PCA


6. optimization: stochastic gradient descent, L-BFGS

Limitations of MapReduce

1. MapReduce is great at one-pass computation, but inefficient for multi-pass algorithms.

2. No efficient primitives for data sharing.

  • State between steps goes to distributed file systems.
  • Slow due to replication & disk storage.

3. Most algorithms are much harder toimplement directly in restrictive MapReduce model

4. Performance bottlenecks, or batch not fitting the use cases

For example in PageRank:

  • To repeatedly multiply sparse matrix and vector, it requires repeatedly hashing together page adjacency lists and rank vector.
  • Using cache(), keep neighbor lists in RAM
  • Using partitioning, avoid repeated hashing


Therefore, MapReduce requires asymptotically more communication and I/O.

Saturday 27 December 2014

map vs flatMap vs reduce in Spark

1. map

The map transformation takes in a function and applies it to each element in the RDD with the result of the function being the new value of each element in the resulting RDD. The return type of the map does not have to be the same as the input type

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x*x)
println(result.collect())


2. flatMap
  • flatMap produces multiple output elements for each input element. 
  • Like with map, the function we provide to flatMap is called individually for each element in our input RDD. 
  • Instead of returning a single element, we return an iterator with our return values. 
  • Rather than producing an RDD of iterators, we get back an RDD which consists of the elements from all of the iterators.
  • Often used to extract words.

val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first()  // returns "hello"


3. reduce


Reduce takes in a function which operates on two elements of the same type of your RDD and returns a new element of the same type. Reduce requires that the return type of our result be the same type as that of the RDD we are operating over.

val sum = rdd.reduce((x, y) => x + y)




Thursday 25 December 2014

Latent Semantic Analysis in MLlib


1. Latent Semantic Analysis (LSA) is a technique in natural language processing and information retrieval that seeks to better understand a corpus of documents and the relationships between the words in those documents. 
It attempts to distill the corpus into a set of relevant concepts.
Each concept captures a thread of variation in the data and often corresponds to a topic that the corpus discusses.

2. TF-IDF:
val tf = termFrequencyInDoc.toDouble / totalTermsInDoc
val docFreq = totalDocs.toDouble / termFreqInCorpus

TF-IDF captures two intuitions about the relevance of a term to a document. First, one would expect that the more often a term occurs in a document, the more important it is to that document. Second, not all terms are equal in a global sense. It is more meaningful to encounter a word that is occurs rarely in the entire corpus than a word that appears in most of the documents, thus the metric uses the inverse of the word’s appearance in documents in the full corpus.
log is used to mallow the differences of common words and rare words in document frequencies.


3. LSA discovers this lower-dimensional representation using a linear algebra technique called Singular Value Decomposition (SVD). 
  • It starts with a term-document matrix generated through the counting word frequencies for each document. In this matrix, each document corresponds to a column, each term corresponds to a row, and each element represents the importance of a word to a document. 
  • SVD then factorizes this matrix into three matrices, one of which expresses concepts in regard to documents, one of which expresses concepts in regard to terms, and one of which contains the importance for each concept. 
  • The structure of these matrices is such that a low-rank approximation of the original matrix can be achieved by removing a set of their rows and columns corresponding to the least important concepts. That is, the matrices in this low-rank approximation can be multiplied to produce a matrix close to the original, with increasing loss of fidelity as each concept is removed.


The singular value decomposition takes a m x n matrix and returns three matrices that approximately equal it when multiplied together.

  • U is a m x k matrix where each row corresponds to a document and each column corresponds to a concept.
  • S is a k x k diagonal matrix that holds the singular values. Each diagonal element in S corresponds to a single concept.
  • V is a n x k matrix where each row corresponds to a term and each column corresponds to a concept.
In the LSA case, m is the number of documents and n is the number of terms. The decomposition is parameterized with a number k, less than or equal to n, that indicates how many concepts to keep around. A key insight of LSA is that only a small number of concepts are important to representing that data.
To find the singular value decomposition of a matrix, simply wrap an RDD of row vectors in a RowMatrix and call computeSVD:


import org.apache.spark.mllib.linalg.distributed.RowMatrix

termDocMatrix.cache()
val mat = new RowMatrix(termDocMatrix)
val k = 1000
val svd = mat.computeSVD(k, computeU=true)


Reference:
"Advanced Analytics with Spark"

Wednesday 24 December 2014

Anomaly Detection with K-means Clustering


1. Supervised vs. unsupervised learning

In order to predict unknown values for new data, we had to know that target value for many previously-seen examples. 
Classifiers can only help if we, the data scientists, know what we are looking for already, and can provide plenty of examples where input produced a known output. 
These were collectively known as supervised learning techniques, because their learning process is given the correct output value for each example in the input.

However, there are problems in which the correct output is unknown for some or all examples.
unsupervised learning techniques can help here. These techniques do not learn to predict any target values, since none are available. They can however learn structure in data, and find groupings of similar inputs, or learn what types of input are likely to occur and what types are not. 

2. Anomaly detection

Anomaly detection is often used to find fraud, detect network attacks, or discover problems in servers or other sensor-equipped machinery. In these cases, it’s important to be able to find new types of anomalies that have never seen before—new forms of fraud, new intrusions, new failure modes for servers.An anomaly that has been observed and understood is no longer an anomaly.

3. K-means

It attempts to detect k clusters in a data set, where k is given by the data scientist. choosing a good value for k will be a central topic. A clustering could be considered good if each data point were near to its closest centroid.
It is common to use simple Euclidean distance to measure distance between data points with K-means, this is the only distance function supported by Spark MLlib as of this writing. 

This center is called the cluster centroid, and is defined to be the arithmetic mean of the points—hence the name K-means.
  1. To start, the algorithm intelligently picks some data points as the initial cluster centroids. 
  2. Then each data point is assigned to the nearest centroid. 
  3. Then for each cluster, a new cluster centroid is computed as the mean of the data points just assigned to that cluster. 
  4. This process is repeated.

Tuesday 23 December 2014

Prediction Measure Metrics in MLlib


val metrics = new MulticlassMetrics(predictionsAndLabels)

1. metrics.confusionMatrix


N target category values generate a N-by-N matrix, where each row corresponds to an actual correct value, and each column to a predicted value, in order. The entry at row i and column j counts the number of times an example with true category i was predicted as category j. So, the correct predictions are the counts along the diagonal, and incorrect predictions are everything else. Here it seems that, indeed, counts are high along the diagonal, which is a good sign.

Example:
14019.0  6630.0   15.0    0.0    0.0  1.0   391.0
5413.0   22399.0  438.0   16.0   0.0  3.0   50.0
0.0      457.0    2999.0  73.0   0.0  12.0  0.0
0.0      1.0      163.0   117.0  0.0  0.0   0.0
0.0      872.0    40.0    0.0    0.0  0.0   0.0
0.0      500.0    1138.0  36.0   0.0  48.0  0.0

1091.0   41.0     0.0     0.0    0.0  0.0   891.0

2. metrics.precision/recall


Precision is actually a common metric for binary classification problems, where there are 2 category values, not several. In a binary classification problem, where there is some kind of positive and negative class, precision is the proportion of all examples that the classifier marked positive that are actually positive
It is often accompanied by the metric recall. This is the proportion of all examples that are actually positive that the classifier marked positive.

3. The AUC metric is also used in evaluation of classifiers.


The Receiver Operating Characteristic (ROC) curve. The curve is created by plotting the true positive rate against the false positive rate at various threshold settings.
AUC equals the area under this curve, for Area Under the Curve. AUC may be viewed as the probability that a randomly-chosen “good” above a randomly-chosen “bad”.

Sunday 21 December 2014

Trouble Shooting for Kafka-Camus Example

Camus (https://github.com/linkedin/camus) is another art of work done by LinkedIn, which provides a pipeline from Kafka to HDFS. Under this project, a single MapReduce job performs the following steps for loading data to HDFS in a distributed manner:

  1. As a first step, it discovers the latest topics and partition offsets from ZooKeeper.
  2. Each task in the MapReduce job fetches events from the Kafka broker and commits the pulled data along with the audit count to the output folders.
  3. After the completion of the job, final offsets are written to HDFS, which can be further consumed by subsequent MapReduce jobs.
  4. Information about the consumed messages is also updated in the Kafka cluster.
When you run the following example, you may encounter some issues.
$hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties

Issue 0:
java.lang.incompatiableclasschangeerror: found interface org.apache.mapreduce.jobcontext , but expected class

Solution:
This is because the incompatibility between hadoop1 and hadoop 2.
in pom.xml, set hadoop version as 2.2.0


Issue 1:
CamusJob] - Unable to pull requests from Kafka brokers. Exiting the program
java.lang.NumberFormatException: For input string: ""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Integer.parseInt(Integer.java:470)
at java.lang.Integer.parseInt(Integer.java:499)
at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1060)
at com.linkedin.camus.etl.kafka.CamusJob.getKafkaBufferSize(CamusJob.java:744)

Solution:
specify some Kafka-related properties or comment it (this way Camus will use default values):
# Fetch Request Parameters
# kafka.fetch.buffer.size=
# kafka.fetch.request.correlationid=
# kafka.fetch.request.max.wait=
# kafka.fetch.request.min.bytes=
# Connection parameters.
kafka.brokers=localhost:9092
# kafka.timeout.value=

Issue 2:
[CamusJob] - failed to create decoder
com.linkedin.camus.coders.MessageDecoderException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.linkedin.batch.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder not found
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:28)
at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder(EtlInputFormat.java:408)

Solution:
Chang to the following line in camus.properties file
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder


Issue 3:
[CamusJob] - failed to create decoder
com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders.MessageDecoderException: java.lang.ClassNotFoundException: com.linkedin.camus.example.DummySchemaRegistry
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:28)

Solution:
Chang to the following line in camus.properties file 
kafka.message.coder.schema.registry.class=com.linkedin.camus.example. schemaregistry.DummySchemaRegistry



Issue 4: Register Avro Schema in Memory
If we have a few kafka topics, and not frequently changed, we can use a memory-based avro schema register.
1. need to know topic avsc
2. automatically generate topic.java files
3. Register in  DummySchemaRegistry.java

[CamusJob] - failed to create decoder
com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders.MessageDecoderException: java.lang.InstantiationException: com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:28)

Solution :
 
This issue usually means the topic name isn't registered in the schema registry.
1. Generate DummyLog.java by avro-tool and DummyLog.avsc,
2. Register the topic-> schema pair in DummeySchemaRegistry.java

public class DummySchemaRegistry extends AvroMemorySchemaRegistry {
    public DummySchemaRegistry() {
    super();
//register topic name(DUMMY_LOG)->schema(Dummylog) pair in memory repo
    super.register("DUMMY_LOG", DummyLog.SCHEMA$);
}
}

Then, set Avro decoder in camus.properties file:
camus.message.decoder.class=
com.linkedin.batch.etl.kafka.coders.KafkaAvroMessageDecoder
etl.record.writer.provider.class=
com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider


Reference:

http://stackoverflow.com/questions/21508355/runing-camus-sample-with-kafka-0-8
https://groups.google.com/forum/#!topic/camus_etl/RzSHsDzOdow
https://groups.google.com/forum/#!topic/camus_etl/4f-Ru7Rhn8w
https://groups.google.com/forum/#!msg/camus_etl/EvJQsAC7wSA/ff9fkzLxrKYJ

Camus Consumes Avro and JSON from Kafka


Issue 1:
Consume JSON messages from Kafka to HDFS

Solution:
Set the following two properties in camus.properties file:

etl.record.writer.provider.class=
com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider

camus.message.decoder.class=
com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder

Issue 2:
Use avro-schema-repo service for schema sharing,

Error: com.linkedin.camus.coders.MessageDecoderException: java.lang.ClassNotFoundException: com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:28)

Solution:

1. Add below two properties in camus.properties file.
etl.schema.registry.url=http://localhost:2876/schema-repo
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry

2. Because 'camus-schema-registry-avro' is not in the default camus-master/pom.xml and camus-example/pom.xml, we need add it in these two places and rebuild the jar.

Issue 3:
[CamusJob] - java.io.IOException: java.lang.RuntimeException: org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING
        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:135)
        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.nextKeyValue(EtlRecordReader.java:261)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:483)


Solution:
This is because the producer uses JsonEncoder.
Should use KafkaAvroMessageEncoder instead of JsonEncoder.


Issue 4:

Caused by: org.codehaus.jackson.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: java.io.StringReader@32b8f675; line: 1, column: 2]
at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
at org.codehaus.jackson.impl.JsonParserMinimalBase._throwInvalidSpace(JsonParserMinimalBase.java:331)
at org.codehaus.jackson.impl.ReaderBasedParser._skipWSOrEnd(ReaderBasedParser.java:950)
at org.codehaus.jackson.impl.ReaderBasedParser.nextToken(ReaderBasedParser.java:247)
at org.apache.avro.io.JsonDecoder.configure(JsonDecoder.java:131)
at org.apache.avro.io.JsonDecoder.<init>(JsonDecoder.java:73)
at org.apache.avro.io.JsonDecoder.<init>(JsonDecoder.java:81)
at org.apache.avro.io.DecoderFactory.jsonDecoder(DecoderFactory.java:268)
at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:26)
... 14 more

Solution: 
This is caused by using different kind of encoder and decoder on producer and consumer.
In camus.properties,
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder


Below is a working Camus.properties example for Avro and JSON.

# Needed Camus properties, more cleanup to come

# final top-level data output directory, sub-directory will be dynamically created for each topic pulled
etl.destination.path= camus_kafka_etl
# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=camus_kafka_etl/base
# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=camus_kafka_etl/base/history
etl.execution.counts.path=camus_kafka_etl/base/counts
#new added
zookeeper.broker.topics=/brokers/topics

# Concrete implementation of the Encoder class to use (used by Kafka Audit, and thus optional for now)
#camus.message.encoder.class=com.linkedin.camus.etl.kafka.coders.DummyKafkaMessageEncoder

# Concrete implementation of the Decoder class to use
#camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder
#camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder

# This is new added
#etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider


# url for avro schema repo
etl.schema.registry.url=http://localhost:2876/schema-repo
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry

# Used by avro-based Decoders to use as their Schema Registry
#kafka.message.coder.schema.registry.class=com.linkedin.camus.example.DummySchemaRegistry
#kafka.message.coder.schema.registry.class=com.linkedin.camus.example.schemaregistry.DummySchemaRegistry

# Used by the committer to arrange .avro files into a partitioned scheme. This will be the default partitioner for all
# topic that do not have a partitioner specified
#etl.partitioner.class=com.linkedin.camus.etl.kafka.coders.DefaultPartitioner

# Partitioners can also be set on a per-topic basis
#etl.partitioner.class.<topic-name>=com.your.custom.CustomPartitioner

# all files in this dir will be added to the distributed cache and placed on the classpath for hadoop tasks
#hdfs.default.classpath.dir=hdfs://localhost.localdomain:8020

# This is new added
fs.default.name=hdfs://localhost:8020/

# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=30
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1

# if whitelist has values, only whitelisted topic are pulled.  nothing on the blacklist is pulled
kafka.blacklist.topics=test,avrotest,avrotopic, avrotest1, avrotest2, jsontest, DUMMY_LOG,DUMMY_LOG1,DUMMY_LOG2,DUMMY_LOG3, DUMMY_LOG4, DUMMY_LOG5,DUMMY_LOG_2
kafka.whitelist.topics=DUMMY_LOG_3
log4j.configuration=true

# Name of the client as seen by kafka
kafka.client.name=camus
# Fetch Request Parameters
#kafka.fetch.buffer.size=
#kafka.fetch.request.correlationid=
#kafka.fetch.request.max.wait=
#kafka.fetch.request.min.bytes=
# Connection parameters.
#kafka.brokers= ec2-54-197-143-117.compute-1.amazonaws.com:9092
kafka.brokers=localhost:9092
kafka.timeout.value= 60000


#Stops the mapper from getting inundated with Decoder exceptions for the same topic
#Default value is set to 10
max.decoder.exceptions.to.print=5

#Controls the submitting of counts to Kafka
#Default value set to true
post.tracking.counts.to.kafka=true
monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka

# everything below this point can be ignored for the time being, will provide more documentation down the road
##########################
etl.run.tracking.post=false
kafka.monitor.tier=
etl.counts.path=
kafka.monitor.time.granularity=10

etl.hourly=hourly
etl.daily=daily
etl.ignore.schema.errors=false

# configure output compression for deflate or snappy. Defaults to deflate
etl.output.codec=deflate
etl.deflate.level=6
#etl.output.codec=snappy

etl.default.timezone=America/Los_Angeles
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8

mapred.output.compress=true
mapred.map.max.attempts=1

kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000

#zookeeper.session.timeout=
#zookeeper.connection.timeout=



Reference:
https://groups.google.com/forum/#!searchin/camus_etl/.JsonParseException$3A$20Unexpected$20character/camus_etl/O0HNjYKQiUo/vz9iQuxb_nUJ
https://medium.com/@thedude_rog/camus-gotchas-b8ecebc08645

Install and Configure Spark in CDH4.7

I tried to install Spark in CDH4.7 VM through parcel.
However, after activation the cluster can't be started. So I install Spark individually.


1. Configure conf/spark-defaults.conf

spark.eventLog.enabled           true
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              2g
spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
spark.executor.memory          6g

2.  EOF Exception when trying to access hdfs
java.io.IOException: Call to localhost/10.85.85.17:9000 failed on local
> exception: java.io.EOFException

This means the hdfs url is not set properly.

val dataRDD = sc.textFile("hdfs://localhost.localdomain:8020/user/cloudera/data.txt")

3. org.apache.hadoop.ipc.remoteexception: server ipc version 7 cannot communicate with client version 4

This means your server has slighly newer version, than your client. 
Should choose to download spark pre-build for ch4, rather than pre-build for hadoop1.x

4. in CDH5.2 VM, Permission denied: user=cloudera, access=EXECUTE, inode="/user/spark":spark:spark:drwxr-x---
sudo -u hdfs hadoop fs -chmod -R 777 /user/spark

Reference:
http://stackoverflow.com/questions/23634985/error-when-trying-to-write-to-hdfs-server-ipc-version-9-cannot-communicate-with
https://gist.github.com/berngp/10793284

Saturday 20 December 2014

Install jblas linear algebra lib in MLlib



14/12/20 20:33:52 WARN TaskSetManager: Loss was due to java.lang.UnsatisfiedLinkError
java.lang.UnsatisfiedLinkError: org.jblas.NativeBlas.dposv(CII[DII[DII)I

MLLib uses native libraries, which need to be present on the nodes. (that is it does not come with spark installation)
MLlib uses the jblas linear algebra library, which itself depends on native Fortran routines. You may need to install the gfortran runtime library if it is not already present on your nodes. MLlib will throw a linking error if it cannot detect these libraries automatically.To use MLlib in Python, you will need NumPy version 1.7 or newer and Python 2.7.
You have to make sure that libgfortran library exists on all nodes.
for debian/ubuntu use: sudo apt-get install libgfortran3
for centos/redhat use: sudo yum install gcc-gfortran

Reference:
http://stackoverflow.com/questions/24758314/apache-spark-mllib-collaborative-filtering

Spark WARN: Initial job has not accepted any resources

$spark-shell --driver-memory 2g --executor-memory 6g

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster ui
to ensure that workers are registered and have sufficient memory
This message will pop up any time an application is requesting more resources from the cluster than the cluster can currently provide. What resources you might ask? Well Spark is only looking for two things: Cores and Ram. Cores represents the number of open executor slots that your cluster provides for execution. Ram refers to the amount of free Ram required on any worker running your application. Note for both of these resources the maximum value is not your System’s max, it is the max as set by the your Spark configuration.
In CM, choose Spark Configuration.

1. Master Default Group
Java Heap Size of Master in Bytes 
master_max_heapsize = 2G
Maximum size for the Java process heap memory. Passed to Java -Xmx. Measured in bytes.

2. Worker Default Group
Java Heap Size of Worker in Bytes 
worker_max_heapsize = 2G
Maximum size for the Java process heap memory. Passed to Java -Xmx. Measured in bytes.

Total Java Heap Sizes of Worker's Executors in Bytes 
executor_total_max_heapsize=16G
Memory available to the Worker's Executors. This is the maximum sum total of all the Executors' Java heap sizes on this Worker node. Passed to Java -Xmx. Measured in bytes.

Reference:
Spark web UI: http://www.datastax.com/dev/blog/common-spark-troubleshooting

Broadcast Variables/Join in Spark

Spark’s shared variable, broadcast variables, allows the program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations. They come in handy, for example, if your application needs to send a large, read-only lookup table to all the nodes, or even a large feature vector in a machine learning algorithm. The value is sent to each node only once, using an efficient, BitTorrent-like communication mechanism.

When Spark runs a stage, it creates a binary representation of all the information needed to run tasks in that stage, called the closure of the function that needs to be executed. This closure includes all the data structures on the driver referenced in the function. Spark distributes it to every executor on the cluster.
Broadcast variables are useful in situations where many tasks need access to the same (immutable) data structure. They extend normal handling of task closures to enable:
  • Caching data as raw Java objects on each executor, so they need not be deserialized for each task
  • Caching data across multiple jobs and stages
Each Executor contains a BlockManager to mange its broadcast data. Keep public data in BlockManager can guarantee the tasks in an executor can share data.

Broadcast Values:

A local Map on the driver will be copied automatically with every task. Since many tasks execute in one JVM, it's wasteful to send and store so many copies of the data.
Instead, a broadcast variable makes Spark send and hold in memory just one copy for each machine in the cluster. When there are hundreds of executors and many execute in parallel on each machine, this can save significant network traffic and memory.


The process of using broadcast variables is simple:

  1. Create a Broadcast[T] by calling SparkContext.broadcast on an object of type T. Any type works as long as it is also Serializable.
  2. Access its value with the value property (or value() method in Java).
  3. The variable will be sent to each node only once, and should be treated as read-only (updates will not be propagated to other nodes).
Below example broad a list as lookup dict. Then, look up xdf in the filter function.


val bc = sc.broadcast(Array[String]("login3", "login4"))
val xdf = sqlContext.createDataFrame(
 Array(("login1", 192), ("login2", 146), ("login3", 72))
 ).toDF("name", "cnt")

val func: (String => Boolean) = 
 (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)

val filtered = xdf.filter(sqlfunc(col("name")))

Broadcast Hash Join: 


/* Marks a DataFrame as small enough for use in broadcast joins.
*
* The following example marks the right DataFrame for broadcast hash join using `joinKey`.
* {{{
* // left and right are DataFrames
* left.join(broadcast(right), "joinKey")
* }}}
*/
def broadcast(df: DataFrame): DataFrame = {
DataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
}




Reference:

Friday 19 December 2014

Hadoop Streaming by Shell in Oozie

Below is an example to implement a hadoop streaming by shell in Oozie

command line:

hadoop jar $STREAMINGJAR \
-Dmapreduce.task.timeout=14400000 \
-input input_path \
-output output_path \
-numReduceTasks 1 \
-mapper 'shell_mapper.sh' \
-file 'shell_path/shell_mapper.sh'

shell_mapper.sh:

while read split; do
fout=`echo "$split" | awk '{split($0,a,"/"); split(a[5],b,"-"); split(b[3],c,"."); print "hdfs://output_path/20"c[1]"-"b[1]"-"b[2]"-"a[4]".txt"}'`
hdfs dfs -cat "$split"| tar zxfO - | bzip2 | hdfs dfs -put - "$fout"
done

Note: '-' refer to stdin rather than a file name. $fout refer to a file name rather than distinction folder.

Oozie Action:

   <action name="shell_mr">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${combined_file_path}"/>
            </prepare>
            <streaming>
                <mapper>${shell_mapper} ${wf:user()} ${output_path}</mapper>
            </streaming>
            <configuration>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${input_path}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${output_path}</value>
                </property>

                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>1</value>
                </property>
            </configuration>
            <file>${shell_path}</file>

        </map-reduce>
        <ok to="end"/>
        <error to="send_email"/>
    </action>

Note: the $output_path must be deleted, otherwise, the $output_path will be the target file name instead of target folder.

Thursday 18 December 2014

Alternating Least Squares in MLlib

Alternating Least Squares(ALS)
the algorithm repeatedly refines the contents of one matrix by solving a least-squares minimization problem that holds the other one fixed. Then it alternates, to refine the other matrix, and repeats iteratively. This is the source of its name. To begin the process, one matrix is filled with random feature vectors.

Importantly, ALS computations are quite parallelizable, because they can be decomposed into straightforward linear algebra operations on each row independently. They can be efficiently optimized for sparse data. 

These algorithms are sometimes called matrix completion algorithms, becausethe original matrix A may be quite sparse, but the approximate product is completely dense.

ALS is used to solve CF problem.

Collaborative filtering (CF) is a technique used by some recommender systems.

For example, deciding that two users may share similar tastes because they are the same age is not an example of collaborative filtering. Deciding that two users may like the same song since they play many of the same other songs is an example.

MLlib's ALS implementation requires numeric IDs for users and items, and further requires them to be nonnegative 32-bit integers, which means IDs larger than Integer.MAX_VALUE, 2147483647 can't be used.

By default, the RDD will contain one partition for each HDFS block. Because ML tasks is more compute-intensive, it's better to have more partitions. This can let Spark put more processor cores to work on the problem at once.






Tuesday 16 December 2014

Distcp Action in Oozie

1. Disctcp Action

<action name="distcp-node">
  <distcp xmlns="uri:oozie:distcp-action:0.1">
     <job-tracker>${jobTracker}</job-tracker>
       <name-node>${nameNode}</name-node>
       <prepare>
         <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}"/>
       </prepare>
       <configuration>
          <property>
            <name>mapred.job.queue.name</name>
            <value>${queueName}</value>
          </property>
       </configuration>
  <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/input-data/text/data.txt</arg>
  <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}/data.txt</arg>
  </distcp>
  <ok to="end"/>
  <error to="fail"/>
</action>

2. Shell Action

<action name="distcp_shell">
        <shell xmlns="uri:oozie:shell-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path='${file_path}'/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
           
            </configuration>
            <exec>${distcp_script}</exec>
            <file>${distcp_script_path}#${distcp_script}</file>
            <capture-output/>
        </shell>
        <ok to="end"/>
        <error to="send_email"/>
    </action>


Shell Script:

#!/bin/sh
ACCESSKEYID=xxxx
SECRETACCESSKEY=xxxx

export HADOOP_USER_NAME=current_user

hadoop distcp -D fs.s3n.awsAccessKeyId=$ACCESSKEYID \
-D fs.s3n.awsSecretAccessKey=$SECRETACCESSKEY \
-f file_list_path/part-00000 \
hdfs://cdh/current_user/file_data_path/


Please Note:
if distcp only one file1.txt to a target folder1/file1.txt, make sure folder1 exist. Otherwise, the target file name will become folder1(a file instead of a folder).

hadoop fs -mkdir folder1
hadoop distcp folder0/file1.txt   folder1


3. Command lines from hdfs to another hdfs

(1) Copy folder "files" and all its children folders and files into "target" folder.
hadoop distcp  /user/me/source/files/  hdfs://namenode/user/me/target/

This will expand the namespace under /foo/bar on nn1 into a temporary file, partition its contents among a set of map tasks, and start a copy on each TaskTracker from nn1 to nn2. Note that DistCp expects absolute paths.
By default, files already existing at the destination are skipped (i.e. not replaced by the source file). A count of skipped files is reported at the end of each job

-overwrite: Overwrite destination

-update: Overwrite if src size different from dst size

(2) Put file paths in file_list.txt, the copy only files into "target" folder.
hadoop distcp -f /user/me/file_list.txt hdfs://namenode/user/me/target/


Reference:
http://oozie.apache.org/docs/4.0.0/DG_DistCpActionExtension.html
http://hadoop.apache.org/docs/r0.18.3/distcp.html

Remove tab trail after Key from map output

The default separator between key and value in map output is '\t'

For some hadoop streaming jobs, we output only key. However, there is always '\t' as tail, which is not obverse.
If Distcp is used to read file names in the output file, it can't automatically remove the tailing '\t'.

1. Implement a customized outputformat, and replace '\t' by blank.

CustomOutputFormat<K, V> extends TextOutputFormat<K, V>
String keyValueSeparator = job.get("mapred.textoutputformat.separator", "");
Add below parameters in the command line.
-libjars lib/CustomFormats.jar
-outputformat com.mapreduce.output.CustomOutputFormat

For oozie mapreduce action, add followings.

<property>
       <name>mapred.output.format.class</name>
       <value>${custom_format_class}</value>
</property>
<file>${lib_custom_format_path}</file>


if use 'mapreduce.job.outputformat.class'
Error: mapreduce.job.map.class is incompatible with map compatability mode.


2. Replace '\t' by another separator in action.
However, it can't be set as empty. If empty is set, it will use the default '\t' instead.

<property>
       <name>mapreduce.output.textoutputformat.separator</name>
       <value>;</value>
</property>

Reference:
http://stackoverflow.com/questions/18133290/hadoop-streaming-remove-trailing-tab-from-reducer-output
https://github.com/zouhc/MyHadoop/blob/master/doc/hue.md