Я новичок в Spark и HDInsight. Мне интересно, как работает приведенный ниже код? Я имею в виду, как задания планируются между узлами кластера. предположим, у меня есть 2 головных узла и 4 рабочих узла, какие коды будут работать на головных узлах и какие коды будут работать на рабочих узлах?
Как я могу сообщить Spark, что хочу, чтобы какой-то код выполнялся на рабочих узлах?
// the code is from https://docs.microsoft.com/en-us/azure/hdinsight/spark/apache-spark-eventhub-streaming
import com.microsoft.azure.eventhubs._
val pool = Executors.newFixedThreadPool(1)
val eventHubClient = EventHubClient.create(connStr.toString(), pool)
def sendEvent(message: String) = {
val messageData = EventData.create(message.getBytes("UTF-8"))
eventHubClient.get().send(messageData)
println("Sent event: " + message + "\n")
}
import twitter4j._
import twitter4j.TwitterFactory
import twitter4j.Twitter
import twitter4j.conf.ConfigurationBuilder
// Twitter application configurations
// Replace values below with yours
val twitterConsumerKey = "<CONSUMER KEY>"
val twitterConsumerSecret = "<CONSUMER SECRET>"
val twitterOauthAccessToken = "<ACCESS TOKEN>"
val twitterOauthTokenSecret = "<TOKEN SECRET>"
val cb = new ConfigurationBuilder()
cb.setDebugEnabled(true).setOAuthConsumerKey(twitterConsumerKey).setOAuthConsumerSecret(twitterConsumerSecret).setOAuthAccessToken(twitterOauthAccessToken).setOAuthAccessTokenSecret(twitterOauthTokenSecret)
val twitterFactory = new TwitterFactory(cb.build())
val twitter = twitterFactory.getInstance()
// Getting tweets with keyword "Azure" and sending them to the Event Hub in realtime!
val query = new Query(" #Azure ")
query.setCount(100)
query.lang("en")
var finished = false
while (!finished) {
val result = twitter.search(query)
val statuses = result.getTweets()
var lowestStatusId = Long.MaxValue
for (status <- statuses.asScala) {
if(!status.isRetweet()){
sendEvent(status.getText())
}
lowestStatusId = Math.min(status.getId(), lowestStatusId)
Thread.sleep(2000)
}
query.setMaxId(lowestStatusId - 1)
}
// Closing connection to the Event Hub
eventHubClient.get().close()