On a CDH cluster, NRT (near-real-time) search can be implemented by having Flume feed collected data into Solr, with Solr serving external queries. After Flume collects an event (on the test machine dn12.hadoop, for example), Morphline must perform the ETL that transforms it into Solr's document format, so the configuration falls into three steps.



solrctl instancedir --generate /home/data/collectionSignalling
solrctl instancedir --create collectionSignalling /home/data/collectionSignalling
solrctl collection --create collectionSignalling  -s 6 -m 15 -r 2 -c collectionSignalling -a
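The `-s 6 -r 2` flags above create 6 shards with replication factor 2, i.e. 12 shard replicas in total, while `-m 15` caps the shards per node. A quick sanity check of that arithmetic (a hedged sketch; the node count of 6 is a hypothetical value, not stated above):

```python
def can_host(num_shards, replication_factor, max_shards_per_node, num_nodes):
    """Return True if the cluster has enough shard slots for all replicas."""
    total_replicas = num_shards * replication_factor
    return total_replicas <= max_shards_per_node * num_nodes

# -s 6 -r 2 -m 15 from the solrctl command above; 6 Solr nodes is an assumption
print(can_host(6, 2, 15, 6))   # True: 12 replicas fit into 15 * 6 = 90 slots
```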


<field name="_version_" type="long" indexed="true" stored="true"/>
<!-- points to the root document of a block of nested documents. Required for nested
     document support, may be removed otherwise
-->
<field name="_root_" type="string" indexed="true" stored="false"/>
<field name="timestamp" type="tdate" indexed="true" stored="true" default="NOW+8HOUR" multiValued="false"/>
<field name="text" type="text_general" indexed="true" stored="false" multiValued="true"/>
<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false"/>
<field name="province_code" type="string" indexed="true" stored="true" multiValued="false"/>
<field name="caller" type="string" indexed="true" stored="true" multiValued="false"/>
<field name="called" type="string" indexed="true" stored="true" multiValued="false"/>
<field name="call_status" type="string" indexed="true" stored="true" multiValued="false"/>
<field name="call_time" type="tdate" indexed="true" stored="true" multiValued="false"/>
<field name="length_time" type="long" indexed="true" stored="true" multiValued="false"/>
<dynamicField name="ignored_*" type="ignored" multiValued="true"/>
<!-- Field to use to determine and enforce document uniqueness.
     Unless this field is marked with required="false", it will be a required field
-->
<uniqueKey>id</uniqueKey>

solrctl instancedir --update collectionSignalling /home/data/collectionSignalling
solrctl collection --reload collectionSignalling


1. On the Flume configuration page in Cloudera Manager, make Flume depend on Solr: in the Solr Service option, select Solr.

2. In CM, edit the flume agent's configuration file. Note that morphlineFile takes just the file name; do not prepend a path.

tier1.sources.source1.type = avro
tier1.sources.source1.bind =
tier1.sources.source1.port = 44444
tier1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.morphlineFile = morphlines.conf
tier1.sinks.sink1.morphlineId = collectionSignalling
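The agent wiring above is plain `key = value` properties. As an illustrative sketch (not Flume's own parser), the lines can be read into a dict to check for typos before pasting them into CM:

```python
conf = """
tier1.sources.source1.type = avro
tier1.sources.source1.port = 44444
tier1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.morphlineFile = morphlines.conf
tier1.sinks.sink1.morphlineId = collectionSignalling
"""

def parse_flume_props(text):
    """Parse Flume-style properties lines into a dict (simplified sketch)."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and "=" in line:
            key, value = line.split("=", 1)
            props[key.strip()] = value.strip()
    return props

props = parse_flume_props(conf)
print(props["tier1.sinks.sink1.morphlineId"])   # collectionSignalling
```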


In CDH, add the ETL configuration to the Morphlines File option of the flume agent:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# Application configuration file in HOCON format (Human-Optimized Config Object Notation).
# HOCON syntax is defined by the Typesafe Config library and is also used by Akka and Play.
# For more examples see the morphline.conf example files.
# this is a comment
# Specify server locations in a SOLR_LOCATOR variable; used later in variable substitutions:
SOLR_LOCATOR : {
  # Name of solr collection
  collection : collectionSignalling

  # ZooKeeper ensemble
  zkHost : "nn1.hadoop:2181,nn2.hadoop:2181,dn7.hadoop:2181,dn5.hadoop:2181,dn3.hadoop:2181/solr"

  # Relative or absolute path to a directory containing conf/solrconfig.xml and conf/schema.xml
  # If this path is uncommented it takes precedence over the configuration stored in ZooKeeper.
  # solrHomeDir : "example/solr/collection1"

  # The maximum number of documents to send to Solr per network batch (throughput knob)
  # batchSize : 100
}
# Specify an array of one or more morphlines, each of which defines an ETL 
# transformation chain. A morphline consists of one or more (potentially 
# nested) commands. A morphline is a way to consume records (e.g. Flume events, 
# HDFS files or blocks), turn them into a stream of records, and pipe the stream 
# of records through a set of easily configurable transformations on its way to 
# Solr (or a MapReduceIndexerTool RecordWriter that feeds via a Reducer into Solr).
morphlines : [
  {
    # Name used to identify a morphline. E.g. used if there are multiple morphlines in a
    # morphline config file
    id : collectionSignalling

    # Import all morphline commands in these java packages and their subpackages.
    # Other commands that may be present on the classpath are not visible to this morphline.
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
      {
        extractJsonPaths {
          flatten : false
          paths : {
            province_code : /province_code
            caller : /caller
            called : /called
            call_status : /call_status
            call_time : /call_time
            length_time : /length_time
          }
        }
      }

      # Consume the output record of the previous command and pipe another record downstream.
      # convert timestamp field to native Solr timestamp format
      # e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      # {
      #   convertTimestamp {
      #     field : call_time
      #     inputFormats : ["yyyyMMdd HH:mm:ss"]
      #     inputTimezone : Asia/Shanghai
      #     outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
      #     outputTimezone : Asia/Shanghai
      #   }
      # }

      # Consume the output record of the previous command and pipe another record downstream.
      { generateUUID { field : id } }

      # Command that sanitizes record fields that are unknown to Solr schema.xml by either
      # deleting them (renameToPrefix is absent or a zero length string), or by moving them to a
      # field prefixed with the given renameToPrefix (e.g. renameToPrefix = "ignored_" to use
      # typical dynamic Solr fields).
      # Recall that Solr throws an exception on any attempt to load a document that contains a
      # field that isn't specified in schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
          renameToPrefix : "ignored_"
        }
      }

      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }

      # load the record into a SolrServer or MapReduce SolrOutputFormat.
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]
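The command chain above can be approximated in plain Python to see what each step does to a record: extract the listed JSON paths, generate a UUID for the id field, and rename any field unknown to schema.xml with the `ignored_` prefix. This is a hedged sketch of the semantics, not the Kite Morphlines implementation:

```python
import json
import uuid

# field names from the schema.xml shown earlier
SCHEMA_FIELDS = {"id", "province_code", "caller", "called",
                 "call_status", "call_time", "length_time"}

def extract_json_paths(record, paths):
    """Like extractJsonPaths: copy each top-level path into an output field."""
    return {out: record[path.lstrip("/")]
            for out, path in paths.items() if path.lstrip("/") in record}

def generate_uuid(record, field="id"):
    """Like generateUUID: add a random unique id."""
    record[field] = str(uuid.uuid4())
    return record

def sanitize_unknown_solr_fields(record, prefix="ignored_"):
    """Like sanitizeUnknownSolrFields with renameToPrefix : "ignored_"."""
    return {(k if k in SCHEMA_FIELDS else prefix + k): v
            for k, v in record.items()}

raw = json.loads('{"province_code": "150000", "caller": "18353662886", '
                 '"called": "15335586466", "call_status": "1", '
                 '"call_time": "20161221 08:51:40", "length_time": "58526"}')
paths = {name: "/" + name for name in
         ["province_code", "caller", "called", "call_status",
          "call_time", "length_time"]}

doc = sanitize_unknown_solr_fields(generate_uuid(extract_json_paths(raw, paths)))
print(sorted(doc))
# ['call_status', 'call_time', 'called', 'caller', 'id', 'length_time', 'province_code']
```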



Write a test record to a file (here /home/hadoop/test/zhenzhen/file01), one JSON document per line:

{"province_code":"150000","caller":"18353662886","called":"15335586466","call_status":"1","call_time":"20161221 08:51:40","length_time":"58526"}
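The commented-out convertTimestamp command would turn call_time values like the one in this record into Solr's ISO-8601 shape. A rough Python equivalent (timezone shifting omitted, since the inputTimezone and outputTimezone in the config are both Asia/Shanghai, and the trailing "Z" is written literally here):

```python
from datetime import datetime

def to_solr_timestamp(value, in_fmt="%Y%m%d %H:%M:%S"):
    """Convert '20161221 08:51:40' into a yyyy-MM-dd'T'HH:mm:ss.SSSZ string."""
    dt = datetime.strptime(value, in_fmt)
    return dt.strftime("%Y-%m-%dT%H:%M:%S") + ".000Z"

print(to_solr_timestamp("20161221 08:51:40"))   # 2016-12-21T08:51:40.000Z
```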


Send the test file to the Avro source with flume-ng avro-client:

$ cd /opt/cloudera/parcels/CDH/bin
$ flume-ng avro-client -H dn12.hadoop -p 44444 -F /home/hadoop/test/zhenzhen/file01



1. View the log file on the agent node (the path below is the CDH default flume log directory):

# pwd
/var/log/flume-ng
# tail -f flume-cmf-flume-AGENT-dn12.hadoop.log

2. Check in CM. If there is no detailed information, go to flume agent -> Logging -> Agent Logging Threshold and lower the log level to TRACE; the logs can also be viewed from the Log Files menu in the CM interface.